From f2e373bb71b9459caf8c7060457f8d8a43b39a8e Mon Sep 17 00:00:00 2001 From: Dmitrii Andreev Date: Tue, 26 May 2026 11:41:55 -0500 Subject: [PATCH 1/4] HYPERFLEET-930 - docs: add CLAUDE.md and AGENTS.md AGENTS.md contains agent-facing documentation for the broker library: verification targets, source of truth table, architecture context, project boundaries, and non-obvious gotchas. CLAUDE.md imports AGENTS.md via @AGENTS.md. --- AGENTS.md | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ CLAUDE.md | 1 + 2 files changed, 67 insertions(+) create mode 100644 AGENTS.md create mode 100644 CLAUDE.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..54727cb --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,66 @@ +# AGENTS.md — hyperfleet-broker + +Go library: unified pub/sub API over RabbitMQ and Google Pub/Sub with CloudEvents. Not a binary — consumed as a dependency by other HyperFleet services. Wraps [Watermill](https://github.com/ThreeDotsLabs/watermill) internally; public API never exposes Watermill types. + +## Verification + +| Target | What it runs | +|--------|-------------| +| `make lint` | golangci-lint v2.7.0 (bingo-managed, config in `.golangci.yml`) | +| `make fmt` | `golangci-lint fmt` — gofmt + goimports + gci (4-group imports) + golines (120 char wrap) | +| `make test` | Unit tests: `./broker/... ./pkg/...` (timeout 10m) | +| `make test-integration` | Integration tests: `./test/integration/...` (sequential `-p 1`, timeout 10m) | +| `make test-all` | Both unit + integration | +| `make install-hooks` | Install pre-commit hooks (`pre-commit install --install-hooks`) | + +### Pre-push order + +1. `make fmt` → 2. `make lint` → 3. `make test` → 4. `make test-integration` (if touching broker/ or test/) + +## Source of truth + +| Topic | Location | +|-------|----------| +| Public API | `broker/broker.go`, `broker/publisher.go`, `broker/subscriber.go`, `broker/metrics.go`, `broker/errors.go` | +| Logger interface | `pkg/logger/logger.go` | +| Config structure + validation | `broker/config.go`, `broker/rabbitmq.go`, `broker/googlepubsub.go` | +| Config fields reference | `example/broker.example.yaml` | +| Linter config | `.golangci.yml` | +| Integration test helpers | `test/integration/common/common.go` | +| Mock logger (unit tests) | `pkg/logger/mock.go` — use `NewMockLogger()` | +| Container setup helpers | `test/integration/rabbitmq/setup.go`, `test/integration/googlepubsub/setup.go` | +| Leak & perf integration tests | `test/integration/broker_leak_test.go`, `test/integration/broker_perf_test.go` | +| Examples (separate go.mod) | `example/go.mod`, `example/cmd/publisher/main.go`, `example/cmd/subscriber/main.go` | +| Docker Compose examples | `example/rabbitmq/`, `example/googlepubsub/` | +| Comprehensive user docs | `README.md` | + +## Architecture context + +Only patterns an agent cannot infer from reading the code: + +- **Subscription ID** controls messaging pattern: same ID = shared/load-balanced queue, different IDs = fanout. RabbitMQ queue name = `{topic}-{subscriptionID}` (default) or `{queue}-{subscriptionID}` when `broker.rabbitmq.queue` is set. Google Pub/Sub subscription name = `{subscriptionID}`. +- **Config precedence**: programmatic map > env vars > broker.yaml file > defaults. `BROKER_CONFIG_FILE` env var overrides file path. Env vars use underscore for dots (e.g., `BROKER_RABBITMQ_URL`). +- **Health check asymmetry**: RabbitMQ = in-memory connection state check (no network call). Google Pub/Sub = `GetTopic` API probe with 3s timeout on a non-existent topic; `NotFound` = healthy. +- **Config debugging**: `log_config: true` in broker.yaml (or `LOG_CONFIG=true`) logs full config as JSON at creation time. Passwords masked. +- **DLQ topic naming**: DLQ topic is always `{subscriptionID}-dlq` (hardcoded at `googlepubsub.go`). Not configurable. + +## Project boundaries + +### DO + +- Keep Watermill as internal implementation detail — never expose Watermill types in public API +- Write integration tests for new broker-level behavior using `test/integration/common/` helpers + +### DON'T + +- Don't add a third broker backend without updating `validateConfig`, both constructors, health checks, and integration test infrastructure + +## Gotchas + +- **Google Pub/Sub health check requires `pubsub.topics.get`** — NOT included in `roles/pubsub.publisher`. Grant `roles/pubsub.viewer` or custom role. +- **`subscriber.parallelism` > 1 only needed for RabbitMQ**. Google Pub/Sub handles parallelism internally via `num_goroutines` and `max_outstanding_messages`. +- **Integration tests run sequentially** (`-p 1`) because CI has 1 CPU. Parallel execution causes timeouts. +- **`example/` has its own `go.mod`** — `make test` from root does not test examples. Update `example/go.mod` when changing public API. +- **`error_test.go` "missing rabbitmq url" test is false-passing**: sets `expectError: false` but `validateRabbitMQConfig` rejects empty URLs. Test passes because else branch only asserts when `err == nil`. Do not rely on it as documenting intentional behavior. +- **Google Pub/Sub subscriber auto-creates DLQ topic** (`{subscriptionID}-dlq`) when `create_topic_if_missing` is true — see `googlepubsub.go:188`. +- **Integration tests**: call `common.SetupTestEnvironment()` first in `TestMain`, share one container per package. Topic/subscription name uniqueness is handled internally by `Run*` helper functions in `common/common.go`. Pattern at `test/integration/rabbitmq/rabbitmq_test.go:28`. diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..43c994c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md From d7482076a790a5711912892d18a5c076a81b48f7 Mon Sep 17 00:00:00 2001 From: Dmitrii Andreev Date: Tue, 26 May 2026 11:42:11 -0500 Subject: [PATCH 2/4] HYPERFLEET-930 - chore: add pre-commit hooks and golangci-lint config Add .pre-commit-config.yaml using openshift-hyperfleet/hyperfleet-hooks v0.2.1 (commitlint, gofmt, golangci-lint, go-vet). Add .golangci.yml aligned with HyperFleet org standard plus gci (4-group import ordering) and golines (120 char line wrapping). Update Makefile: fmt now uses golangci-lint fmt, add gofmt/go-vet/vet aliases for pre-commit hooks, add install-hooks target. --- .golangci.yml | 108 ++++++++++++++++++++++++++++++++++++++++ .pre-commit-config.yaml | 11 ++++ Makefile | 22 ++++++-- 3 files changed, 137 insertions(+), 4 deletions(-) create mode 100644 .golangci.yml create mode 100644 .pre-commit-config.yaml diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..18b0bba --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,108 @@ +# HyperFleet Broker - golangci-lint configuration +# Aligned with: https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/golangci.yml + +version: "2" + +run: + timeout: 5m + tests: true + modules-download-mode: readonly + +linters: + enable: + # Code Quality + - errcheck + - govet + - staticcheck + - ineffassign + - unused + - unconvert + - unparam + - goconst + - exhaustive + + # Code Style + - misspell + - lll + - revive + - gocritic + + # Security + - gosec + settings: + errcheck: + check-type-assertions: true + check-blank: true + govet: + enable-all: true + goconst: + min-len: 3 + min-occurrences: 3 + misspell: + locale: US + lll: + line-length: 120 + revive: + rules: + - name: exported + severity: warning + disabled: true + - name: unexported-return + severity: warning + disabled: false + - name: var-naming + severity: warning + disabled: false + unparam: + check-exported: false + exhaustive: + default-signifies-exhaustive: true + exclusions: + generated: lax + rules: + # Relaxed rules for test files + - linters: + - gosec + - errcheck + - unparam + path: _test\.go + paths: + - third_party(/|$) + - builtin(/|$) + - example(/|$) + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 + new: false + +formatters: + enable: + - gofmt + - goimports + - gci + - golines + settings: + gofmt: + simplify: true + golines: + max-len: 120 + # Local addition on top of org standard: repo-specific import grouping + gci: + custom-order: true + sections: + - standard + - default + - prefix(github.com/openshift-hyperfleet) + - prefix(github.com/openshift-hyperfleet/hyperfleet-broker) + exclusions: + generated: lax + paths: + - third_party(/|$) + - builtin(/|$) + - example(/|$) + +output: + formats: + text: + path: stdout diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..80deab0 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,11 @@ +default_install_hook_types: [pre-commit, commit-msg] + +repos: + - repo: https://github.com/openshift-hyperfleet/hyperfleet-hooks + rev: v0.2.1 + hooks: + - id: hyperfleet-commitlint + stages: [commit-msg] + - id: hyperfleet-gofmt + - id: hyperfleet-golangci-lint + - id: hyperfleet-go-vet diff --git a/Makefile b/Makefile index aa502de..38cd3c3 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,28 @@ include .bingo/Variables.mk -.PHONY: test test-integration test-all lint fmt +.PHONY: test test-integration test-all lint fmt gofmt go-vet vet install-hooks + +# Install pre-commit hooks +install-hooks: + pre-commit install --install-hooks # Run linter lint: $(GOLANGCI_LINT) $(GOLANGCI_LINT) run ./... -# Format code -fmt: - gofmt -s -w . +# Run go vet +vet: + go vet ./... + +# Alias for vet (used by pre-commit hook) +go-vet: vet + +# Alias for fmt (used by pre-commit hook) +gofmt: fmt + +# Format code (gofmt + goimports + gci via golangci-lint) +fmt: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) fmt ./... # Run unit tests test: From 869f0677bc17af0142c44d423cca5b5bdabbc03a Mon Sep 17 00:00:00 2001 From: Dmitrii Andreev Date: Tue, 26 May 2026 11:42:31 -0500 Subject: [PATCH 3/4] HYPERFLEET-930 - fix: resolve all golangci-lint issues Fix 126 pre-existing lint issues caught by new .golangci.yml: - Extract BrokerTypeRabbitMQ/BrokerTypeGooglePubSub constants (goconst) - Fix import ordering with gci 4-group format across all files - Wrap long lines with golines formatter (lll) - Rename shadowed err vars in defer blocks to closeErr/setErr (govet) - Fix struct field alignment (govet/fieldalignment) - Replace else{if} with else-if (gocritic) - Add nolint:errcheck for intentional ignored returns (errcheck) - Rename TEST_DURATION to testDuration (revive) - Remove dead_letter_topic config field - was never read (dead code) --- AGENTS.md | 2 +- README.md | 9 +- broker/broker.go | 41 ++- broker/cloudevents_test.go | 12 +- broker/config.go | 6 +- broker/config_test.go | 25 +- broker/error_test.go | 56 ++-- broker/errors.go | 22 +- broker/googlepubsub.go | 64 +++-- broker/health_test.go | 3 +- broker/metrics.go | 5 +- broker/metrics_test.go | 24 +- broker/options_test.go | 3 +- broker/publisher.go | 1 + broker/rabbitmq.go | 8 +- broker/subscriber.go | 31 +-- example/broker.example.yaml | 3 +- example/cmd/publisher/main.go | 8 +- example/cmd/subscriber/main.go | 9 +- example/googlepubsub/broker.yaml | 4 +- pkg/logger/mock.go | 4 +- test/integration/broker_leak_test.go | 72 +++-- test/integration/broker_perf_test.go | 92 ++++--- test/integration/common/common.go | 251 ++++++++++++------ .../googlepubsub/googlepubsub_test.go | 16 +- test/integration/googlepubsub/setup.go | 4 +- test/integration/rabbitmq/rabbitmq_test.go | 50 +++- test/integration/rabbitmq/setup.go | 4 +- 28 files changed, 522 insertions(+), 307 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 54727cb..d5e2d7f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -44,7 +44,7 @@ Only patterns an agent cannot infer from reading the code: - **Config debugging**: `log_config: true` in broker.yaml (or `LOG_CONFIG=true`) logs full config as JSON at creation time. Passwords masked. - **DLQ topic naming**: DLQ topic is always `{subscriptionID}-dlq` (hardcoded at `googlepubsub.go`). Not configurable. -## Project boundaries +## Boundaries ### DO diff --git a/README.md b/README.md index 5807438..c91878b 100644 --- a/README.md +++ b/README.md @@ -239,8 +239,7 @@ broker: retry_min_backoff: "10s" # 0s to 600s retry_max_backoff: "600s" # 0s to 600s - # Dead letter settings - dead_letter_topic: "my-dead-letter-topic" # Auto-created if create_topic_if_missing is true + # Dead letter settings (DLQ topic is auto-named {subscriptionID}-dlq) dead_letter_max_attempts: 5 # 5-100 (default: 5) # Receive settings (client-side flow control) @@ -289,7 +288,7 @@ broker: The `create_*_if_missing` settings only apply **if you have sufficient permissions** (e.g., `pubsub.topics.create` and/or `pubsub.subscriptions.create` on the GCP project). -If you use **dead letter topics** (`dead_letter_topic`), the library will also auto-create the dead letter topic—as long as `create_topic_if_missing` is enabled. +When `create_topic_if_missing` is enabled, the library also auto-creates the dead letter topic (`{subscriptionID}-dlq`). **Best practice:** - Use the auto-create flags in development environments. @@ -383,9 +382,7 @@ After setting the maximun number of allowed "in flight" messages, further settin - Minimum and maximum delay between delivery retries for failed messages. **Dead Letter Settings:** - - **`broker.googlepubsub.dead_letter_topic`**: - - Topic name for messages that fail delivery after max attempts. - - The dead letter topic is automatically created if `create_topic_if_missing` is true. + - DLQ topic is always named `{subscriptionID}-dlq` (not configurable). Auto-created when `create_topic_if_missing` is true. - **`broker.googlepubsub.dead_letter_max_attempts`** (5-100, default: 5): - Maximum number of delivery attempts before sending to the dead letter topic. diff --git a/broker/broker.go b/broker/broker.go index 1b32e68..817378f 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -12,6 +12,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) @@ -19,6 +20,11 @@ const ( DefaultSubscriberParallelism = 1 DefaultTestParallelism = 1 DefaultShutdownTimeout = 30 * time.Second + + // BrokerTypeRabbitMQ is the broker type identifier for RabbitMQ. + BrokerTypeRabbitMQ = "rabbitmq" + // BrokerTypeGooglePubSub is the broker type identifier for Google Pub/Sub. + BrokerTypeGooglePubSub = "googlepubsub" ) // NewPublisher creates a new publisher with the provided logger, metrics recorder, and optional configuration. @@ -67,13 +73,13 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[ var healthCloser io.Closer switch cfg.Broker.Type { - case "rabbitmq": + case BrokerTypeRabbitMQ: pub, err = newRabbitMQPublisher(cfg, watermillLogger) if err != nil { return nil, fmt.Errorf("failed to create RabbitMQ publisher: %w", err) } hc = newRabbitMQHealthCheck(pub) - case "googlepubsub": + case BrokerTypeGooglePubSub: pub, err = newGooglePubSubPublisher(cfg, watermillLogger) if err != nil { return nil, fmt.Errorf("failed to create Google Pub/Sub publisher: %w", err) @@ -84,7 +90,11 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[ healthClient, err = pubsub.NewClient(context.Background(), cfg.Broker.GooglePubSub.ProjectID) if err != nil { if closeErr := pub.Close(); closeErr != nil { - return nil, fmt.Errorf("failed to create health check client: %w (also failed to close publisher: %v)", err, closeErr) + return nil, fmt.Errorf( + "failed to create health check client: %w (also failed to close publisher: %v)", + err, + closeErr, + ) } return nil, fmt.Errorf("failed to create health check client: %w", err) } @@ -104,11 +114,17 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[ }, nil } -// NewSubscriber creates a new subscriber with the provided logger, subscription ID, metrics recorder, and optional configuration. +// NewSubscriber creates a new subscriber with the provided logger, subscription ID, +// metrics recorder, and optional configuration. // Usage: // - NewSubscriber(logger, "id", metrics) - uses provided logger and loads config from file // - NewSubscriber(logger, "id", metrics, configMap) - uses provided logger with config map -func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRecorder, configMap ...map[string]string) (Subscriber, error) { +func NewSubscriber( + log logger.Logger, + subscriptionID string, + metrics *MetricsRecorder, + configMap ...map[string]string, +) (Subscriber, error) { if subscriptionID == "" { return nil, fmt.Errorf("subscriptionID is required") } @@ -149,12 +165,12 @@ func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRec var sub message.Subscriber switch cfg.Broker.Type { - case "rabbitmq": + case BrokerTypeRabbitMQ: sub, err = newRabbitMQSubscriber(cfg, watermillLogger, subscriptionID) if err != nil { return nil, fmt.Errorf("failed to create RabbitMQ subscriber: %w", err) } - case "googlepubsub": + case BrokerTypeGooglePubSub: sub, err = newGooglePubSubSubscriber(cfg, watermillLogger, subscriptionID) if err != nil { return nil, fmt.Errorf("failed to create Google Pub/Sub subscriber: %w", err) @@ -182,7 +198,7 @@ func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRec func logConfiguration(cfg *config, component string, logger watermill.LoggerAdapter) { // Create a copy of config with masked password for logging logCfg := *cfg - if cfg.Broker.Type == "rabbitmq" && cfg.Broker.RabbitMQ.URL != "" { + if cfg.Broker.Type == BrokerTypeRabbitMQ && cfg.Broker.RabbitMQ.URL != "" { logCfg.Broker.RabbitMQ.URL = maskPassword(cfg.Broker.RabbitMQ.URL) } @@ -196,7 +212,14 @@ func logConfiguration(cfg *config, component string, logger watermill.LoggerAdap return } - logger.Info(fmt.Sprintf("=== %s Configuration (JSON) ===\n%s\n========================================", component, string(jsonBytes)), nil) + logger.Info( + fmt.Sprintf( + "=== %s Configuration (JSON) ===\n%s\n========================================", + component, + string(jsonBytes), + ), + nil, + ) } // maskPassword masks passwords in URLs for logging diff --git a/broker/cloudevents_test.go b/broker/cloudevents_test.go index 6b689f0..47745c8 100644 --- a/broker/cloudevents_test.go +++ b/broker/cloudevents_test.go @@ -14,10 +14,10 @@ import ( func TestMessageToEvent(t *testing.T) { tests := []struct { - name string setupMsg func() *message.Message - expectError bool validate func(*testing.T, *event.Event) + name string + expectError bool }{ { name: "valid message with all attributes", @@ -74,7 +74,9 @@ func TestMessageToEvent(t *testing.T) { msg := message.NewMessage("test-uuid", []byte{}) return msg }, - expectError: true, // unmarshal empty bytes usually fails or returns empty struct which might be invalid CloudEvent if required fields missing + // unmarshal empty bytes usually fails or returns empty struct + // which might be invalid CloudEvent if required fields missing + expectError: true, validate: nil, }, } @@ -99,10 +101,10 @@ func TestMessageToEvent(t *testing.T) { func TestEventToMessage(t *testing.T) { tests := []struct { - name string setupEvent func() *event.Event - expectError bool validate func(*testing.T, *message.Message) + name string + expectError bool }{ { name: "valid event with all attributes", diff --git a/broker/config.go b/broker/config.go index 5de3f9f..3bf704f 100644 --- a/broker/config.go +++ b/broker/config.go @@ -11,9 +11,9 @@ import ( // config holds the broker configuration type config struct { - LogConfig bool `mapstructure:"log_config"` Broker brokerConfig `mapstructure:"broker"` Subscriber subscriberConfig `mapstructure:"subscriber"` + LogConfig bool `mapstructure:"log_config"` } // brokerConfig holds broker-specific configuration @@ -134,9 +134,9 @@ func buildConfigFromMap(configMap map[string]string) (*config, error) { // validateConfig validates the configuration based on the broker type func validateConfig(cfg *config) error { switch cfg.Broker.Type { - case "rabbitmq": + case BrokerTypeRabbitMQ: return validateRabbitMQConfig(cfg) - case "googlepubsub": + case BrokerTypeGooglePubSub: return validateGooglePubSubConfig(cfg) default: return fmt.Errorf("unsupported broker type: %s", cfg.Broker.Type) diff --git a/broker/config_test.go b/broker/config_test.go index 330ac42..b5d007c 100644 --- a/broker/config_test.go +++ b/broker/config_test.go @@ -11,10 +11,10 @@ import ( func TestBuildConfigFromMap(t *testing.T) { tests := []struct { - name string configMap map[string]string - expectError bool validate func(*testing.T, *config) + name string + expectError bool }{ { name: "valid rabbitmq config", @@ -147,11 +147,11 @@ func TestLoadConfig(t *testing.T) { }() tests := []struct { - name string - setup func(*testing.T) string // Returns config file path + setup func(*testing.T) string cleanup func(*testing.T, string) - expectError bool validate func(*testing.T, *config) + name string + expectError bool }{ { name: "load from valid yaml file", @@ -373,12 +373,12 @@ broker: ` err := os.WriteFile(configPath, []byte(configContent), 0644) require.NoError(t, err) - if err := os.Setenv("BROKER_CONFIG_FILE", configPath); err != nil { - t.Fatalf("failed to set BROKER_CONFIG_FILE: %v", err) + if setErr := os.Setenv("BROKER_CONFIG_FILE", configPath); setErr != nil { + t.Fatalf("failed to set BROKER_CONFIG_FILE: %v", setErr) } defer func() { - if err := os.Unsetenv("BROKER_CONFIG_FILE"); err != nil { - t.Logf("failed to unset BROKER_CONFIG_FILE: %v", err) + if unsetErr := os.Unsetenv("BROKER_CONFIG_FILE"); unsetErr != nil { + t.Logf("failed to unset BROKER_CONFIG_FILE: %v", unsetErr) } }() @@ -398,8 +398,8 @@ func TestBuildConfigFromMapWithInvalidTypes(t *testing.T) { // Test that invalid type conversions are handled // Note: Viper might handle some conversions, so these tests verify behavior tests := []struct { - name string configMap map[string]string + name string expectError bool }{ { @@ -437,10 +437,10 @@ func TestBuildConfigFromMapWithInvalidTypes(t *testing.T) { func TestGooglePubSubConfigValidation(t *testing.T) { tests := []struct { - name string configMap map[string]string - expectError bool + name string errorMsg string + expectError bool }{ { name: "valid config with all settings", @@ -453,7 +453,6 @@ func TestGooglePubSubConfigValidation(t *testing.T) { "broker.googlepubsub.enable_message_ordering": "true", "broker.googlepubsub.retry_min_backoff": "10s", "broker.googlepubsub.retry_max_backoff": "600s", - "broker.googlepubsub.dead_letter_topic": "dead-letter", "broker.googlepubsub.dead_letter_max_attempts": "10", "broker.googlepubsub.max_outstanding_messages": "1000", "broker.googlepubsub.max_outstanding_bytes": "104857600", diff --git a/broker/error_test.go b/broker/error_test.go index 284209c..85aeef9 100644 --- a/broker/error_test.go +++ b/broker/error_test.go @@ -3,9 +3,10 @@ package broker import ( "testing" - "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) func newErrorTestMetrics(t *testing.T) *MetricsRecorder { @@ -15,10 +16,10 @@ func newErrorTestMetrics(t *testing.T) *MetricsRecorder { func TestNewPublisherErrorHandling(t *testing.T) { tests := []struct { - name string configMap map[string]string - expectError bool + name string errorMsg string + expectError bool }{ { name: "unsupported broker type", @@ -84,18 +85,16 @@ func TestNewPublisherErrorHandling(t *testing.T) { if tt.errorMsg != "" { assert.Contains(t, err.Error(), tt.errorMsg) } - } else { + } else if err == nil { // If no error expected, verify publisher is created // Note: Some publishers might fail on actual use, but creation should succeed - if err == nil { - assert.NotNil(t, pub) - if pub != nil { - defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) - } - }() - } + assert.NotNil(t, pub) + if pub != nil { + defer func() { + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) + } + }() } } }) @@ -104,11 +103,11 @@ func TestNewPublisherErrorHandling(t *testing.T) { func TestNewSubscriberErrorHandling(t *testing.T) { tests := []struct { + configMap map[string]string name string subscriptionID string - configMap map[string]string - expectError bool errorMsg string + expectError bool }{ { name: "empty subscription ID", @@ -174,17 +173,15 @@ func TestNewSubscriberErrorHandling(t *testing.T) { if tt.errorMsg != "" { assert.Contains(t, err.Error(), tt.errorMsg) } - } else { + } else if err == nil { // If no error expected, verify subscriber is created - if err == nil { - assert.NotNil(t, sub) - if sub != nil { - defer func() { - if err := sub.Close(); err != nil { - t.Logf("failed to close subscriber: %v", err) - } - }() - } + assert.NotNil(t, sub) + if sub != nil { + defer func() { + if closeErr := sub.Close(); closeErr != nil { + t.Logf("failed to close subscriber: %v", closeErr) + } + }() } } }) @@ -207,8 +204,8 @@ func TestPublisherPublishErrorHandling(t *testing.T) { func TestBuildConfigFromMapErrorHandling(t *testing.T) { tests := []struct { - name string configMap map[string]string + name string expectError bool }{ { @@ -238,11 +235,8 @@ func TestBuildConfigFromMapErrorHandling(t *testing.T) { assert.Error(t, err) assert.Nil(t, cfg) } else { - // For empty map, it should return default config - if len(tt.configMap) == 0 { - assert.NoError(t, err) - assert.NotNil(t, cfg) - } + assert.NoError(t, err) + assert.NotNil(t, cfg) } }) } diff --git a/broker/errors.go b/broker/errors.go index e95dab1..330d68d 100644 --- a/broker/errors.go +++ b/broker/errors.go @@ -8,24 +8,12 @@ import ( // SubscriberError represents an error that occurred during message processing // in background goroutines. These errors are sent via the Errors() channel. type SubscriberError struct { - // Op is the operation that failed: "router", "connect", "receive" - Op string - - // Topic where the error occurred - Topic string - - // SubscriptionID of the subscriber + Timestamp time.Time + Err error + Op string + Topic string SubscriptionID string - - // Err is the underlying error - Err error - - // Timestamp is when the error occurred - Timestamp time.Time - - // Fatal indicates if the subscriber has stopped and cannot continue operating. - // If true, the subscriber needs to be recreated. - Fatal bool + Fatal bool } // Error implements the error interface diff --git a/broker/googlepubsub.go b/broker/googlepubsub.go index 94ca9b5..66314ce 100644 --- a/broker/googlepubsub.go +++ b/broker/googlepubsub.go @@ -19,32 +19,20 @@ import ( // googlePubSubConfig holds Google Pub/Sub-specific configuration type googlePubSubConfig struct { - // Connection settings - ProjectID string `mapstructure:"project_id"` - - // Subscription settings - AckDeadlineSeconds int `mapstructure:"ack_deadline_seconds"` - MessageRetentionDuration string `mapstructure:"message_retention_duration"` - ExpirationTTL string `mapstructure:"expiration_ttl"` - EnableMessageOrdering bool `mapstructure:"enable_message_ordering"` - RetryMinBackoff string `mapstructure:"retry_min_backoff"` - RetryMaxBackoff string `mapstructure:"retry_max_backoff"` - - // Dead letter settings - DeadLetterTopic string `mapstructure:"dead_letter_topic"` - DeadLetterMaxAttempts int `mapstructure:"dead_letter_max_attempts"` - - // Topic settings - TopicRetentionDuration string `mapstructure:"topic_retention_duration"` - - // Receive settings (client-side flow control) - MaxOutstandingMessages int `mapstructure:"max_outstanding_messages"` - MaxOutstandingBytes int `mapstructure:"max_outstanding_bytes"` - NumGoroutines int `mapstructure:"num_goroutines"` - - // Behavior flags (default: false - don't auto-create infrastructure) - CreateTopicIfMissing bool `mapstructure:"create_topic_if_missing"` - CreateSubscriptionIfMissing bool `mapstructure:"create_subscription_if_missing"` + RetryMaxBackoff string `mapstructure:"retry_max_backoff"` + ProjectID string `mapstructure:"project_id"` + MessageRetentionDuration string `mapstructure:"message_retention_duration"` + ExpirationTTL string `mapstructure:"expiration_ttl"` + RetryMinBackoff string `mapstructure:"retry_min_backoff"` + TopicRetentionDuration string `mapstructure:"topic_retention_duration"` + MaxOutstandingMessages int `mapstructure:"max_outstanding_messages"` + DeadLetterMaxAttempts int `mapstructure:"dead_letter_max_attempts"` + AckDeadlineSeconds int `mapstructure:"ack_deadline_seconds"` + MaxOutstandingBytes int `mapstructure:"max_outstanding_bytes"` + NumGoroutines int `mapstructure:"num_goroutines"` + EnableMessageOrdering bool `mapstructure:"enable_message_ordering"` + CreateTopicIfMissing bool `mapstructure:"create_topic_if_missing"` + CreateSubscriptionIfMissing bool `mapstructure:"create_subscription_if_missing"` } // parseGoogleCloudDuration parses a Google Cloud duration string (e.g., "604800s", "10m", "1h") @@ -121,12 +109,16 @@ func newGooglePubSubPublisher(cfg *config, logger watermill.LoggerAdapter) (mess } // ensureDeadLetterTopicExists creates the dead letter topic if it doesn't exist -func ensureDeadLetterTopicExists(ctx context.Context, projectID, topicName string, logger watermill.LoggerAdapter) error { +func ensureDeadLetterTopicExists( + ctx context.Context, + projectID, topicName string, + logger watermill.LoggerAdapter, +) error { client, err := pubsub.NewClient(ctx, projectID) if err != nil { return fmt.Errorf("failed to create pubsub client: %w", err) } - defer func() { _ = client.Close() }() + defer func() { _ = client.Close() }() //nolint:errcheck // best-effort cleanup fullyQualifiedName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicName) @@ -177,7 +169,11 @@ func ensureDeadLetterTopicExists(ctx context.Context, projectID, topicName strin } // newGooglePubSubSubscriber creates a Google Pub/Sub subscriber -func newGooglePubSubSubscriber(cfg *config, logger watermill.LoggerAdapter, subscriptionID string) (message.Subscriber, error) { +func newGooglePubSubSubscriber( + cfg *config, + logger watermill.LoggerAdapter, + subscriptionID string, +) (message.Subscriber, error) { gps := cfg.Broker.GooglePubSub // If dead letter topic is configured and we're allowed to create topics, ensure it exists @@ -196,8 +192,10 @@ func newGooglePubSubSubscriber(cfg *config, logger watermill.LoggerAdapter, subs GenerateSubscriptionName: func(topic string) string { return subscriptionID }, - DoNotCreateTopicIfMissing: !gps.CreateTopicIfMissing, // Invert: our positive flag -> watermill's negative flag - DoNotCreateSubscriptionIfMissing: !gps.CreateSubscriptionIfMissing, // Invert: our positive flag -> watermill's negative flag + // Invert: our positive flag -> watermill's negative flag + DoNotCreateTopicIfMissing: !gps.CreateTopicIfMissing, + // Invert: our positive flag -> watermill's negative flag + DoNotCreateSubscriptionIfMissing: !gps.CreateSubscriptionIfMissing, } // Set ReceiveSettings for client-side flow control @@ -345,8 +343,8 @@ func validateGooglePubSubConfig(cfg *config) error { // Validate retry backoff relationship (min <= max) if gps.RetryMinBackoff != "" && gps.RetryMaxBackoff != "" { - minBackoff, _ := parseGoogleCloudDuration(gps.RetryMinBackoff) - maxBackoff, _ := parseGoogleCloudDuration(gps.RetryMaxBackoff) + minBackoff, _ := parseGoogleCloudDuration(gps.RetryMinBackoff) //nolint:errcheck // already validated above + maxBackoff, _ := parseGoogleCloudDuration(gps.RetryMaxBackoff) //nolint:errcheck // already validated above if minBackoff > maxBackoff { return fmt.Errorf("googlepubsub.retry_min_backoff must be less than or equal to retry_max_backoff") } diff --git a/broker/health_test.go b/broker/health_test.go index 9f11344..fc73b3c 100644 --- a/broker/health_test.go +++ b/broker/health_test.go @@ -6,8 +6,9 @@ import ( "testing" "github.com/ThreeDotsLabs/watermill/message" - "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/stretchr/testify/assert" + + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) func TestPublisherHealthWithCustomHealthCheck(t *testing.T) { diff --git a/broker/metrics.go b/broker/metrics.go index f993e3d..2005a93 100644 --- a/broker/metrics.go +++ b/broker/metrics.go @@ -8,13 +8,12 @@ import ( // MetricsRecorder holds Prometheus metrics for the broker library. type MetricsRecorder struct { - component string - version string - messagesConsumed *prometheus.CounterVec messagesPublished *prometheus.CounterVec errors *prometheus.CounterVec messageDuration *prometheus.HistogramVec + component string + version string } // NewMetricsRecorder creates a new MetricsRecorder and registers all metrics diff --git a/broker/metrics_test.go b/broker/metrics_test.go index 8c98bf4..b292708 100644 --- a/broker/metrics_test.go +++ b/broker/metrics_test.go @@ -8,11 +8,12 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" - "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) func newTestRegistry(t *testing.T) (*MetricsRecorder, *prometheus.Registry) { @@ -125,10 +126,20 @@ func TestRecordError(t *testing.T) { m.RecordError("orders", "conversion") m.RecordError("orders", "handler") - handlerLabels := map[string]string{"topic": "orders", "error_type": "handler", "component": "test-component", "version": "v0.1.0"} + handlerLabels := map[string]string{ + "topic": "orders", + "error_type": "handler", + "component": "test-component", + "version": "v0.1.0", + } assert.Equal(t, float64(2), getCounterValue(t, reg, "hyperfleet_broker_errors_total", handlerLabels)) - conversionLabels := map[string]string{"topic": "orders", "error_type": "conversion", "component": "test-component", "version": "v0.1.0"} + conversionLabels := map[string]string{ + "topic": "orders", + "error_type": "conversion", + "component": "test-component", + "version": "v0.1.0", + } assert.Equal(t, float64(1), getCounterValue(t, reg, "hyperfleet_broker_errors_total", conversionLabels)) } @@ -181,7 +192,12 @@ func TestPublisherMetricsIntegration(t *testing.T) { err := p.Publish(context.Background(), "fail-topic", &evt) assert.Error(t, err) - labels := map[string]string{"topic": "fail-topic", "error_type": "publish", "component": "test-publisher", "version": "v1.0.0"} + labels := map[string]string{ + "topic": "fail-topic", + "error_type": "publish", + "component": "test-publisher", + "version": "v1.0.0", + } assert.Equal(t, float64(1), getCounterValue(t, reg, "hyperfleet_broker_errors_total", labels)) }) } diff --git a/broker/options_test.go b/broker/options_test.go index 9c349eb..0cd5ed1 100644 --- a/broker/options_test.go +++ b/broker/options_test.go @@ -3,9 +3,10 @@ package broker import ( "testing" - "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) // These tests verify parameter validation only. diff --git a/broker/publisher.go b/broker/publisher.go index fcec03b..cc7e430 100644 --- a/broker/publisher.go +++ b/broker/publisher.go @@ -7,6 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) diff --git a/broker/rabbitmq.go b/broker/rabbitmq.go index 5acbb4e..7dbbfe3 100644 --- a/broker/rabbitmq.go +++ b/broker/rabbitmq.go @@ -17,9 +17,9 @@ type rabbitMQConfig struct { ExchangeType string `mapstructure:"exchange_type"` Queue string `mapstructure:"queue"` RoutingKey string `mapstructure:"routing_key"` + ConsumerTag string `mapstructure:"consumer_tag"` PrefetchCount int `mapstructure:"prefetch_count"` PrefetchSize int `mapstructure:"prefetch_size"` - ConsumerTag string `mapstructure:"consumer_tag"` PublisherConfirm bool `mapstructure:"publisher_confirm"` } @@ -78,7 +78,11 @@ func newRabbitMQPublisher(cfg *config, logger watermill.LoggerAdapter) (message. } // newRabbitMQSubscriber creates a RabbitMQ subscriber -func newRabbitMQSubscriber(cfg *config, logger watermill.LoggerAdapter, subscriptionID string) (message.Subscriber, error) { +func newRabbitMQSubscriber( + cfg *config, + logger watermill.LoggerAdapter, + subscriptionID string, +) (message.Subscriber, error) { // Create a queue name generator that incorporates subscription IDs // The topic passed to Subscribe will be the original topic, and we append subscription ID for queue naming queueNameGenerator := func(topic string) string { diff --git a/broker/subscriber.go b/broker/subscriber.go index f80eb54..1dce9b0 100644 --- a/broker/subscriber.go +++ b/broker/subscriber.go @@ -10,6 +10,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/cloudevents/sdk-go/v2/event" + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) @@ -39,26 +40,19 @@ const ( // subscriber wraps a Watermill subscriber and provides worker pools for parallel message processing type subscriber struct { + logger logger.Logger sub message.Subscriber - parallelism int - subscriptionID string + metrics *MetricsRecorder + errorChan chan *SubscriberError brokerType string - logger logger.Logger // Broker logger (always present - default logger if not provided) + subscriptionID string + routers []*message.Router + cancelFns []context.CancelFunc wg sync.WaitGroup - - // Routers and cancel functions for all subscriptions, used by Close() to ensure clean shutdown - routers []*message.Router - cancelFns []context.CancelFunc - routersMu sync.Mutex - - // Error notification channel - errorChan chan *SubscriberError - - // Track if closed to prevent sending on closed channel - closed bool - closeMu sync.RWMutex - - metrics *MetricsRecorder + parallelism int + closeMu sync.RWMutex + routersMu sync.Mutex + closed bool } // Subscribe subscribes to a topic and processes messages with the provided handler @@ -119,7 +113,8 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string, handler Handle s.logger.Errorf(msgCtx, "Handler failed to process event: %v", err) s.metrics.RecordError(topic, "handler") } else { - s.logger.Debugf(msgCtx, "Successfully processed event %s from topic %s subscription %s", evt.ID(), topic, s.subscriptionID) + s.logger.Debugf(msgCtx, "Successfully processed event %s from topic %s subscription %s", + evt.ID(), topic, s.subscriptionID) } return err diff --git a/example/broker.example.yaml b/example/broker.example.yaml index 08f1033..0b2e2b1 100644 --- a/example/broker.example.yaml +++ b/example/broker.example.yaml @@ -52,8 +52,7 @@ broker: retry_min_backoff: "10s" retry_max_backoff: "600s" - # Dead letter settings (for messages that fail repeatedly) - dead_letter_topic: "my-dead-letter-topic" + # Dead letter settings (DLQ topic is auto-named {subscriptionID}-dlq) dead_letter_max_attempts: 5 # 5-100, default: 5 # Topic settings diff --git a/example/cmd/publisher/main.go b/example/cmd/publisher/main.go index 53fac45..1993ab3 100644 --- a/example/cmd/publisher/main.go +++ b/example/cmd/publisher/main.go @@ -9,6 +9,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/event" + "github.com/openshift-hyperfleet/hyperfleet-broker/broker" "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) @@ -54,7 +55,12 @@ func main() { } // Continuous publishing mode - log.Printf("Publisher started. Publishing events to topic: %s with interval: %v, broker type: %s", *topic, *interval, publisher.BrokerType()) + log.Printf( + "Publisher started. Publishing events to topic: %s with interval: %v, broker type: %s", + *topic, + *interval, + publisher.BrokerType(), + ) log.Printf("Press Ctrl+C to stop...") // Publish events diff --git a/example/cmd/subscriber/main.go b/example/cmd/subscriber/main.go index f010ce6..260531e 100644 --- a/example/cmd/subscriber/main.go +++ b/example/cmd/subscriber/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cloudevents/sdk-go/v2/event" + "github.com/openshift-hyperfleet/hyperfleet-broker/broker" "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) @@ -39,7 +40,13 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - log.Printf("Subscriber instance %s started. Listening on topic: %s with subscription ID: %s, broker type: %s", instanceID, *topic, *subscription, subscriber.BrokerType()) + log.Printf( + "Subscriber instance %s started. Listening on topic: %s with subscription ID: %s, broker type: %s", + instanceID, + *topic, + *subscription, + subscriber.BrokerType(), + ) // Start error handler goroutine to monitor infrastructure errors go func() { diff --git a/example/googlepubsub/broker.yaml b/example/googlepubsub/broker.yaml index 8d5df05..c947323 100644 --- a/example/googlepubsub/broker.yaml +++ b/example/googlepubsub/broker.yaml @@ -22,8 +22,7 @@ broker: retry_min_backoff: "10s" retry_max_backoff: "600s" - # Dead letter settings (for messages that fail repeatedly) - # dead_letter_topic: "my-dead-letter-topic" + # Dead letter settings (DLQ topic is auto-named {subscriptionID}-dlq) # dead_letter_max_attempts: 5 # 5-100, default: 5 # Topic settings @@ -39,7 +38,6 @@ broker: # Set to true to automatically create topics/subscriptions if they don't exist create_topic_if_missing: true create_subscription_if_missing: true - dead_letter_topic: "my-dead-letter-topic" dead_letter_max_attempts: 5 subscriber: diff --git a/pkg/logger/mock.go b/pkg/logger/mock.go index e967436..981a259 100644 --- a/pkg/logger/mock.go +++ b/pkg/logger/mock.go @@ -8,14 +8,14 @@ import ( // MockLogger implements the Logger interface for testing type MockLogger struct { - mu sync.RWMutex logs []LogEntry + mu sync.RWMutex } type LogEntry struct { + Context context.Context Level string Message string - Context context.Context } func NewMockLogger() *MockLogger { diff --git a/test/integration/broker_leak_test.go b/test/integration/broker_leak_test.go index de53333..508601f 100644 --- a/test/integration/broker_leak_test.go +++ b/test/integration/broker_leak_test.go @@ -25,7 +25,7 @@ import ( // Shared container URLs created once in TestMain and reused across all tests in this package. var ( - sharedRabbitMQURL string + sharedRabbitMQURL string sharedPubSubProjectID string ) @@ -151,7 +151,11 @@ func setupBrokerTest(t *testing.T, cfg brokerTestConfig) map[string]string { // created by Subscribe(). Acts as a regression guard against goroutine leaks. func testGoroutineLeak(t *testing.T, cfg brokerTestConfig) { configMap := setupBrokerTest(t, cfg) - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), common.NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) // Clean up environment @@ -160,7 +164,12 @@ func testGoroutineLeak(t *testing.T, cfg brokerTestConfig) { before := runtime.NumGoroutine() t.Logf("📊 Goroutines BEFORE: %d", before) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), "leak-demo", common.NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + "leak-demo", + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) ctx := context.Background() @@ -176,8 +185,8 @@ func testGoroutineLeak(t *testing.T, cfg brokerTestConfig) { t.Log("") for i := 0; i < numSubscriptions; i++ { - err := sub.Subscribe(ctx, fmt.Sprintf("topic-%d", i), handler) - require.NoError(t, err) + subErr := sub.Subscribe(ctx, fmt.Sprintf("topic-%d", i), handler) + require.NoError(t, subErr) } for i := 0; i < numSubscriptions; i++ { @@ -185,8 +194,8 @@ func testGoroutineLeak(t *testing.T, cfg brokerTestConfig) { evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("error-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(context.Background(), "topic-"+strconv.Itoa(i), &evt) @@ -231,8 +240,14 @@ func testGoroutineLeak(t *testing.T, cfg brokerTestConfig) { // RESULT if leaked > cfg.leakTolerance { - assert.FailNow(t, - fmt.Sprintf("GOROUTINE LEAK REGRESSION: %d goroutines leaked after Close() (tolerance: %d)", leaked, cfg.leakTolerance)) + assert.FailNow( + t, + fmt.Sprintf( + "GOROUTINE LEAK REGRESSION: %d goroutines leaked after Close() (tolerance: %d)", + leaked, + cfg.leakTolerance, + ), + ) } else { t.Logf("✅ OK: Only %d goroutines remaining (acceptable, tolerance: %d)", leaked, cfg.leakTolerance) } @@ -288,7 +303,12 @@ func testLeakIncreasesWithUsage(t *testing.T, cfg brokerTestConfig) { configMap := setupBrokerTest(t, cfg) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), "leak-demo", common.NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + "leak-demo", + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) ctx := context.Background() @@ -359,10 +379,19 @@ func testMultipleSubscriptionsSameTopic(t *testing.T, cfg brokerTestConfig) { t.Logf("📊 Goroutines BEFORE: %d", before) // Create publisher and subscriber - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), common.NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), "same-topic-test", common.NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + "same-topic-test", + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) ctx := context.Background() @@ -391,8 +420,8 @@ func testMultipleSubscriptionsSameTopic(t *testing.T, cfg brokerTestConfig) { mu.Unlock() return nil } - err := sub.Subscribe(ctx, sameTopic, handler) - require.NoError(t, err, "Failed to subscribe handler %d", i) + subErr := sub.Subscribe(ctx, sameTopic, handler) + require.NoError(t, subErr, "Failed to subscribe handler %d", i) } // Wait for goroutines to start @@ -411,8 +440,8 @@ func testMultipleSubscriptionsSameTopic(t *testing.T, cfg brokerTestConfig) { evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("msg-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(context.Background(), sameTopic, &evt) @@ -443,8 +472,15 @@ func testMultipleSubscriptionsSameTopic(t *testing.T, cfg brokerTestConfig) { // Verify no goroutine leaks if leaked > cfg.leakTolerance { - assert.FailNow(t, - fmt.Sprintf("GOROUTINE LEAK REGRESSION: %d goroutines leaked after Close() with multiple subscriptions to same topic (tolerance: %d)", leaked, cfg.leakTolerance)) + assert.FailNow( + t, + fmt.Sprintf( + "GOROUTINE LEAK REGRESSION: %d goroutines leaked after Close() "+ + "with multiple subscriptions to same topic (tolerance: %d)", + leaked, + cfg.leakTolerance, + ), + ) } else { t.Logf("✅ SUCCESS: Only %d goroutines remaining (acceptable, tolerance: %d)", leaked, cfg.leakTolerance) t.Log(" All goroutines from multiple subscriptions to the same topic were properly cleaned up") diff --git a/test/integration/broker_perf_test.go b/test/integration/broker_perf_test.go index b393c93..df0b92e 100644 --- a/test/integration/broker_perf_test.go +++ b/test/integration/broker_perf_test.go @@ -20,19 +20,19 @@ import ( ) // we keep this short for now to make the tests run faster -const TEST_DURATION = 10 * time.Second +const testDuration = 10 * time.Second // PerformanceTestResult holds the results of a performance test type PerformanceTestResult struct { + Timestamp time.Time `json:"timestamp"` BrokerType string `json:"broker_type"` TestDuration string `json:"test_duration"` - MessagesPublished int64 `json:"messages_published"` Subscriber1 SubscriberMetrics `json:"subscriber_1"` Subscriber2 SubscriberMetrics `json:"subscriber_2"` + MessagesPublished int64 `json:"messages_published"` TotalReceived int64 `json:"total_received"` PublishRate float64 `json:"publish_rate_per_sec"` ReceiveRate float64 `json:"receive_rate_per_sec"` - Timestamp time.Time `json:"timestamp"` } // SubscriberMetrics holds metrics for a single subscriber @@ -52,29 +52,43 @@ func TestRabbitMQPerformance(t *testing.T) { configMap := common.BuildConfigMap("rabbitmq", sharedRabbitMQURL, "") // Create publisher - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), common.NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with the same subscriptionID (shared subscription) subscriptionID := "perf-subscription" - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() @@ -86,15 +100,15 @@ func TestRabbitMQPerformance(t *testing.T) { // Set up handlers handler1 := func(ctx context.Context, e *event.Event) error { - //time.Sleep(1000 * time.Millisecond) - //t.Logf("Subscriber 1 received message %s - %d", e.ID(), sub1Received) + // time.Sleep(1000 * time.Millisecond) + // t.Logf("Subscriber 1 received message %s - %d", e.ID(), sub1Received) atomic.AddInt64(&sub1Received, 1) return nil } handler2 := func(ctx context.Context, e *event.Event) error { - //time.Sleep(10 * time.Millisecond) - //t.Logf("Subscriber 2 received message %s - %d", e.ID(), sub2Received) + // time.Sleep(10 * time.Millisecond) + // t.Logf("Subscriber 2 received message %s - %d", e.ID(), sub2Received) atomic.AddInt64(&sub2Received, 1) return nil } @@ -110,7 +124,7 @@ func TestRabbitMQPerformance(t *testing.T) { time.Sleep(500 * time.Millisecond) startTime := time.Now() - endTime := startTime.Add(TEST_DURATION) + endTime := startTime.Add(testDuration) // Start publisher goroutine wg.Add(1) @@ -194,34 +208,48 @@ func TestGooglePubSubPerformance(t *testing.T) { ctx := context.Background() configMap := common.BuildConfigMap("googlepubsub", "", sharedPubSubProjectID) - //configMap["broker.googlepubsub.max_outstanding_messages"] = "10" + // configMap["broker.googlepubsub.max_outstanding_messages"] = "10" // Set NumGoroutines to 1 to use a single streaming pull stream configMap["subscriber.parallelism"] = "10" configMap["broker.googlepubsub.num_goroutines"] = "10" // Create publisher - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), common.NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with the same subscriptionID (shared subscription) subscriptionID := "perf-subscription" - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() @@ -233,15 +261,15 @@ func TestGooglePubSubPerformance(t *testing.T) { // Set up handlers handler1 := func(ctx context.Context, e *event.Event) error { - //time.Sleep(1000 * time.Millisecond) - //t.Logf("Subscriber 1 received message %s - %d", e.ID(), sub1Received) + // time.Sleep(1000 * time.Millisecond) + // t.Logf("Subscriber 1 received message %s - %d", e.ID(), sub1Received) atomic.AddInt64(&sub1Received, 1) return nil } handler2 := func(ctx context.Context, e *event.Event) error { - //time.Sleep(1000 * time.Millisecond) - //t.Logf("Subscriber 2 received message %s - %d", e.ID(), sub2Received) + // time.Sleep(1000 * time.Millisecond) + // t.Logf("Subscriber 2 received message %s - %d", e.ID(), sub2Received) atomic.AddInt64(&sub2Received, 1) return nil } @@ -257,7 +285,7 @@ func TestGooglePubSubPerformance(t *testing.T) { time.Sleep(2 * time.Second) // Run test for at least 1 minute - //testDuration := 1 * time.Minute + // testDuration := 1 * time.Minute testDuration := 10 * time.Second startTime := time.Now() endTime := startTime.Add(testDuration) diff --git a/test/integration/common/common.go b/test/integration/common/common.go index 3a7878d..fd07a8d 100644 --- a/test/integration/common/common.go +++ b/test/integration/common/common.go @@ -1,4 +1,4 @@ -package common +package common //nolint:revive // test helper package import ( "context" @@ -11,11 +11,12 @@ import ( "time" "github.com/cloudevents/sdk-go/v2/event" - "github.com/openshift-hyperfleet/hyperfleet-broker/broker" - "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/openshift-hyperfleet/hyperfleet-broker/broker" + "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" ) // NewTestMetrics creates a MetricsRecorder with an isolated registry for testing. @@ -68,11 +69,11 @@ func SetupTestEnvironment() { // BrokerTestConfig holds broker-specific test configuration type BrokerTestConfig struct { + SetupConfigFunc func(*testing.T, map[string]string) BrokerType string SetupSleep time.Duration ReceiveTimeout time.Duration - SetupConfigFunc func(*testing.T, map[string]string) // Optional function to modify config after initial setup - PublishDelay time.Duration // Delay between publishes (for gradual publishing, 0 = no delay) + PublishDelay time.Duration } // BuildConfigMap creates a configuration map for testing @@ -99,20 +100,29 @@ func RunPublisherSubscriber(t *testing.T, configMap map[string]string, cfg Broke topic := fmt.Sprintf("test-topic-%d", id) ctx := context.Background() // Create publisher - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create subscriber subscriptionID := fmt.Sprintf("test-subscription-%d", id) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub.Close(); err != nil { - t.Logf("failed to close subscriber: %v", err) + if closeErr := sub.Close(); closeErr != nil { + t.Logf("failed to close subscriber: %v", closeErr) } }() // Create a CloudEvent @@ -120,10 +130,10 @@ func RunPublisherSubscriber(t *testing.T, configMap map[string]string, cfg Broke evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID("test-id-123") - if err := evt.SetData(event.ApplicationJSON, map[string]string{ + if setErr := evt.SetData(event.ApplicationJSON, map[string]string{ "message": "Hello, World!", - }); err != nil { - require.NoError(t, err, "failed to set event data") + }); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } // Set up handler receivedEvents := make(chan *event.Event, 1) @@ -150,23 +160,33 @@ func RunPublisherSubscriber(t *testing.T, configMap map[string]string, cfg Broke } // RunMultipleEvents tests that multiple events are processed correctly + func RunMultipleEvents(t *testing.T, configMap map[string]string, cfg BrokerTestConfig) { id := uniqueSuffix() topic := fmt.Sprintf("routing-topic-%d", id) ctx := context.Background() - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() subscriptionID := fmt.Sprintf("test-subscription-%d", id) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub.Close(); err != nil { - t.Logf("failed to close subscriber: %v", err) + if closeErr := sub.Close(); closeErr != nil { + t.Logf("failed to close subscriber: %v", closeErr) } }() // Create two different event types @@ -174,15 +194,15 @@ func RunMultipleEvents(t *testing.T, configMap map[string]string, cfg BrokerTest evt1.SetType("com.example.event.type1") evt1.SetSource("test-source") evt1.SetID("id-1") - if err := evt1.SetData(event.ApplicationJSON, map[string]string{"type": "1"}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt1.SetData(event.ApplicationJSON, map[string]string{"type": "1"}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } evt2 := event.New() evt2.SetType("com.example.event.type2") evt2.SetSource("test-source") evt2.SetID("id-2") - if err := evt2.SetData(event.ApplicationJSON, map[string]string{"type": "2"}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt2.SetData(event.ApplicationJSON, map[string]string{"type": "2"}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } // Set up handler to collect all events receivedEvents := make(chan *event.Event, 2) @@ -218,28 +238,42 @@ func RunSharedSubscription(t *testing.T, configMap map[string]string, cfg Broker id := uniqueSuffix() topic := fmt.Sprintf("shared-topic-%d", id) ctx := context.Background() - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with the same subscriptionID // They should share messages (load balancing) subscriptionID := fmt.Sprintf("shared-subscription-%d", id) - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() // Set up handlers to collect events @@ -266,8 +300,8 @@ func RunSharedSubscription(t *testing.T, configMap map[string]string, cfg Broker evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -305,27 +339,41 @@ func RunFanoutSubscription(t *testing.T, configMap map[string]string, cfg Broker id := uniqueSuffix() topic := fmt.Sprintf("fanout-topic-%d", id) ctx := context.Background() - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with different subscriptionIDs // Each should receive all messages (fanout behavior) - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), fmt.Sprintf("fanout-sub-1-%d", id), NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + fmt.Sprintf("fanout-sub-1-%d", id), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), fmt.Sprintf("fanout-sub-2-%d", id), NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + fmt.Sprintf("fanout-sub-2-%d", id), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() // Set up handlers to collect events @@ -354,8 +402,8 @@ func RunFanoutSubscription(t *testing.T, configMap map[string]string, cfg Broker evt.SetSource("test-source") evt.SetID(fmt.Sprintf("fanout-id-%d", i)) messageIDs[i] = evt.ID() - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -396,21 +444,25 @@ func RunFanoutSubscription(t *testing.T, configMap map[string]string, cfg Broker func RunSlowSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTestConfig, sub1, sub2 broker.Subscriber) { topic := uniqueTopic("slow-topic") ctx := context.Background() - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() // Metrics @@ -442,8 +494,8 @@ func RunSlowSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTest evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("slow-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -484,27 +536,41 @@ func RunSlowSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTest func RunErrorSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTestConfig) { topic := uniqueTopic("error-topic") ctx := context.Background() - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with the same subscriptionID (shared subscription) subscriptionID := fmt.Sprintf("error-subscription-%d", uniqueSuffix()) - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() // Metrics @@ -533,8 +599,8 @@ func RunErrorSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTes evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("error-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -561,15 +627,24 @@ func RunCloseWaitsForInFlightMessages(t *testing.T, configMap map[string]string, topic := uniqueTopic("close-test-topic") ctx := context.Background() configMap["subscriber.parallelism"] = "6" - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() subscriptionID := fmt.Sprintf("close-test-sub-%d", uniqueSuffix()) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) // Publish 5 messages numMessages := 5 @@ -595,8 +670,8 @@ func RunCloseWaitsForInFlightMessages(t *testing.T, configMap map[string]string, evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("close-test-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -644,15 +719,24 @@ func RunPanicHandler(t *testing.T, configMap map[string]string, cfg BrokerTestCo topic := uniqueTopic("panic-test-topic") ctx := context.Background() configMap["subscriber.parallelism"] = "3" - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() subscriptionID := fmt.Sprintf("panic-test-sub-%d", uniqueSuffix()) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) // Track how many times the handler was called (before panic) var handlerCallCount int64 @@ -676,8 +760,8 @@ func RunPanicHandler(t *testing.T, configMap map[string]string, cfg BrokerTestCo evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID(fmt.Sprintf("panic-test-id-%d", i)) - if err := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]int{"index": i}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } err = pub.Publish(ctx, topic, &evt) require.NoError(t, err) @@ -722,11 +806,16 @@ func RunErrorChannelNotification(t *testing.T, configMap map[string]string, cfg ctx := context.Background() // Create subscriber subscriptionID := fmt.Sprintf("error-channel-test-%d", uniqueSuffix()) - sub, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, NewTestMetrics(t), configMap) + sub, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub.Close(); err != nil { - t.Logf("failed to close subscriber: %v", err) + if closeErr := sub.Close(); closeErr != nil { + t.Logf("failed to close subscriber: %v", closeErr) } }() // Start goroutine to collect errors diff --git a/test/integration/googlepubsub/googlepubsub_test.go b/test/integration/googlepubsub/googlepubsub_test.go index e846829..0ce5b73 100644 --- a/test/integration/googlepubsub/googlepubsub_test.go +++ b/test/integration/googlepubsub/googlepubsub_test.go @@ -8,13 +8,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "github.com/openshift-hyperfleet/hyperfleet-broker/broker" "github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger" "github.com/openshift-hyperfleet/hyperfleet-broker/test/integration/common" - "github.com/stretchr/testify/require" ) // sharedProjectID holds the project ID for the shared PubSub emulator container @@ -137,11 +137,21 @@ func TestSlowSubscriber(t *testing.T) { // but with different num_goroutines to simulate fast vs slow subscriptionID := fmt.Sprintf("slow-subscription-%d", time.Now().UnixNano()) configMap["broker.googlepubsub.num_goroutines"] = "5" - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) configMap["broker.googlepubsub.num_goroutines"] = "1" - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) common.RunSlowSubscriber(t, configMap, common.BrokerTestConfig{ diff --git a/test/integration/googlepubsub/setup.go b/test/integration/googlepubsub/setup.go index f4d6d72..a265321 100644 --- a/test/integration/googlepubsub/setup.go +++ b/test/integration/googlepubsub/setup.go @@ -33,8 +33,8 @@ func SetupPubSubEmulator(t *testing.T) (string, string) { }) require.NoError(t, err) t.Cleanup(func() { - if err := pubsubContainer.Terminate(ctx); err != nil { - t.Logf("failed to terminate pubsub container: %v", err) + if termErr := pubsubContainer.Terminate(ctx); termErr != nil { + t.Logf("failed to terminate pubsub container: %v", termErr) } }) diff --git a/test/integration/rabbitmq/rabbitmq_test.go b/test/integration/rabbitmq/rabbitmq_test.go index a74b216..ed07a35 100644 --- a/test/integration/rabbitmq/rabbitmq_test.go +++ b/test/integration/rabbitmq/rabbitmq_test.go @@ -113,28 +113,42 @@ func TestSubscriptionID(t *testing.T) { ctx := context.Background() configMap := common.BuildConfigMap("rabbitmq", sharedRabbitMQURL, "") - pub, err := broker.NewPublisher(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), common.NewTestMetrics(t), configMap) + pub, err := broker.NewPublisher( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := pub.Close(); err != nil { - t.Logf("failed to close publisher: %v", err) + if closeErr := pub.Close(); closeErr != nil { + t.Logf("failed to close publisher: %v", closeErr) } }() // Create two subscribers with different subscription IDs - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), "subscription-1", common.NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + "subscription-1", + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub1.Close(); err != nil { - t.Logf("failed to close subscriber 1: %v", err) + if closeErr := sub1.Close(); closeErr != nil { + t.Logf("failed to close subscriber 1: %v", closeErr) } }() - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), "subscription-2", common.NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + "subscription-2", + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) defer func() { - if err := sub2.Close(); err != nil { - t.Logf("failed to close subscriber 2: %v", err) + if closeErr := sub2.Close(); closeErr != nil { + t.Logf("failed to close subscriber 2: %v", closeErr) } }() @@ -142,8 +156,8 @@ func TestSubscriptionID(t *testing.T) { evt.SetType("com.example.test.event") evt.SetSource("test-source") evt.SetID("test-id") - if err := evt.SetData(event.ApplicationJSON, map[string]string{"message": "test"}); err != nil { - require.NoError(t, err, "failed to set event data") + if setErr := evt.SetData(event.ApplicationJSON, map[string]string{"message": "test"}); setErr != nil { + require.NoError(t, setErr, "failed to set event data") } // Both subscribers should receive the event (fanout behavior) @@ -192,10 +206,20 @@ func TestSlowSubscriber(t *testing.T) { configMap := common.BuildConfigMap("rabbitmq", sharedRabbitMQURL, "") subscriptionID := fmt.Sprintf("slow-subscription-%d", time.Now().UnixNano()) - sub1, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub1, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) - sub2, err := broker.NewSubscriber(logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), subscriptionID, common.NewTestMetrics(t), configMap) + sub2, err := broker.NewSubscriber( + logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), + subscriptionID, + common.NewTestMetrics(t), + configMap, + ) require.NoError(t, err) common.RunSlowSubscriber(t, configMap, common.BrokerTestConfig{ diff --git a/test/integration/rabbitmq/setup.go b/test/integration/rabbitmq/setup.go index 3e29d35..12fb824 100644 --- a/test/integration/rabbitmq/setup.go +++ b/test/integration/rabbitmq/setup.go @@ -28,8 +28,8 @@ func SetupRabbitMQContainer(t *testing.T) string { ) require.NoError(t, err) t.Cleanup(func() { - if err := rabbitmqContainer.Terminate(ctx); err != nil { - t.Logf("failed to terminate rabbitmq container: %v", err) + if termErr := rabbitmqContainer.Terminate(ctx); termErr != nil { + t.Logf("failed to terminate rabbitmq container: %v", termErr) } }) From 3fe50fc41f19639a0c4b27955afdc9bdad4cd38e Mon Sep 17 00:00:00 2001 From: Dmitrii Andreev Date: Wed, 27 May 2026 11:34:14 -0500 Subject: [PATCH 4/4] HYPERFLEET-930 - fix: skip parallelism > 1 for Google Pub/Sub tests Google Pub/Sub's Receive() can only be called once per subscriber. Setting subscriber.parallelism > 1 creates multiple handlers that compete for the same Receive call, causing "Receive already in progress" errors and flaky package-level failures in CI. Only set parallelism > 1 for RabbitMQ in RunCloseWaitsForInFlightMessages and RunPanicHandler. Google Pub/Sub handles parallelism internally via num_goroutines and max_outstanding_messages. --- test/integration/common/common.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/integration/common/common.go b/test/integration/common/common.go index fd07a8d..1e47af4 100644 --- a/test/integration/common/common.go +++ b/test/integration/common/common.go @@ -626,7 +626,9 @@ func RunErrorSubscriber(t *testing.T, configMap map[string]string, cfg BrokerTes func RunCloseWaitsForInFlightMessages(t *testing.T, configMap map[string]string, cfg BrokerTestConfig) { topic := uniqueTopic("close-test-topic") ctx := context.Background() - configMap["subscriber.parallelism"] = "6" + if cfg.BrokerType == broker.BrokerTypeRabbitMQ { + configMap["subscriber.parallelism"] = "6" + } pub, err := broker.NewPublisher( logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t), @@ -718,7 +720,9 @@ func RunCloseWaitsForInFlightMessages(t *testing.T, configMap map[string]string, func RunPanicHandler(t *testing.T, configMap map[string]string, cfg BrokerTestConfig) { topic := uniqueTopic("panic-test-topic") ctx := context.Background() - configMap["subscriber.parallelism"] = "3" + if cfg.BrokerType == broker.BrokerTypeRabbitMQ { + configMap["subscriber.parallelism"] = "3" + } pub, err := broker.NewPublisher( logger.NewTestLogger(logger.WithLevel(slog.LevelWarn)), NewTestMetrics(t),