From 76f8f3fa46625d86dc4b170f2f9248b37ab6548e Mon Sep 17 00:00:00 2001 From: Fernando Nogueira Date: Sat, 30 May 2026 14:49:42 +0100 Subject: [PATCH 1/3] feat: implement batch insertion for drift analysis projects and add background job supervision --- go.mod | 26 ++ go.sum | 133 +++++++-- main.go | 10 +- pkg/observability/metrics.go | 12 + pkg/observability/supervisor.go | 37 +++ pkg/repository/drift_analysis_repository.go | 5 + pkg/repository/queries/copyfrom.go | 49 ++++ pkg/repository/queries/db.go | 1 + pkg/repository/queries/drift_analysis.sql | 4 + pkg/repository/queries/drift_analysis.sql.go | 11 + pkg/usecase/auth/github/oauth.go | 12 +- pkg/usecase/drift_stream/api.go | 47 ++-- pkg/usecase/sync/org/github/sync.go | 10 +- .../sync/user_resources/github/sync.go | 12 +- test/integration/drift_ingest_test.go | 254 ++++++++++++++++++ test/integration/main_test.go | 202 ++++++++++++++ test/integration/perms_test.go | 97 +++++++ 17 files changed, 857 insertions(+), 65 deletions(-) create mode 100644 pkg/observability/supervisor.go create mode 100644 pkg/repository/queries/copyfrom.go create mode 100644 test/integration/drift_ingest_test.go create mode 100644 test/integration/main_test.go create mode 100644 test/integration/perms_test.go diff --git a/go.mod b/go.mod index e15ba5b..65a7546 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/jackc/pgx/v5 v5.9.2 github.com/jferrl/go-githubauth v1.6.0 github.com/joho/godotenv v1.5.1 + github.com/ory/dockertest/v3 v3.12.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 @@ -24,16 +25,29 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect cloud.google.com/go/monitoring v1.24.2 // indirect + dario.cat/mergo v1.0.0 // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0 // indirect github.com/MicahParks/keyfunc/v2 v2.1.0 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/andybalholm/brotli v1.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/containerd/continuity v0.4.5 // indirect + github.com/docker/cli v27.4.1+incompatible // indirect + github.com/docker/docker v27.1.1+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/gofiber/schema v1.7.1 // indirect github.com/gofiber/utils/v2 v2.0.5 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/google/go-querystring v1.2.0 // indirect github.com/google/s2a-go v0.1.9 // indirect + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -42,10 +56,21 @@ require ( github.com/klauspost/compress v1.18.6 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.22 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/term v0.5.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/opencontainers/runc v1.2.3 // indirect github.com/philhofer/fwd v1.2.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/tinylib/msgp v1.6.4 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.71.0 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect @@ -62,4 +87,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.10 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 394f7e2..68a25d1 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,12 @@ cloud.google.com/go/monitoring v1.24.2 h1:5OTsoJ1dXYIiMiuL+sYscLc9BumrL3CarVLL7d cloud.google.com/go/monitoring v1.24.2/go.mod h1:x7yzPWcgDRnPEv3sI+jJGBkwl5qINf+6qY4eq0I9B4U= cloud.google.com/go/trace v1.11.6 h1:2O2zjPzqPYAHrn3OKl029qlqG6W8ZdYaOWRyr8NgMT4= cloud.google.com/go/trace v1.11.6/go.mod h1:GA855OeDEBiBMzcckLPE2kDunIpC72N+Pq8WFieFjnI= +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= +github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.56.0 h1:O2sXMyJh8b7devAGdE+163xtRurt0RVpB6DIzX5vGfg= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.56.0/go.mod h1:hEpiGU18xf70qb3jbTcIggWAiEfX/cOIVc2OTe4OegA= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.56.0 h1:ZIT85vKP7LBS84XJ0WdJ3dPOX3iz4j3c0+lpajGQMyo= @@ -21,15 +27,34 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0/go.mod h1:6ZZMQhZKDvUvkJw2rc+oDP90tMMzuU/J+5HG1ZmPOmE= github.com/MicahParks/keyfunc/v2 v2.1.0 h1:6ZXKb9Rp6qp1bDbJefnG7cTH8yMN1IC/4nf+GVjO99k= github.com/MicahParks/keyfunc/v2 v2.1.0/go.mod h1:rW42fi+xgLJ2FRRXAfNx9ZA8WpD4OeE/yHVMteCkw9k= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= +github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro= github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 h1:6xNmx7iTtyBRev0+D/Tv1FZd4SCg8axKApyNyRsAt/w= github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= +github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4= +github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/cli v27.4.1+incompatible h1:VzPiUlRJ/xh+otB75gva3r05isHMo5wXDfPRi5/b4hI= +github.com/docker/cli v27.4.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= +github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/envoyproxy/go-control-plane v0.14.0 h1:hbG2kr4RuFj222B6+7T83thSPqLjwBIfQawTkC++2HA= github.com/envoyproxy/go-control-plane/envoy v1.36.0 h1:yg/JjO5E7ubRyKX3m07GF3reDNEnfOboJ0QySbH736g= github.com/envoyproxy/go-control-plane/envoy v1.36.0/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98= @@ -37,31 +62,27 @@ github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/fxamacker/cbor/v2 v2.9.1 h1:2rWm8B193Ll4VdjsJY28jxs70IdDsHRWgQYAI80+rMQ= -github.com/fxamacker/cbor/v2 v2.9.1/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= +github.com/fxamacker/cbor/v2 v2.9.2 h1:X4Ksno9+x3cz0TZv69ec1hxP/+tymuR8PXQJyDwfh78= +github.com/fxamacker/cbor/v2 v2.9.2/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/gofiber/contrib/v3/jwt v1.1.1 h1:zEkpLR6+Qbt8iS4tpW6Q0CR/R/inzPkygowXCcd+DYk= -github.com/gofiber/contrib/v3/jwt v1.1.1/go.mod h1:Syod2lfqRIeJC4ll+9rTVrV5F1jcWwNWF9ZqY6IvrP8= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-viper/mapstructure/v2 v2.1.0 h1:gHnMa2Y/pIxElCH2GlZZ1lZSsn6XMtufpGyP1XxdC/w= +github.com/go-viper/mapstructure/v2 v2.1.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gofiber/contrib/v3/jwt v1.1.5 h1:W8K91JePPXAptQG+3fsUvmKtjub4llKeYrd9ngzYqHQ= github.com/gofiber/contrib/v3/jwt v1.1.5/go.mod h1:2JVxnG5byQa7fm32hRlKQ5Uc81+SyIoyqpbtYCbkq3Y= -github.com/gofiber/fiber/v3 v3.1.0 h1:1p4I820pIa+FGxfwWuQZ5rAyX0WlGZbGT6Hnuxt6hKY= -github.com/gofiber/fiber/v3 v3.1.0/go.mod h1:n2nYQovvL9z3Too/FGOfgtERjW3GQcAUqgfoezGBZdU= github.com/gofiber/fiber/v3 v3.2.0 h1:g9+09D320foINPpCnR3ibQ5oBEFHjAWRRfDG1te54u8= github.com/gofiber/fiber/v3 v3.2.0/go.mod h1:FHOsc2Db7HhHpsE62QAaJlXVV1pNkbZEptZ4jtti7m4= -github.com/gofiber/schema v1.7.0 h1:yNM+FNRZjyYEli9Ey0AXRBrAY9jTnb+kmGs3lJGPvKg= -github.com/gofiber/schema v1.7.0/go.mod h1:A/X5Ffyru4p9eBdp99qu+nzviHzQiZ7odLT+TwxWhbk= github.com/gofiber/schema v1.7.1 h1:oSJBKdgP8JeIME4TQSAqlNKTU2iBB+2RNmKi8Nsc+TI= github.com/gofiber/schema v1.7.1/go.mod h1:A/X5Ffyru4p9eBdp99qu+nzviHzQiZ7odLT+TwxWhbk= -github.com/gofiber/utils/v2 v2.0.3 h1:qJyfS/t7s7Z4+/zlU1i1pafYNP2+xLupVPgkW8ce1uI= -github.com/gofiber/utils/v2 v2.0.3/go.mod h1:GGERKU3Vhj5z6hS8YKvxL99A54DjOvTFZ0cjZnG4Lj4= -github.com/gofiber/utils/v2 v2.0.4 h1:WwAxUA7L4MW2DjdEHF234lfqvBqd2vYYuBtA9TJq2ec= -github.com/gofiber/utils/v2 v2.0.4/go.mod h1:GGERKU3Vhj5z6hS8YKvxL99A54DjOvTFZ0cjZnG4Lj4= github.com/gofiber/utils/v2 v2.0.5 h1:IMXoI2A5Dao/aMMBURTNxnhbtQO4kUwUFOgcwFSIjLU= github.com/gofiber/utils/v2 v2.0.5/go.mod h1:FwwopfzwAQsoXLCHhOT24eH2jQfBgrrra9S5p0+luxg= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= @@ -75,6 +96,8 @@ github.com/google/go-querystring v1.2.0 h1:yhqkPbu2/OH+V9BfpCVPZkNmUXhb2gBxJArfh github.com/google/go-querystring v1.2.0/go.mod h1:8IFJqpSRITyJ8QhQ13bmbeMBDfmeEJZD5A0egEOmkqU= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4= @@ -85,8 +108,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= -github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw= github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= @@ -95,25 +116,49 @@ github.com/jferrl/go-githubauth v1.6.0 h1:By+4kqdNPhvizKztD1uVbwk3cp2o9bNIVATZ9o github.com/jferrl/go-githubauth v1.6.0/go.mod h1:JfSoHpcaY93/UduD45AY15pLgkcE1LnsZfH+Gqf/TBI= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= -github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= -github.com/mattn/go-isatty v0.0.21 h1:xYae+lCNBP7QuW4PUnNG61ffM4hVIfm+zUzDuSzYLGs= -github.com/mattn/go-isatty v0.0.21/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4= github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= +github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/opencontainers/runc v1.2.3 h1:fxE7amCzfZflJO2lHXf4y/y8M1BoAqp+FVmG19oYB80= +github.com/opencontainers/runc v1.2.3/go.mod h1:nSxcWUydXrsBZVYNSkTjoQ/N6rcyTtn+1SD5D4+kRIM= +github.com/ory/dockertest/v3 v3.12.0 h1:3oV9d0sDzlSQfHtIaB5k6ghUCVMVLpAY8hwrqoCyRCw= +github.com/ory/dockertest/v3 v3.12.0/go.mod h1:aKNDTva3cp8dwOWwb9cWuX84aH5akkxXRvO7KCwWVjE= github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/shamaton/msgpack/v3 v3.1.0 h1:jsk0vEAqVvvS9+fTZ5/EcQ9tz860c9pWxJ4Iwecz8gU= github.com/shamaton/msgpack/v3 v3.1.0/go.mod h1:DcQG8jrdrQCIxr3HlMYkiXdMhK+KfN2CitkyzsQV4uc= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -123,14 +168,21 @@ github.com/tinylib/msgp v1.6.4 h1:mOwYbyYDLPj35mkA2BjjYejgJk9BuHxDdvRnb6v2ZcQ= github.com/tinylib/msgp v1.6.4/go.mod h1:RSp0LW9oSxFut3KzESt5Voq4GVWyS+PSulT77roAqEA= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.70.0 h1:LAhMGcWk13QZWm85+eg8ZBNbrq5mnkWFGbHMUJHIdXA= -github.com/valyala/fasthttp v1.70.0/go.mod h1:oDZEHHkJ/Buyklg6uURmYs19442zFSnCIfX3j1FY3pE= github.com/valyala/fasthttp v1.71.0 h1:tepR7H+Guh9VUqxxcPggYi8R3lGUu2Rsdh+z7/FCY3k= github.com/valyala/fasthttp v1.71.0/go.mod h1:z1sDUvOShhXq/C9mwH/fSm1Vb71tUJwmQdgkBrBNwnA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 h1:YH4g8lQroajqUwWbq/tr2QX1JFmEXaDLgG+ew9bLMWo= @@ -147,28 +199,47 @@ go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfC go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A= go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= -golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= -golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= -golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= -golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w= golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= -golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI= golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/api v0.249.0 h1:0VrsWAKzIZi058aeq+I86uIXbNhm9GxSHpbmZ92a38w= @@ -184,8 +255,14 @@ google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhH google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= resty.dev/v3 v3.0.0-beta.6 h1:ghRdNpoE8/wBCv+kTKIOauW1aCrSIeTq7GxtfYgtevU= resty.dev/v3 v3.0.0-beta.6/go.mod h1:NTOerrC/4T7/FE6tXIZGIysXXBdgNqwMZuKtxpea9NM= diff --git a/main.go b/main.go index f88d65d..4487f96 100644 --- a/main.go +++ b/main.go @@ -164,10 +164,12 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Start background jobs with cancellable context - go ghTokenRefresher.RefreshTokens(ctx) - go userSync.StartSyncLoop() - go orgSync.StartSyncLoop() + // Start background jobs with cancellable context. Each runs under + // observability.SuperviseLoop so a panic produces a metric + log instead + // of silently killing the loop. + go observability.SuperviseLoop(ctx, "gh_token_refresher", ghTokenRefresher.RefreshTokens) + go observability.SuperviseLoop(ctx, "user_sync", userSync.StartSyncLoop) + go observability.SuperviseLoop(ctx, "org_sync", orgSync.StartSyncLoop) // Handle shutdown signals go func() { diff --git a/pkg/observability/metrics.go b/pkg/observability/metrics.go index d4a8ff7..1761b99 100644 --- a/pkg/observability/metrics.go +++ b/pkg/observability/metrics.go @@ -25,6 +25,9 @@ type Metrics struct { TokenRefreshFailure metric.Int64Counter TokenRefreshDisabled metric.Int64Counter TokenRefreshRateLimit metric.Int64Counter + + // Background job supervisor metrics + BgJobPanicsTotal metric.Int64Counter } // metricsInstance is the singleton instance @@ -123,6 +126,14 @@ func createMetrics(meter metric.Meter) (*Metrics, error) { return nil, err } + bgJobPanicsTotal, err := meter.Int64Counter( + "bg_job_panics_total", + metric.WithDescription("Number of times a supervised background job has panicked (by job name)"), + ) + if err != nil { + return nil, err + } + return &Metrics{ meter: meter, TokenRefreshTotal: tokenRefreshTotal, @@ -130,5 +141,6 @@ func createMetrics(meter metric.Meter) (*Metrics, error) { TokenRefreshFailure: tokenRefreshFailure, TokenRefreshDisabled: tokenRefreshDisabled, TokenRefreshRateLimit: tokenRefreshRateLimit, + BgJobPanicsTotal: bgJobPanicsTotal, }, nil } diff --git a/pkg/observability/supervisor.go b/pkg/observability/supervisor.go new file mode 100644 index 0000000..2f19f72 --- /dev/null +++ b/pkg/observability/supervisor.go @@ -0,0 +1,37 @@ +package observability + +import ( + "context" + "runtime/debug" + + "github.com/gofiber/fiber/v3/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// SuperviseLoop runs fn in a panic-safe loop until ctx is cancelled. fn is +// expected to be a long-running daemon that returns only when ctx is done; if +// it returns earlier or panics, the supervisor restarts it. Every panic is +// logged with a stack trace and emits the bg_job_panics_total counter so +// silent crashes show up in alerting instead of just stopping the loop. +func SuperviseLoop(ctx context.Context, name string, fn func(context.Context)) { + for { + if ctx.Err() != nil { + log.Infof("supervised loop %q exiting: %v", name, ctx.Err()) + return + } + runOnce(ctx, name, fn) + } +} + +func runOnce(ctx context.Context, name string, fn func(context.Context)) { + defer func() { + if r := recover(); r != nil { + log.Errorf("supervised loop %q panicked: %v\n%s", name, r, debug.Stack()) + if m := GetMetrics(); m != nil && m.BgJobPanicsTotal != nil { + m.BgJobPanicsTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("job", name))) + } + } + }() + fn(ctx) +} diff --git a/pkg/repository/drift_analysis_repository.go b/pkg/repository/drift_analysis_repository.go index 71252b5..36abad9 100644 --- a/pkg/repository/drift_analysis_repository.go +++ b/pkg/repository/drift_analysis_repository.go @@ -11,6 +11,7 @@ import ( type DriftAnalysisRepository interface { CreateDriftAnalysisRun(ctx context.Context, params queries.CreateDriftAnalysisRunParams) (queries.DriftAnalysisRun, error) CreateDriftAnalysisProject(ctx context.Context, params queries.CreateDriftAnalysisProjectParams) (queries.DriftAnalysisProject, error) + CreateDriftAnalysisProjectsBatch(ctx context.Context, rows []queries.CreateDriftAnalysisProjectsBatchParams) (int64, error) FindDriftAnalysisRunsByRepositoryID(ctx context.Context, repoId int64, page int) ([]queries.DriftAnalysisRun, error) FindDriftAnalysisRunByUUID(ctx context.Context, uuid uuid.UUID) (queries.DriftAnalysisRun, error) FindRunByRepoAndIdempotencyKey(ctx context.Context, repoId int64, idempotencyKey string) (queries.DriftAnalysisRun, error) @@ -42,6 +43,10 @@ func (r *DriftAnalysisRepo) CreateDriftAnalysisProject(ctx context.Context, para return r.db.Queries(ctx).CreateDriftAnalysisProject(ctx, params) } +func (r *DriftAnalysisRepo) CreateDriftAnalysisProjectsBatch(ctx context.Context, rows []queries.CreateDriftAnalysisProjectsBatchParams) (int64, error) { + return r.db.Queries(ctx).CreateDriftAnalysisProjectsBatch(ctx, rows) +} + func (r *DriftAnalysisRepo) FindDriftAnalysisRunsByRepositoryID(ctx context.Context, repoId int64, page int) ([]queries.DriftAnalysisRun, error) { params := queries.FindDriftAnalysisRunsByRepositoryIdParams{ RepositoryID: repoId, diff --git a/pkg/repository/queries/copyfrom.go b/pkg/repository/queries/copyfrom.go new file mode 100644 index 0000000..69a18d6 --- /dev/null +++ b/pkg/repository/queries/copyfrom.go @@ -0,0 +1,49 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: copyfrom.go + +package queries + +import ( + "context" +) + +// iteratorForCreateDriftAnalysisProjectsBatch implements pgx.CopyFromSource. +type iteratorForCreateDriftAnalysisProjectsBatch struct { + rows []CreateDriftAnalysisProjectsBatchParams + skippedFirstNextCall bool +} + +func (r *iteratorForCreateDriftAnalysisProjectsBatch) Next() bool { + if len(r.rows) == 0 { + return false + } + if !r.skippedFirstNextCall { + r.skippedFirstNextCall = true + return true + } + r.rows = r.rows[1:] + return len(r.rows) > 0 +} + +func (r iteratorForCreateDriftAnalysisProjectsBatch) Values() ([]interface{}, error) { + return []interface{}{ + r.rows[0].DriftAnalysisRunID, + r.rows[0].Dir, + r.rows[0].Type, + r.rows[0].Drifted, + r.rows[0].Succeeded, + r.rows[0].InitOutput, + r.rows[0].PlanOutput, + r.rows[0].SkippedDueToPr, + }, nil +} + +func (r iteratorForCreateDriftAnalysisProjectsBatch) Err() error { + return nil +} + +func (q *Queries) CreateDriftAnalysisProjectsBatch(ctx context.Context, arg []CreateDriftAnalysisProjectsBatchParams) (int64, error) { + return q.db.CopyFrom(ctx, []string{"drift_analysis_project"}, []string{"drift_analysis_run_id", "dir", "type", "drifted", "succeeded", "init_output", "plan_output", "skipped_due_to_pr"}, &iteratorForCreateDriftAnalysisProjectsBatch{rows: arg}) +} diff --git a/pkg/repository/queries/db.go b/pkg/repository/queries/db.go index c69f0c5..9ab2515 100644 --- a/pkg/repository/queries/db.go +++ b/pkg/repository/queries/db.go @@ -15,6 +15,7 @@ type DBTX interface { Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) Query(context.Context, string, ...interface{}) (pgx.Rows, error) QueryRow(context.Context, string, ...interface{}) pgx.Row + CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) } func New(db DBTX) *Queries { diff --git a/pkg/repository/queries/drift_analysis.sql b/pkg/repository/queries/drift_analysis.sql index 8964186..636e9c6 100644 --- a/pkg/repository/queries/drift_analysis.sql +++ b/pkg/repository/queries/drift_analysis.sql @@ -13,6 +13,10 @@ INSERT INTO drift_analysis_project (drift_analysis_run_id, dir, type, drifted, s VALUES (@drift_analysis_run_id, @dir, @type, @drifted, @succeeded, @init_output, @plan_output, @skipped_due_to_pr) RETURNING *; +-- name: CreateDriftAnalysisProjectsBatch :copyfrom +INSERT INTO drift_analysis_project (drift_analysis_run_id, dir, type, drifted, succeeded, init_output, plan_output, skipped_due_to_pr) +VALUES (@drift_analysis_run_id, @dir, @type, @drifted, @succeeded, @init_output, @plan_output, @skipped_due_to_pr); + -- name: FindDriftAnalysisRunsByRepositoryId :many SELECT * FROM drift_analysis_run diff --git a/pkg/repository/queries/drift_analysis.sql.go b/pkg/repository/queries/drift_analysis.sql.go index 247df8c..d500de8 100644 --- a/pkg/repository/queries/drift_analysis.sql.go +++ b/pkg/repository/queries/drift_analysis.sql.go @@ -56,6 +56,17 @@ func (q *Queries) CreateDriftAnalysisProject(ctx context.Context, arg CreateDrif return i, err } +type CreateDriftAnalysisProjectsBatchParams struct { + DriftAnalysisRunID uuid.UUID + Dir string + Type string + Drifted bool + Succeeded bool + InitOutput *string + PlanOutput *string + SkippedDueToPr bool +} + const createDriftAnalysisRun = `-- name: CreateDriftAnalysisRun :one INSERT INTO drift_analysis_run (uuid, repository_id, total_projects, total_projects_drifted, total_projects_errored, total_projects_skipped, analysis_duration_millis, idempotency_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) diff --git a/pkg/usecase/auth/github/oauth.go b/pkg/usecase/auth/github/oauth.go index 32a2398..c16718f 100644 --- a/pkg/usecase/auth/github/oauth.go +++ b/pkg/usecase/auth/github/oauth.go @@ -206,10 +206,6 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error { } err = o.db.WithTx(ctx, func(ctx context.Context) error { - if err != nil { - return err - } - accessTokenExpiresAt := time.Unix(epoch+int64(tokenResponse.ExpiresIn), 0) refreshTokenExpiresAt := time.Unix(epoch+int64(tokenResponse.RefreshTokenExpiresIn), 0) @@ -225,7 +221,7 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error { RefreshTokenExpiresAt: &refreshTokenExpiresAt, } - _, err = o.userRepository.UpsertUserOnLogin(ctx, upsertUserParams) + _, err := o.userRepository.UpsertUserOnLogin(ctx, upsertUserParams) if err != nil { return err } @@ -236,14 +232,14 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error { existingUser, err := o.userRepository.FindUserByProviderAndProviderId(ctx, args) if err != nil { log.Error("error finding user by provider and provider id: ", err) - return c.SendStatus(fiber.StatusInternalServerError) + return err } _, err = o.syncStatusUserRepository.CreateOrUpdateSyncStatusUser(ctx, existingUser.ID) if err != nil { if !errors.Is(err, sql.ErrNoRows) { log.Error("error creating sync status user: ", err) - return c.SendStatus(fiber.StatusInternalServerError) + return err } } @@ -255,7 +251,7 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error { jwtToken, err := jwt.GenerateJWTToken(userToken, o.cfg.Auth.JwtSecret) if err != nil { log.Error("error generating jwt token: ", err) - return c.SendStatus(fiber.StatusInternalServerError) + return err } // Use redirect URL from state if provided and allowed, otherwise use default diff --git a/pkg/usecase/drift_stream/api.go b/pkg/usecase/drift_stream/api.go index f728496..279022d 100644 --- a/pkg/usecase/drift_stream/api.go +++ b/pkg/usecase/drift_stream/api.go @@ -141,6 +141,18 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error { idemKeyPtr = &idemKey } + // Pre-validate all project types before opening a transaction. + // (returning c.SendStatus from inside the closure would commit the partial tx). + projectTypes := make([]string, len(state.ProjectResults)) + for i, project := range state.ProjectResults { + projectType, err := projectTypeToDBString(project.Project.Type) + if err != nil { + log.Errorf("Invalid project type %q at index %d: %v", project.Project.Type, i, err) + return c.SendStatus(fiber.StatusBadRequest) + } + projectTypes[i] = projectType + } + var runUUID uuid.UUID err = d.driftAnalysisRepository.WithTx(c.Context(), func(ctx context.Context) error { params := queries.CreateDriftAnalysisRunParams{ @@ -161,29 +173,26 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error { } runUUID = run.Uuid - for _, project := range state.ProjectResults { - projectType, err := projectTypeToDBString(project.Project.Type) - if err != nil { - log.Errorf("Error converting project type to db string: %v", err) - return c.SendStatus(fiber.StatusBadRequest) - } - - projectParams := queries.CreateDriftAnalysisProjectParams{ - DriftAnalysisRunID: run.Uuid, - Dir: project.Project.Dir, - Type: projectType, - Drifted: project.Drifted, - Succeeded: project.Succeeded, - InitOutput: &project.InitOutput, - PlanOutput: &project.PlanOutput, - SkippedDueToPr: project.SkippedDueToPR, + if len(state.ProjectResults) > 0 { + batch := make([]queries.CreateDriftAnalysisProjectsBatchParams, len(state.ProjectResults)) + for i, project := range state.ProjectResults { + batch[i] = queries.CreateDriftAnalysisProjectsBatchParams{ + DriftAnalysisRunID: run.Uuid, + Dir: project.Project.Dir, + Type: projectTypes[i], + Drifted: project.Drifted, + Succeeded: project.Succeeded, + InitOutput: &project.InitOutput, + PlanOutput: &project.PlanOutput, + SkippedDueToPr: project.SkippedDueToPR, + } } - res, err := d.driftAnalysisRepository.CreateDriftAnalysisProject(ctx, projectParams) + inserted, err := d.driftAnalysisRepository.CreateDriftAnalysisProjectsBatch(ctx, batch) if err != nil { - log.Errorf("Error creating drift analysis project: %v", err) + log.Errorf("Error batch-inserting drift analysis projects: %v", err) return err } - log.Debugf("Created drift analysis project: [ID: %d, dir: %s]", res.ID, project.Project.Dir) + log.Debugf("Batch-inserted %d drift analysis projects for run %s", inserted, run.Uuid) } log.Info("Created drift analysis run: ", run.Uuid) diff --git a/pkg/usecase/sync/org/github/sync.go b/pkg/usecase/sync/org/github/sync.go index 45b29e4..5437b1d 100644 --- a/pkg/usecase/sync/org/github/sync.go +++ b/pkg/usecase/sync/org/github/sync.go @@ -27,9 +27,8 @@ func NewSyncOrganization(orgRepository repository.GitOrgRepository, repoReposito } } -func (so SyncOrganization) StartSyncLoop() { +func (so SyncOrganization) StartSyncLoop(ctx context.Context) { for { - ctx := context.Background() err := so.orgSyncRepository.WithTx(ctx, func(ctx context.Context) error { orgSync, err := so.orgSyncRepository.FindOnePending(ctx) if err != nil { @@ -63,7 +62,12 @@ func (so SyncOrganization) StartSyncLoop() { if err != nil { log.Errorf("error handling sync transaction: %v", err) } - time.Sleep(2 * time.Second) + select { + case <-ctx.Done(): + log.Info("org sync loop shutting down...") + return + case <-time.After(2 * time.Second): + } } } diff --git a/pkg/usecase/sync/user_resources/github/sync.go b/pkg/usecase/sync/user_resources/github/sync.go index 2b4dd12..4fdbbf4 100644 --- a/pkg/usecase/sync/user_resources/github/sync.go +++ b/pkg/usecase/sync/user_resources/github/sync.go @@ -127,9 +127,12 @@ func (s *UserResourceSyncer) SyncUserResources(ctx context.Context, userId int64 return nil } -func (s *UserResourceSyncer) StartSyncLoop() { +func (s *UserResourceSyncer) StartSyncLoop(ctx context.Context) { for { - ctx := context.Background() + if ctx.Err() != nil { + log.Info("user resource sync loop shutting down...") + return + } err := s.syncStatusRepository.WithTx(ctx, func(ctx context.Context) error { result, err := s.syncStatusRepository.FindOnePendingSyncStatusUser(ctx) if err != nil { @@ -145,7 +148,10 @@ func (s *UserResourceSyncer) StartSyncLoop() { } } else { log.Debug("no pending sync status user found") - time.Sleep(5 * time.Second) + select { + case <-ctx.Done(): + case <-time.After(5 * time.Second): + } } return nil }) diff --git a/test/integration/drift_ingest_test.go b/test/integration/drift_ingest_test.go new file mode 100644 index 0000000..f1d9841 --- /dev/null +++ b/test/integration/drift_ingest_test.go @@ -0,0 +1,254 @@ +package integration + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "driftive.cloud/api/pkg/config" + "driftive.cloud/api/pkg/repository" + "driftive.cloud/api/pkg/usecase/cleanup" + "driftive.cloud/api/pkg/usecase/drift_stream" + "github.com/gofiber/fiber/v3" +) + +const ( + seedOrgName = "acme" + seedProvider = "GITHUB" + seedRepoName = "infra" + seedAnalysisToken = "test-analysis-token-abc" + seedProviderOrgId = "555" + seedProviderRepoId = "777" +) + +// seedOrgAndRepo inserts one org + one repo with a known analysis token and +// returns the inserted repo ID. +func seedOrgAndRepo(t *testing.T) (repoID int64) { + t.Helper() + ctx := context.Background() + pool := withPool(t) + + var orgID int64 + err := pool.QueryRow(ctx, + `INSERT INTO git_organization (provider, provider_id, name) + VALUES ($1, $2, $3) RETURNING id`, + seedProvider, seedProviderOrgId, seedOrgName).Scan(&orgID) + if err != nil { + t.Fatalf("seed org: %v", err) + } + + err = pool.QueryRow(ctx, + `INSERT INTO git_repository (organization_id, provider_id, name, is_private, analysis_token) + VALUES ($1, $2, $3, $4, $5) RETURNING id`, + orgID, seedProviderRepoId, seedRepoName, false, seedAnalysisToken).Scan(&repoID) + if err != nil { + t.Fatalf("seed repo: %v", err) + } + return repoID +} + +// newIngestApp builds a minimal Fiber app exposing just the drift ingest endpoint +// against the shared testDB. Mirrors the public-route registration in main.go. +func newIngestApp(t *testing.T) *fiber.App { + t.Helper() + repos := repository.NewRepository(testDB, &config.Config{}) + cleanupSvc := cleanup.NewCleanupService(repos.DriftAnalysisRepository(), 400) + cfg := &config.Config{ + Frontend: config.FrontendConfig{FrontendURL: "http://test.local"}, + } + handler := drift_stream.NewDriftStateHandler( + cfg, + repos.GitOrgRepository(), + repos.GitRepoRepository(), + repos.DriftAnalysisRepository(), + cleanupSvc, + ) + app := fiber.New() + app.Post("/api/v1/drift_analysis", func(c fiber.Ctx) error { return handler.HandleUpdate(c) }) + return app +} + +func sampleState() drift_stream.DriftDetectionResult { + totalErrored := int32(0) + return drift_stream.DriftDetectionResult{ + ProjectResults: []drift_stream.DriftProjectResult{ + { + Project: drift_stream.TypedProject{Dir: "/projects/a", Type: drift_stream.Terraform}, + Drifted: true, + Succeeded: true, + InitOutput: "init-a", + PlanOutput: "plan-a", + }, + { + Project: drift_stream.TypedProject{Dir: "/projects/b", Type: drift_stream.Tofu}, + Drifted: false, + Succeeded: true, + InitOutput: "init-b", + PlanOutput: "plan-b", + }, + { + Project: drift_stream.TypedProject{Dir: "/projects/c", Type: drift_stream.Terragrunt}, + Drifted: false, + Succeeded: true, + SkippedDueToPR: true, + }, + }, + TotalDrifted: 1, + TotalErrored: &totalErrored, + TotalSkipped: 1, + TotalProjects: 3, + TotalChecked: 3, + Duration: 250 * time.Millisecond, + } +} + +func postIngest(t *testing.T, app *fiber.App, token, idemKey string, body any) (int, []byte) { + t.Helper() + buf, err := json.Marshal(body) + if err != nil { + t.Fatalf("marshal body: %v", err) + } + req := httptest.NewRequestWithContext(context.Background(), + http.MethodPost, "/api/v1/drift_analysis", bytes.NewReader(buf)) + req.Header.Set("Content-Type", "application/json") + if token != "" { + req.Header.Set("X-Token", token) + } + if idemKey != "" { + req.Header.Set("Idempotency-Key", idemKey) + } + resp, err := app.Test(req, fiber.TestConfig{Timeout: 30 * time.Second}) + if err != nil { + t.Fatalf("app.Test: %v", err) + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + return resp.StatusCode, respBody +} + +func TestDriftIngest_HappyPath(t *testing.T) { + truncateAll(t) + repoID := seedOrgAndRepo(t) + app := newIngestApp(t) + + status, body := postIngest(t, app, seedAnalysisToken, "", sampleState()) + + if status != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", status, string(body)) + } + + var got struct { + RunID string `json:"run_id"` + DashboardURL string `json:"dashboard_url"` + } + if err := json.Unmarshal(body, &got); err != nil { + t.Fatalf("decode response: %v", err) + } + if got.RunID == "" { + t.Fatalf("response missing run_id: %s", body) + } + if got.DashboardURL == "" { + t.Fatalf("response missing dashboard_url: %s", body) + } + + // Verify the run + 3 projects landed in the DB. + ctx := context.Background() + pool := withPool(t) + var runCount, projectCount int + if err := pool.QueryRow(ctx, + `SELECT COUNT(*) FROM drift_analysis_run WHERE repository_id = $1`, repoID). + Scan(&runCount); err != nil { + t.Fatalf("count runs: %v", err) + } + if runCount != 1 { + t.Errorf("expected 1 run, got %d", runCount) + } + if err := pool.QueryRow(ctx, + `SELECT COUNT(*) FROM drift_analysis_project p + JOIN drift_analysis_run r ON r.uuid = p.drift_analysis_run_id + WHERE r.repository_id = $1`, repoID).Scan(&projectCount); err != nil { + t.Fatalf("count projects: %v", err) + } + if projectCount != 3 { + t.Errorf("expected 3 projects, got %d", projectCount) + } +} + +func TestDriftIngest_InvalidToken(t *testing.T) { + truncateAll(t) + seedOrgAndRepo(t) + app := newIngestApp(t) + + status, _ := postIngest(t, app, "no-such-token", "", sampleState()) + if status != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", status) + } +} + +// TestDriftIngest_IdempotencyRace exercises the pgUniqueViolation recovery path +// in HandleUpdate: two concurrent POSTs with the same Idempotency-Key must both +// resolve to the same run UUID, and only one row should exist. +func TestDriftIngest_IdempotencyRace(t *testing.T) { + truncateAll(t) + repoID := seedOrgAndRepo(t) + app := newIngestApp(t) + + const idemKey = "race-key-1" + state := sampleState() + + var wg sync.WaitGroup + type result struct { + status int + body []byte + } + results := make([]result, 2) + for i := 0; i < 2; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + status, body := postIngest(t, app, seedAnalysisToken, idemKey, state) + results[idx] = result{status: status, body: body} + }(i) + } + wg.Wait() + + for i, r := range results { + if r.status != http.StatusOK { + t.Fatalf("request %d: expected 200, got %d: %s", i, r.status, r.body) + } + } + + parse := func(b []byte) string { + var r struct { + RunID string `json:"run_id"` + } + if err := json.Unmarshal(b, &r); err != nil { + t.Fatalf("decode response: %v", err) + } + return r.RunID + } + runA := parse(results[0].body) + runB := parse(results[1].body) + if runA == "" || runB == "" { + t.Fatalf("missing run ids: %q %q", runA, runB) + } + if runA != runB { + t.Errorf("expected both concurrent calls to converge on one run, got %s vs %s", runA, runB) + } + + var runCount int + if err := testDB.Pool.QueryRow(context.Background(), + `SELECT COUNT(*) FROM drift_analysis_run WHERE repository_id = $1`, repoID). + Scan(&runCount); err != nil { + t.Fatalf("count runs: %v", err) + } + if runCount != 1 { + t.Errorf("idempotent retry inserted %d rows (expected 1)", runCount) + } +} diff --git a/test/integration/main_test.go b/test/integration/main_test.go new file mode 100644 index 0000000..9c06a11 --- /dev/null +++ b/test/integration/main_test.go @@ -0,0 +1,202 @@ +// Package integration brings up a real Postgres via dockertest, applies the +// repo's Flyway migrations as plain SQL, and exposes a shared *db.DB to each +// test. Requires Docker. Skipped if DOCKER_HOST / docker socket is unreachable. +// +// Run with: go test ./test/integration/... +package integration + +import ( + "context" + "fmt" + "log" + "net" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" + "time" + + "driftive.cloud/api/pkg/config" + "driftive.cloud/api/pkg/db" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" +) + +// testDB is the *db.DB shared by every test. Populated by TestMain. +var testDB *db.DB + +// migrationsDir resolves to api/migrations/ regardless of where the test is run from. +var migrationsDir string + +const ( + pgUser = "driftive" + pgPassword = "driftive" + pgDatabase = "driftive" +) + +func TestMain(m *testing.M) { + // Best-effort discovery of the migrations directory so the test runs from + // either the package dir or the repo root. + for _, candidate := range []string{"../../migrations", "migrations"} { + if abs, err := filepath.Abs(candidate); err == nil { + if info, err := os.Stat(abs); err == nil && info.IsDir() { + migrationsDir = abs + break + } + } + } + if migrationsDir == "" { + log.Fatal("integration tests: could not locate migrations directory") + } + + pool, err := dockertest.NewPool("") + if err != nil { + log.Printf("integration tests: docker unreachable, skipping: %v", err) + os.Exit(0) + } + if err := pool.Client.Ping(); err != nil { + log.Printf("integration tests: docker daemon not responding, skipping: %v", err) + os.Exit(0) + } + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", + Tag: "16-alpine", + Env: []string{ + "POSTGRES_USER=" + pgUser, + "POSTGRES_PASSWORD=" + pgPassword, + "POSTGRES_DB=" + pgDatabase, + "listen_addresses=*", + }, + }, func(hc *docker.HostConfig) { + hc.AutoRemove = true + hc.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + if err != nil { + log.Fatalf("integration tests: failed to start postgres: %v", err) + } + if err := resource.Expire(120); err != nil { + log.Printf("integration tests: could not set container expiry: %v", err) + } + + hostPort := resource.GetHostPort("5432/tcp") + host, portStr, err := net.SplitHostPort(hostPort) + if err != nil { + log.Fatalf("integration tests: invalid host:port %q: %v", hostPort, err) + } + port, _ := strconv.Atoi(portStr) + + pool.MaxWait = 60 * time.Second + connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", + pgUser, pgPassword, net.JoinHostPort(host, portStr), pgDatabase) + if err := pool.Retry(func() error { + conn, err := pgx.Connect(context.Background(), connStr) + if err != nil { + return err + } + defer conn.Close(context.Background()) + return conn.Ping(context.Background()) + }); err != nil { + log.Fatalf("integration tests: postgres never became reachable: %v", err) + } + + if err := applyMigrations(connStr); err != nil { + log.Fatalf("integration tests: migrations failed: %v", err) + } + + cfg := config.Config{ + Database: config.Database{ + User: pgUser, + Password: pgPassword, + Host: host, + Port: port, + Database: pgDatabase, + Connections: 4, + }, + } + testDB = db.NewDB(cfg) + + code := m.Run() + + testDB.Pool.Close() + if err := pool.Purge(resource); err != nil { + log.Printf("integration tests: failed to purge postgres container: %v", err) + } + os.Exit(code) +} + +// applyMigrations runs every *.sql file under migrationsDir in filename order. +// The Flyway naming convention (V__name.sql) sorts lexicographically +// to the right order, so plain sort.Strings is enough. +func applyMigrations(connStr string) error { + entries, err := os.ReadDir(migrationsDir) + if err != nil { + return fmt.Errorf("read migrations dir: %w", err) + } + names := make([]string, 0, len(entries)) + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { + names = append(names, e.Name()) + } + } + sort.Strings(names) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + conn, err := pgx.Connect(ctx, connStr) + if err != nil { + return fmt.Errorf("connect for migrations: %w", err) + } + defer conn.Close(ctx) + + for _, name := range names { + body, err := os.ReadFile(filepath.Join(migrationsDir, name)) + if err != nil { + return fmt.Errorf("read %s: %w", name, err) + } + if _, err := conn.Exec(ctx, string(body)); err != nil { + return fmt.Errorf("apply %s: %w", name, err) + } + } + return nil +} + +// truncateAll wipes test data between cases. Uses TRUNCATE ... CASCADE so it's +// safe regardless of FK order. Skip if testDB hasn't been initialised (Docker +// unreachable; TestMain returned 0 above). +func truncateAll(t *testing.T) { + t.Helper() + if testDB == nil { + t.Skip("integration tests skipped (no testDB)") + } + tables := []string{ + "drift_analysis_project", + "drift_analysis_run", + "git_repository", + "user_git_organization", + "git_organization", + "sync_status_user", + "git_organization_sync", + "users", + } + for _, tbl := range tables { + _, err := testDB.Pool.Exec(context.Background(), "TRUNCATE TABLE "+tbl+" RESTART IDENTITY CASCADE") + if err != nil { + // Table may not exist on all schemas — log but don't fail. + t.Logf("truncate %s: %v", tbl, err) + } + } +} + +// withPool returns the raw pgxpool for direct seeding helpers. +func withPool(t *testing.T) *pgxpool.Pool { + t.Helper() + if testDB == nil { + t.Skip("integration tests skipped (no testDB)") + } + return testDB.Pool +} diff --git a/test/integration/perms_test.go b/test/integration/perms_test.go new file mode 100644 index 0000000..111295a --- /dev/null +++ b/test/integration/perms_test.go @@ -0,0 +1,97 @@ +package integration + +import ( + "context" + "sort" + "testing" + + "driftive.cloud/api/pkg/config" + "driftive.cloud/api/pkg/repository" +) + +// TestPermsMiddleware_OrgScoping verifies that FindAllUserOrganizationIds — +// the query that backs the perms middleware (pkg/middleware/perms/session.go) +// — only returns org IDs the user actually belongs to. This is the +// org-scoping contract the rest of the authenticated API relies on. +func TestPermsMiddleware_OrgScoping(t *testing.T) { + truncateAll(t) + ctx := context.Background() + pool := withPool(t) + + // Two users. + insertUser := func(providerID, username string) int64 { + var id int64 + email := username + "@test" + err := pool.QueryRow(ctx, + `INSERT INTO users (provider, provider_id, name, username, email, access_token, refresh_token) + VALUES ('GITHUB', $1, $2, $3, $4, 'at', 'rt') RETURNING id`, + providerID, username, username, email).Scan(&id) + if err != nil { + t.Fatalf("insert user %s: %v", username, err) + } + return id + } + alice := insertUser("100", "alice") + bob := insertUser("101", "bob") + + // Three orgs. + insertOrg := func(providerID, name string) int64 { + var id int64 + err := pool.QueryRow(ctx, + `INSERT INTO git_organization (provider, provider_id, name) VALUES ('GITHUB', $1, $2) RETURNING id`, + providerID, name).Scan(&id) + if err != nil { + t.Fatalf("insert org %s: %v", name, err) + } + return id + } + orgA := insertOrg("o-a", "acme") + orgB := insertOrg("o-b", "beta") + orgC := insertOrg("o-c", "gamma") + + // alice is in orgA + orgB; bob is in orgC only. + link := func(userID, orgID int64) { + if _, err := pool.Exec(ctx, + `INSERT INTO user_git_organization (user_id, git_organization_id, role) VALUES ($1, $2, 'member')`, + userID, orgID); err != nil { + t.Fatalf("link user %d -> org %d: %v", userID, orgID, err) + } + } + link(alice, orgA) + link(alice, orgB) + link(bob, orgC) + + repos := repository.NewRepository(testDB, &config.Config{}) + orgRepo := repos.GitOrgRepository() + + aliceOrgs, err := orgRepo.FindAllUserOrganizationIds(ctx, alice) + if err != nil { + t.Fatalf("FindAllUserOrganizationIds(alice): %v", err) + } + sort.Slice(aliceOrgs, func(i, j int) bool { return aliceOrgs[i] < aliceOrgs[j] }) + want := []int64{orgA, orgB} + sort.Slice(want, func(i, j int) bool { return want[i] < want[j] }) + if len(aliceOrgs) != len(want) { + t.Fatalf("alice: expected %v, got %v", want, aliceOrgs) + } + for i := range want { + if aliceOrgs[i] != want[i] { + t.Fatalf("alice: expected %v, got %v", want, aliceOrgs) + } + } + + bobOrgs, err := orgRepo.FindAllUserOrganizationIds(ctx, bob) + if err != nil { + t.Fatalf("FindAllUserOrganizationIds(bob): %v", err) + } + if len(bobOrgs) != 1 || bobOrgs[0] != orgC { + t.Fatalf("bob: expected [%d], got %v", orgC, bobOrgs) + } + + // orgC must NOT appear in alice's set — the scoping invariant. + for _, id := range aliceOrgs { + if id == orgC { + t.Errorf("orgC leaked into alice's org IDs: %v", aliceOrgs) + } + } +} From c95c033633b34726d53d2341a4390f0b9ce47e31 Mon Sep 17 00:00:00 2001 From: Fernando Nogueira Date: Sat, 30 May 2026 14:56:08 +0100 Subject: [PATCH 2/3] feat: implement batch insertion for drift analysis projects and add background job supervision --- .github/workflows/pr.yaml | 5 ++++- .github/workflows/release.yml | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index b7107a7..e87e5fb 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -15,11 +15,14 @@ jobs: - name: Set up Go uses: actions/setup-go@v6 with: - go-version: "1.26" + go-version-file: 'go.mod' - name: Run golangci-lint uses: golangci/golangci-lint-action@v9 + - name: Run tests + run: go test ./... + - name: Run GoReleaser uses: goreleaser/goreleaser-action@v7 with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 741f1f7..2f1ac91 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v6 with: - go-version: "1.26" + go-version-file: 'go.mod' - name: Login to GHCR run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin From 4df9c8b897b2f74df8d23313534f4fb5c2377172 Mon Sep 17 00:00:00 2001 From: Fernando Nogueira Date: Sat, 30 May 2026 15:02:20 +0100 Subject: [PATCH 3/3] feat: implement batch insertion for drift analysis projects and add background job supervision --- pkg/observability/supervisor.go | 15 +++++--- pkg/usecase/drift_stream/api.go | 2 +- test/integration/drift_ingest_test.go | 50 +++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/pkg/observability/supervisor.go b/pkg/observability/supervisor.go index 2f19f72..4fcdd59 100644 --- a/pkg/observability/supervisor.go +++ b/pkg/observability/supervisor.go @@ -3,17 +3,16 @@ package observability import ( "context" "runtime/debug" + "time" "github.com/gofiber/fiber/v3/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) -// SuperviseLoop runs fn in a panic-safe loop until ctx is cancelled. fn is -// expected to be a long-running daemon that returns only when ctx is done; if -// it returns earlier or panics, the supervisor restarts it. Every panic is -// logged with a stack trace and emits the bg_job_panics_total counter so -// silent crashes show up in alerting instead of just stopping the loop. +const superviseRestartBackoff = time.Second + +// SuperviseLoop runs fn in a panic-safe loop until ctx is cancelled. func SuperviseLoop(ctx context.Context, name string, fn func(context.Context)) { for { if ctx.Err() != nil { @@ -21,6 +20,12 @@ func SuperviseLoop(ctx context.Context, name string, fn func(context.Context)) { return } runOnce(ctx, name, fn) + select { + case <-ctx.Done(): + log.Infof("supervised loop %q exiting: %v", name, ctx.Err()) + return + case <-time.After(superviseRestartBackoff): + } } } diff --git a/pkg/usecase/drift_stream/api.go b/pkg/usecase/drift_stream/api.go index 279022d..350a96e 100644 --- a/pkg/usecase/drift_stream/api.go +++ b/pkg/usecase/drift_stream/api.go @@ -147,7 +147,7 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error { for i, project := range state.ProjectResults { projectType, err := projectTypeToDBString(project.Project.Type) if err != nil { - log.Errorf("Invalid project type %q at index %d: %v", project.Project.Type, i, err) + log.Errorf("Invalid project type %v at index %d: %v", project.Project.Type, i, err) return c.SendStatus(fiber.StatusBadRequest) } projectTypes[i] = projectType diff --git a/test/integration/drift_ingest_test.go b/test/integration/drift_ingest_test.go index f1d9841..2304e50 100644 --- a/test/integration/drift_ingest_test.go +++ b/test/integration/drift_ingest_test.go @@ -178,6 +178,56 @@ func TestDriftIngest_HappyPath(t *testing.T) { if projectCount != 3 { t.Errorf("expected 3 projects, got %d", projectCount) } + + rows, err := pool.Query(ctx, + `SELECT p.dir, p.type, p.drifted, p.succeeded, p.init_output, p.plan_output, p.skipped_due_to_pr + FROM drift_analysis_project p + JOIN drift_analysis_run r ON r.uuid = p.drift_analysis_run_id + WHERE r.repository_id = $1 + ORDER BY p.dir`, repoID) + if err != nil { + t.Fatalf("query projects: %v", err) + } + defer rows.Close() + type projRow struct { + dir, ptype string + drifted, succeeded, skipd bool + initOut, planOut *string + } + var gotRows []projRow + for rows.Next() { + var r projRow + if err := rows.Scan(&r.dir, &r.ptype, &r.drifted, &r.succeeded, &r.initOut, &r.planOut, &r.skipd); err != nil { + t.Fatalf("scan: %v", err) + } + gotRows = append(gotRows, r) + } + wantRows := []projRow{ + {dir: "/projects/a", ptype: "TERRAFORM", drifted: true, succeeded: true, initOut: ptr("init-a"), planOut: ptr("plan-a"), skipd: false}, + {dir: "/projects/b", ptype: "TOFU", drifted: false, succeeded: true, initOut: ptr("init-b"), planOut: ptr("plan-b"), skipd: false}, + {dir: "/projects/c", ptype: "TERRAGRUNT", drifted: false, succeeded: true, initOut: ptr(""), planOut: ptr(""), skipd: true}, + } + if len(gotRows) != len(wantRows) { + t.Fatalf("got %d rows, want %d", len(gotRows), len(wantRows)) + } + for i := range wantRows { + if gotRows[i].dir != wantRows[i].dir || gotRows[i].ptype != wantRows[i].ptype || + gotRows[i].drifted != wantRows[i].drifted || gotRows[i].succeeded != wantRows[i].succeeded || + gotRows[i].skipd != wantRows[i].skipd || + !strPtrEq(gotRows[i].initOut, wantRows[i].initOut) || + !strPtrEq(gotRows[i].planOut, wantRows[i].planOut) { + t.Errorf("row %d mismatch:\n got=%+v\nwant=%+v", i, gotRows[i], wantRows[i]) + } + } +} + +func ptr(s string) *string { return &s } + +func strPtrEq(a, b *string) bool { + if a == nil || b == nil { + return a == b + } + return *a == *b } func TestDriftIngest_InvalidToken(t *testing.T) {