From 3937d6341112cec0aa0f4af4d6b54b5448fd189f Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 19:22:18 +0800 Subject: [PATCH 1/8] test env --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1080edc..e1aa4a0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # Vanus +test [![License](https://img.shields.io/badge/License-Apache_2.0-green.svg)](https://github.com/linkall-labs/vanus/blob/main/LICENSE) [![Language](https://img.shields.io/badge/Language-Go-blue.svg)](https://golang.org/) From 0eb36f92ee689bac5bc0fa018f33ab548ca2ad6a Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 19:27:41 +0800 Subject: [PATCH 2/8] test env --- .github/workflows/e2e.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 6bc73b1..361d0e0 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -39,6 +39,7 @@ jobs: run: | export IMAGE_TAG=$(git log -1 --format='%h' | awk '{print $0}') export VERSION=$(cat ./deploy/all-in-one.yaml| grep -w "image" | awk -F ":" '{print $NF}' | awk 'FNR == 1') + export BUILDPLATFORM=$(uname -m) sed -i "s/${VERSION}/${IMAGE_TAG}/g" ./deploy/all-in-one.yaml eval $(minikube docker-env) make docker-build From fb812b88eb24771fb623143e4316e0e76e6cc2ba Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 19:40:25 +0800 Subject: [PATCH 3/8] test env --- build/images/controller/Dockerfile | 2 +- build/images/gateway/Dockerfile | 2 +- build/images/store/Dockerfile | 2 +- build/images/trigger/Dockerfile | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build/images/controller/Dockerfile b/build/images/controller/Dockerfile index 56fb86e..4cb0e10 100644 --- a/build/images/controller/Dockerfile +++ b/build/images/controller/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.18 as builder +FROM --platform=amd64 golang:1.18 as builder WORKDIR /workspace COPY . . diff --git a/build/images/gateway/Dockerfile b/build/images/gateway/Dockerfile index 396dc45..7c4f6f5 100644 --- a/build/images/gateway/Dockerfile +++ b/build/images/gateway/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.18 as builder +FROM --platform=amd64 golang:1.18 as builder WORKDIR /workspace COPY . . diff --git a/build/images/store/Dockerfile b/build/images/store/Dockerfile index ebb6d19..13da9ee 100644 --- a/build/images/store/Dockerfile +++ b/build/images/store/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.18 as builder +FROM --platform=amd64 golang:1.18 as builder WORKDIR /workspace COPY . . diff --git a/build/images/trigger/Dockerfile b/build/images/trigger/Dockerfile index 630f878..44b3738 100644 --- a/build/images/trigger/Dockerfile +++ b/build/images/trigger/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.18 as builder +FROM --platform=amd64 golang:1.18 as builder WORKDIR /workspace COPY . . From 0425c564ba94d029c865a91772ef20608bffd490 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 20:59:52 +0800 Subject: [PATCH 4/8] add smoke --- .github/workflows/smoke.yml | 63 +++++++ Makefile | 3 + test/smoke/main.go | 329 ++++++++++++++++++++++++++++++++++++ 3 files changed, 395 insertions(+) create mode 100644 .github/workflows/smoke.yml create mode 100644 test/smoke/main.go diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml new file mode 100644 index 0000000..9766ee9 --- /dev/null +++ b/.github/workflows/smoke.yml @@ -0,0 +1,63 @@ +name: smoke +on: + pull_request: + branches: + - main +permissions: + contents: read + # Optional: allow read access to pull request. Use with `only-new-issues` option. + # pull-requests: read- +jobs: + smoke: + name: smoke + if: | + startsWith(github.event.pull_request.title, 'fix:') || + startsWith(github.event.pull_request.title, 'fix(') || + startsWith(github.event.pull_request.title, 'feat:') || + startsWith(github.event.pull_request.title, 'feat(') || + startsWith(github.event.pull_request.title, 'feat!:') || + startsWith(github.event.pull_request.title, 'refactor:') || + startsWith(github.event.pull_request.title, 'refactor(') || + true + runs-on: ubuntu-latest + env: + VANUS_GATEWAY: 192.168.49.2:30001 + BUILDPLATFORM: linux/amd64 + + steps: + - name: Check out repository code + uses: actions/checkout@v3 + - name: Environmental preparation + run: | + curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 + sudo install minikube-linux-amd64 /usr/local/bin/minikube + minikube start + curl -LO https://dl.k8s.io/release/v1.24.0/bin/linux/amd64/kubectl + sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl + + - name: Deploy Vanus + run: | + export IMAGE_TAG=$(git log -1 --format='%h' | awk '{print $0}') + export VERSION=$(cat ./deploy/all-in-one.yaml| grep -w "image" | awk -F ":" '{print $NF}' | awk 'FNR == 1') + export BUILDPLATFORM=$(uname -m) + sed -i "s/${VERSION}/${IMAGE_TAG}/g" ./deploy/all-in-one.yaml + eval $(minikube docker-env) + make docker-build + kubectl apply -f ./deploy/all-in-one.yaml + kubectl apply -f ./test/yaml/display.yml + kubectl apply -f ./test/yaml/etcd-srv.yml + # make build-cmd + # chmod ug+x ./bin/vsctl + # sudo mv ./bin/vsctl /usr/local/bin/vsctl + sleep 60s && for i in $(seq 1 20); do kubectl get pod -n vanus --no-headers | grep -v Run || break;sleep 5s;done + + - name: Exec smoke + run: | + make build-smoke + ./bin/smoke + + - name: Check smoke + run: | + kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l + # [[ $(kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l) -eq 10001 ]] && echo "success" || echo "failed" + diff --git a/Makefile b/Makefile index 51f325b..f3bcae9 100644 --- a/Makefile +++ b/Makefile @@ -83,6 +83,9 @@ build-gw-util: build-e2e: go build -o bin/e2e test/e2e/quick-start/main.go +build-smoke: + go build -o bin/smoke test/smoke/main.go + build-destruct: go build -o bin/destruct test/e2e/destruct/main.go diff --git a/test/smoke/main.go b/test/smoke/main.go new file mode 100644 index 0000000..1162fc2 --- /dev/null +++ b/test/smoke/main.go @@ -0,0 +1,329 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "sync" + "time" + + ce "github.com/cloudevents/sdk-go/v2" + "github.com/go-resty/resty/v2" + "github.com/google/uuid" + log "k8s.io/klog/v2" + + "github.com/fatih/color" + "github.com/linkall-labs/vanus/internal/kv" + "github.com/linkall-labs/vanus/internal/kv/etcd" + ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" + "github.com/linkall-labs/vanus/proto/pkg/meta" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + VolumeKeyPrefixInKVStore = "/vanus/internal/resource/volume/metadata" + BlockKeyPrefixInKVStore = "/vanus/internal/resource/volume/block" + VolumeInstanceKeyPrefixInKVStore = "/vanus/internal/resource/volume/instance" + + EventbusKeyPrefixInKVStore = "/vanus/internal/resource/eventbus" + EventlogKeyPrefixInKVStore = "/vanus/internal/resource/eventlog" + SegmentKeyPrefixInKVStore = "/vanus/internal/resource/segment" + + EventlogSegmentsKeyPrefixInKVStore = "/vanus/internal/resource/segs_of_eventlog" +) + +const ( + HttpPrefix = "http://" + EventBus = "quick-start" +) + +var ( + Sink = "http://quick-display:80" + Source = "" + Filters = "" + Transformer = "" + + EventType = "examples" + EventBody = "Hello Vanus" + EventSource = "quick-start" + + HttpClient = resty.New() + Endpoint = os.Getenv("VANUS_GATEWAY") + EtcdClient kv.Client + err error +) + +func init() { + kvStoreEndpoints := []string{"192.168.49.2:30007"} + kvKeyPrefix := "/vanus" + EtcdClient, err = etcd.NewEtcdClientV3(kvStoreEndpoints, kvKeyPrefix) + if err != nil { + log.Fatalf("NewEtcdClientV3 failed, err: %+v\n", err) + } +} + +func mustGetControllerProxyConn(ctx context.Context) *grpc.ClientConn { + splits := strings.Split(os.Getenv("VANUS_GATEWAY"), ":") + port, err := strconv.Atoi(splits[1]) + if err != nil { + log.Error("parsing gateway port failed") + return nil + } + leaderConn := createGRPCConn(ctx, fmt.Sprintf("%s:%d", splits[0], port+2)) + if leaderConn == nil { + log.Error("failed to connect to gateway") + return nil + } + return leaderConn +} + +func createGRPCConn(ctx context.Context, addr string) *grpc.ClientConn { + if addr == "" { + return nil + } + addr = strings.TrimPrefix(addr, "http://") + var err error + var opts []grpc.DialOption + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + opts = append(opts, grpc.WithBlock()) + ctx, cancel := context.WithCancel(ctx) + timeout := false + go func() { + ticker := time.Tick(time.Second) + select { + case <-ctx.Done(): + case <-ticker: + cancel() + timeout = true + } + }() + conn, err := grpc.DialContext(ctx, addr, opts...) + cancel() + if timeout { + color.Yellow("dial to controller: %s timeout, try to another controller", addr) + return nil + } else if err != nil { + color.Red("dial to controller: %s failed", addr) + return nil + } + return conn +} + +func createEventbus(eb string) error { + ctx := context.Background() + grpcConn := mustGetControllerProxyConn(ctx) + defer func() { + _ = grpcConn.Close() + }() + + cli := ctrlpb.NewEventBusControllerClient(grpcConn) + res, err := cli.CreateEventBus(ctx, &ctrlpb.CreateEventBusRequest{ + Name: eb, + }) + if err != nil { + log.Errorf("create eventbus failed, err: %s", err) + return err + } + log.Infof("create eventbus[%s] success.", res.Name) + return nil +} + +func createSubscription(eventbus, sink, source, filters, transformer string) error { + ctx := context.Background() + grpcConn := mustGetControllerProxyConn(ctx) + defer func() { + _ = grpcConn.Close() + }() + var filter []*meta.Filter + if filters != "" { + err := json.Unmarshal([]byte(filters), &filter) + if err != nil { + log.Errorf("the filter invalid, err: %s", err) + return err + } + } + + var trans *meta.Transformer + if transformer != "" { + err := json.Unmarshal([]byte(transformer), &trans) + if err != nil { + log.Errorf("the transformer invalid, err: %s", err) + return err + } + } + + cli := ctrlpb.NewTriggerControllerClient(grpcConn) + res, err := cli.CreateSubscription(ctx, &ctrlpb.CreateSubscriptionRequest{ + Subscription: &ctrlpb.SubscriptionRequest{ + Source: source, + Filters: filter, + Sink: sink, + EventBus: eventbus, + Transformer: trans, + }, + }) + if err != nil { + log.Errorf("create subscription failed, err: %s", err) + return err + } + log.Infof("create subscription[%d] success.", res.Id) + return nil +} + +func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error { + p, err := ce.NewHTTP() + if err != nil { + log.Errorf("init ce protocol error: %s\n", err) + return err + } + c, err := ce.NewClient(p, ce.WithTimeNow(), ce.WithUUIDs()) + if err != nil { + log.Errorf("create ce client error: %s\n", err) + return err + } + + if eventID == "" { + eventID = uuid.NewString() + } + + ceCtx := ce.ContextWithTarget(context.Background(), fmt.Sprintf("%s%s/gateway/%s", HttpPrefix, Endpoint, eventbus)) + event := ce.NewEvent() + event.SetID(eventID) + event.SetSource(eventSource) + event.SetType(eventType) + err = event.SetData(ce.TextPlain, eventBody) + if err != nil { + log.Errorf("set data failed: %s\n", err) + return err + } + c.Send(ceCtx, event) + log.Infof("put event[%s] success.", event.ID()) + return nil +} + +func putEvents(offset, eventNum, threadNum int64, eventBus, eventBody, eventSource string) error { + var ( + i int64 + eventid int64 = offset + wg sync.WaitGroup + ) + for i = 1; i <= threadNum; i++ { + first := eventid + last := eventid + eventNum/threadNum + wg.Add(1) + go func(first, last int64) { + for n := first; n < last; n++ { + putEvent(eventBus, fmt.Sprintf("%d", n), EventType, eventBody, eventSource) + } + wg.Done() + }(first, last) + eventid = eventid + eventNum/threadNum + } + wg.Wait() + return nil +} + +func getEvent(eventbus, offset, number string) error { + idx := strings.LastIndex(Endpoint, ":") + port, err := strconv.Atoi(Endpoint[idx+1:]) + if err != nil { + log.Errorf("parse gateway port failed: %s, endpoint: %s", err, Endpoint) + return err + } + newEndpoint := fmt.Sprintf("%s:%d", Endpoint[:idx], port+1) + url := fmt.Sprintf("%s%s/getEvents?eventbus=%s&offset=%s&number=%s", HttpPrefix, newEndpoint, eventbus, offset, number) + event, err := HttpClient.NewRequest().Get(url) + if err != nil { + log.Errorf("get event from eventbus[%s]&offset[%s]&number[%s] failed, err: %s\n", eventbus, offset, number, err) + return err + } + log.Infof("get event from eventbus[%s]&offset[%s]&number[%s] success, event: %s\n", eventbus, offset, number, event.String()) + return nil +} + +func Test_e2e_base() { + eventBus := "eventbus-base" + err = createEventbus(eventBus) + if err != nil { + return + } + + err = createSubscription(eventBus, Sink, Source, Filters, Transformer) + if err != nil { + return + } + + // putEvents(0, 10000, 100, eventBus, EventBody, EventSource) + putEvent(eventBus, "id", EventType, EventBody, EventSource) + + err = getEvent(eventBus, "0", "1") + if err != nil { + log.Error("Test_e2e_base get event failed") + return + } + log.Info("Test_e2e_base get event success") +} + +func Test_e2e_filter() { + eventBus := "eventbus-filter" + err = createEventbus(eventBus) + if err != nil { + return + } + + filters := "[{\"exact\": {\"source\":\"filter\"}}]" + err = createSubscription(eventBus, Sink, Source, filters, Transformer) + if err != nil { + return + } + + filters = "[{\"cel\": \"$key.(string) == \\\"value\\\"\"}]" + err = createSubscription(eventBus, Sink, Source, filters, Transformer) + if err != nil { + return + } + + // putEvents(0, 2000, 100, eventBus, EventBody, EventSource) + putEvent(eventBus, "id", EventType, EventBody, EventSource) + eventSource := "filter" + // putEvents(2000, 4000, 10, eventBus, EventBody, eventSource) + putEvent(eventBus, "id", EventType, EventBody, eventSource) + eventBody := "{\"key\":\"value\"}" + // putEvents(4000, 4000, 100, eventBus, eventBody, EventSource) + putEvent(eventBus, "id", EventType, eventBody, eventSource) + + err = getEvent(eventBus, "0", "2") + if err != nil { + log.Error("Test_e2e_filter get event failed") + return + } + log.Info("Test_e2e_filter get event success") +} + +func main() { + log.Info("start e2e test base case...") + + Test_e2e_base() + + Test_e2e_filter() + + log.Info("finish e2e test base case...") +} From 612e29737c5d0d950dc93826b620f119c2eda9c8 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 21:15:51 +0800 Subject: [PATCH 5/8] add smoke --- .github/workflows/smoke.yml | 8 +++++++- test/smoke/main.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml index 9766ee9..0375429 100644 --- a/.github/workflows/smoke.yml +++ b/.github/workflows/smoke.yml @@ -59,5 +59,11 @@ jobs: - name: Check smoke run: | kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l - # [[ $(kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l) -eq 10001 ]] && echo "success" || echo "failed" + echo sleep5s + sleep 5s + kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l + echo sleep5s + sleep 5s + kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l + [[ $(kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l) -eq 4 ]] && echo "success" || echo "failed" diff --git a/test/smoke/main.go b/test/smoke/main.go index 1162fc2..3067774 100644 --- a/test/smoke/main.go +++ b/test/smoke/main.go @@ -255,7 +255,7 @@ func getEvent(eventbus, offset, number string) error { log.Errorf("get event from eventbus[%s]&offset[%s]&number[%s] failed, err: %s\n", eventbus, offset, number, err) return err } - log.Infof("get event from eventbus[%s]&offset[%s]&number[%s] success, event: %s\n", eventbus, offset, number, event.String()) + log.Infof("get event from eventbus[%s]&offset[%s]&number[%s] success, event: %+v\n", eventbus, offset, number, event) return nil } From a75f9536453497d5b4bb8f487e93aef637efe666 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Sun, 18 Sep 2022 21:38:48 +0800 Subject: [PATCH 6/8] add smoke --- .github/workflows/smoke.yml | 20 ++++++++++-- test/smoke/main.go | 65 ++++++++++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml index 0375429..8d04b0e 100644 --- a/.github/workflows/smoke.yml +++ b/.github/workflows/smoke.yml @@ -46,10 +46,14 @@ jobs: kubectl apply -f ./deploy/all-in-one.yaml kubectl apply -f ./test/yaml/display.yml kubectl apply -f ./test/yaml/etcd-srv.yml - # make build-cmd - # chmod ug+x ./bin/vsctl - # sudo mv ./bin/vsctl /usr/local/bin/vsctl + make build-cmd + chmod ug+x ./bin/vsctl + sudo mv ./bin/vsctl /usr/local/bin/vsctl sleep 60s && for i in $(seq 1 20); do kubectl get pod -n vanus --no-headers | grep -v Run || break;sleep 5s;done + echo "Post Deploy Vanus" + kubectl get pod -n vanus + minikube service list + vsctl cluster controller topology - name: Exec smoke run: | @@ -58,6 +62,16 @@ jobs: - name: Check smoke run: | + vsctl event get eventbus-base --offset 0 --number 5 + vsctl event get eventbus-filter --offset 0 --number 5 + vsctl eventbus create --name "quick-start" + vsctl event put quick-start --data-format plain --body "Hello Vanus" --id "1" --source "quick-start" + echo sleep1s + sleep 1s + vsctl event get quick-start --offset 0 --number 1 + echo sleep1s + sleep 1s + vsctl event get quick-start --offset 0 --number 1 kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l echo sleep5s sleep 5s diff --git a/test/smoke/main.go b/test/smoke/main.go index 3067774..330caf7 100644 --- a/test/smoke/main.go +++ b/test/smoke/main.go @@ -255,7 +255,7 @@ func getEvent(eventbus, offset, number string) error { log.Errorf("get event from eventbus[%s]&offset[%s]&number[%s] failed, err: %s\n", eventbus, offset, number, err) return err } - log.Infof("get event from eventbus[%s]&offset[%s]&number[%s] success, event: %+v\n", eventbus, offset, number, event) + log.Infof("get event from eventbus[%s]&offset[%s]&number[%s] success, event: %s\n", eventbus, offset, number, event.String()) return nil } @@ -271,10 +271,9 @@ func Test_e2e_base() { return } - // putEvents(0, 10000, 100, eventBus, EventBody, EventSource) - putEvent(eventBus, "id", EventType, EventBody, EventSource) + putEvents(0, 2, 1, eventBus, EventBody, EventSource) - err = getEvent(eventBus, "0", "1") + err = getEvent(eventBus, "0", "10000") if err != nil { log.Error("Test_e2e_base get event failed") return @@ -301,16 +300,13 @@ func Test_e2e_filter() { return } - // putEvents(0, 2000, 100, eventBus, EventBody, EventSource) - putEvent(eventBus, "id", EventType, EventBody, EventSource) + putEvents(0, 2000, 100, eventBus, EventBody, EventSource) eventSource := "filter" - // putEvents(2000, 4000, 10, eventBus, EventBody, eventSource) - putEvent(eventBus, "id", EventType, EventBody, eventSource) + putEvents(2000, 4000, 10, eventBus, EventBody, eventSource) eventBody := "{\"key\":\"value\"}" - // putEvents(4000, 4000, 100, eventBus, eventBody, EventSource) - putEvent(eventBus, "id", EventType, eventBody, eventSource) + putEvents(4000, 4000, 100, eventBus, eventBody, EventSource) - err = getEvent(eventBus, "0", "2") + err = getEvent(eventBus, "0", "8000") if err != nil { log.Error("Test_e2e_filter get event failed") return @@ -318,12 +314,57 @@ func Test_e2e_filter() { log.Info("Test_e2e_filter get event success") } +func Test_e2e_transformation() { + eventBus := "eventbus-transformation" + err = createEventbus(eventBus) + if err != nil { + return + } + + transformer := "{\"template\": \"{\\\"transKey\\\": \\\"transValue\\\"}\"}" + err = createSubscription(eventBus, Sink, Source, Filters, transformer) + if err != nil { + return + } + + putEvents(0, 10000, 100, eventBus, EventBody, EventSource) + + err = getEvent(eventBus, "0", "10000") + if err != nil { + log.Error("Test_e2e_transformation get event failed") + return + } + log.Info("Test_e2e_filter get event success") +} + +func Test_e2e_metadata() { + eventBus := "eventbus-meta" + err = createEventbus(eventBus) + if err != nil { + return + } + + // Currently, only check metadata of eventbus + var path string = fmt.Sprintf("%s/%s", EventbusKeyPrefixInKVStore, eventBus) + ctx := context.Background() + meta, err := EtcdClient.Get(ctx, path) + if err != nil { + log.Errorf("get metadata failed, path: %s, err: %s\n", path, err.Error()) + return + } + log.Infof("get metadata success, path: %s, mata: %s\n", path, string(meta)) +} + func main() { log.Info("start e2e test base case...") Test_e2e_base() - Test_e2e_filter() + // Test_e2e_filter() + + // Test_e2e_transformation() + + // Test_e2e_metadata() log.Info("finish e2e test base case...") } From 32da79e04de99a930f3722d6bce76dd7f5c88cd7 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Wed, 5 Oct 2022 23:06:59 +0800 Subject: [PATCH 7/8] test --- .github/workflows/smoke.yml | 8 +------- test/smoke/main.go | 21 ++++++++++----------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml index 8d04b0e..4fad302 100644 --- a/.github/workflows/smoke.yml +++ b/.github/workflows/smoke.yml @@ -11,13 +11,6 @@ jobs: smoke: name: smoke if: | - startsWith(github.event.pull_request.title, 'fix:') || - startsWith(github.event.pull_request.title, 'fix(') || - startsWith(github.event.pull_request.title, 'feat:') || - startsWith(github.event.pull_request.title, 'feat(') || - startsWith(github.event.pull_request.title, 'feat!:') || - startsWith(github.event.pull_request.title, 'refactor:') || - startsWith(github.event.pull_request.title, 'refactor(') || true runs-on: ubuntu-latest env: @@ -62,6 +55,7 @@ jobs: - name: Check smoke run: | + vsctl eventbus list vsctl event get eventbus-base --offset 0 --number 5 vsctl event get eventbus-filter --offset 0 --number 5 vsctl eventbus create --name "quick-start" diff --git a/test/smoke/main.go b/test/smoke/main.go index 330caf7..26bdc63 100644 --- a/test/smoke/main.go +++ b/test/smoke/main.go @@ -189,14 +189,9 @@ func createSubscription(eventbus, sink, source, filters, transformer string) err } func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error { - p, err := ce.NewHTTP() + client, err := ce.NewClientHTTP() if err != nil { - log.Errorf("init ce protocol error: %s\n", err) - return err - } - c, err := ce.NewClient(p, ce.WithTimeNow(), ce.WithUUIDs()) - if err != nil { - log.Errorf("create ce client error: %s\n", err) + log.Errorf("new cloudevents client failed, err: %s\n", err.Error()) return err } @@ -204,7 +199,7 @@ func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error eventID = uuid.NewString() } - ceCtx := ce.ContextWithTarget(context.Background(), fmt.Sprintf("%s%s/gateway/%s", HttpPrefix, Endpoint, eventbus)) + ctx := ce.ContextWithTarget(context.Background(), fmt.Sprintf("%s%s/gateway/%s", HttpPrefix, Endpoint, eventbus)) event := ce.NewEvent() event.SetID(eventID) event.SetSource(eventSource) @@ -214,7 +209,10 @@ func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error log.Errorf("set data failed: %s\n", err) return err } - c.Send(ceCtx, event) + if result := client.Send(ctx, event); ce.IsUndelivered(result) { + log.Errorf("failed to send event, err: %s\n", result.Error()) + return err + } log.Infof("put event[%s] success.", event.ID()) return nil } @@ -270,10 +268,11 @@ func Test_e2e_base() { if err != nil { return } + time.Sleep(time.Second) - putEvents(0, 2, 1, eventBus, EventBody, EventSource) + putEvents(0, 10000, 100, eventBus, EventBody, EventSource) - err = getEvent(eventBus, "0", "10000") + err = getEvent(eventBus, "0", "10") if err != nil { log.Error("Test_e2e_base get event failed") return From 5f310565e33f53917815ec63f164c2b1f69e7536 Mon Sep 17 00:00:00 2001 From: jyjiangkai Date: Tue, 1 Nov 2022 15:33:05 +0800 Subject: [PATCH 8/8] modify smoke --- test/smoke/main.go | 83 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 21 deletions(-) diff --git a/test/smoke/main.go b/test/smoke/main.go index 26bdc63..28cd980 100644 --- a/test/smoke/main.go +++ b/test/smoke/main.go @@ -52,7 +52,7 @@ const ( const ( HttpPrefix = "http://" - EventBus = "quick-start" + EventBus = "e2e" ) var ( @@ -63,7 +63,7 @@ var ( EventType = "examples" EventBody = "Hello Vanus" - EventSource = "quick-start" + EventSource = "e2e" HttpClient = resty.New() Endpoint = os.Getenv("VANUS_GATEWAY") @@ -146,7 +146,7 @@ func createEventbus(eb string) error { return nil } -func createSubscription(eventbus, sink, source, filters, transformer string) error { +func createSubscription(eventbus, sink, source, filters, transformer string) (uint64, error) { ctx := context.Background() grpcConn := mustGetControllerProxyConn(ctx) defer func() { @@ -157,7 +157,7 @@ func createSubscription(eventbus, sink, source, filters, transformer string) err err := json.Unmarshal([]byte(filters), &filter) if err != nil { log.Errorf("the filter invalid, err: %s", err) - return err + return 0, err } } @@ -166,7 +166,7 @@ func createSubscription(eventbus, sink, source, filters, transformer string) err err := json.Unmarshal([]byte(transformer), &trans) if err != nil { log.Errorf("the transformer invalid, err: %s", err) - return err + return 0, err } } @@ -182,9 +182,28 @@ func createSubscription(eventbus, sink, source, filters, transformer string) err }) if err != nil { log.Errorf("create subscription failed, err: %s", err) - return err + return 0, err } log.Infof("create subscription[%d] success.", res.Id) + return res.Id, nil +} + +func deleteSubscription(id uint64) error { + ctx := context.Background() + grpcConn := mustGetControllerProxyConn(ctx) + defer func() { + _ = grpcConn.Close() + }() + + cli := ctrlpb.NewTriggerControllerClient(grpcConn) + _, err := cli.DeleteSubscription(ctx, &ctrlpb.DeleteSubscriptionRequest{ + Id: id, + }) + if err != nil { + log.Errorf("delete subscription failed, err: %s", err) + return err + } + log.Infof("delete subscription[%d] success.", id) return nil } @@ -213,7 +232,7 @@ func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error log.Errorf("failed to send event, err: %s\n", result.Error()) return err } - log.Infof("put event[%s] success.", event.ID()) + // log.Infof("put event[%s] success.", event.ID()) return nil } @@ -236,6 +255,7 @@ func putEvents(offset, eventNum, threadNum int64, eventBus, eventBody, eventSour eventid = eventid + eventNum/threadNum } wg.Wait() + log.Infof("put %d events success.", eventNum) return nil } @@ -257,55 +277,76 @@ func getEvent(eventbus, offset, number string) error { return nil } +func Init() error { + err := createEventbus(EventBus) + if err != nil { + return err + } + + // wait for eventbus ready + time.Sleep(3 * time.Second) + return nil +} + func Test_e2e_base() { + log.Info("Start test e2e basecase") eventBus := "eventbus-base" err = createEventbus(eventBus) if err != nil { return } - err = createSubscription(eventBus, Sink, Source, Filters, Transformer) + id, err := createSubscription(EventBus, Sink, Source, Filters, Transformer) if err != nil { + log.Error("create subsription failed") return } - time.Sleep(time.Second) + log.Info("create subsription success") - putEvents(0, 10000, 100, eventBus, EventBody, EventSource) + err = putEvents(0, 10000, 100, EventBus, EventBody, EventSource) + if err != nil { + log.Error("put events failed") + return + } + log.Info("put events success") - err = getEvent(eventBus, "0", "10") + err = getEvent(EventBus, "9999", "1") if err != nil { - log.Error("Test_e2e_base get event failed") + log.Error("get event failed") return } - log.Info("Test_e2e_base get event success") + log.Info("get event success") + + deleteSubscription(id) + log.Info("Finish test e2e basecase") } func Test_e2e_filter() { eventBus := "eventbus-filter" - err = createEventbus(eventBus) + err = createEventbus(EventBus) if err != nil { return } filters := "[{\"exact\": {\"source\":\"filter\"}}]" - err = createSubscription(eventBus, Sink, Source, filters, Transformer) + err = createSubscription(EventBus, Sink, Source, filters, Transformer) if err != nil { return } filters = "[{\"cel\": \"$key.(string) == \\\"value\\\"\"}]" - err = createSubscription(eventBus, Sink, Source, filters, Transformer) + err = createSubscription(EventBus, Sink, Source, filters, Transformer) if err != nil { return } - putEvents(0, 2000, 100, eventBus, EventBody, EventSource) + putEvents(0, 2000, 100, EventBus, EventBody, EventSource) eventSource := "filter" - putEvents(2000, 4000, 10, eventBus, EventBody, eventSource) + putEvents(2000, 4000, 10, EventBus, EventBody, eventSource) eventBody := "{\"key\":\"value\"}" - putEvents(4000, 4000, 100, eventBus, eventBody, EventSource) + putEvents(4000, 4000, 100, EventBus, eventBody, EventSource) - err = getEvent(eventBus, "0", "8000") + err = getEvent(EventBus, "0", "8000", false) if err != nil { log.Error("Test_e2e_filter get event failed") return @@ -328,7 +369,7 @@ func Test_e2e_transformation() { putEvents(0, 10000, 100, eventBus, EventBody, EventSource) - err = getEvent(eventBus, "0", "10000") + err = getEvent(eventBus, "0", "10000", false) if err != nil { log.Error("Test_e2e_transformation get event failed") return