Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# HyperFleet Broker - golangci-lint configuration
# Aligned with: https://github.com/openshift-hyperfleet/architecture/blob/main/hyperfleet/standards/golangci.yml

version: "2"

run:
timeout: 5m
tests: true
modules-download-mode: readonly

linters:
enable:
# Code Quality
- errcheck
- govet
- staticcheck
- ineffassign
- unused
- unconvert
- unparam
- goconst
- exhaustive

# Code Style
- misspell
- lll
- revive
- gocritic

# Security
- gosec
settings:
errcheck:
check-type-assertions: true
check-blank: true
govet:
enable-all: true
goconst:
min-len: 3
min-occurrences: 3
misspell:
locale: US
lll:
line-length: 120
revive:
rules:
- name: exported
severity: warning
disabled: true
- name: unexported-return
severity: warning
disabled: false
- name: var-naming
severity: warning
disabled: false
unparam:
check-exported: false
exhaustive:
default-signifies-exhaustive: true
exclusions:
generated: lax
rules:
# Relaxed rules for test files
- linters:
- gosec
- errcheck
- unparam
path: _test\.go
paths:
- third_party(/|$)
- builtin(/|$)
- example(/|$)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
issues:
max-issues-per-linter: 0
max-same-issues: 0
new: false

formatters:
enable:
- gofmt
- goimports
- gci
- golines
settings:
gofmt:
simplify: true
golines:
max-len: 120
# Local addition on top of org standard: repo-specific import grouping
gci:
custom-order: true
sections:
- standard
- default
- prefix(github.com/openshift-hyperfleet)
- prefix(github.com/openshift-hyperfleet/hyperfleet-broker)
exclusions:
generated: lax
paths:
- third_party(/|$)
- builtin(/|$)
- example(/|$)

output:
formats:
text:
path: stdout
11 changes: 11 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
default_install_hook_types: [pre-commit, commit-msg]

repos:
- repo: https://github.com/openshift-hyperfleet/hyperfleet-hooks
rev: v0.2.1
hooks:
- id: hyperfleet-commitlint
stages: [commit-msg]
- id: hyperfleet-gofmt
- id: hyperfleet-golangci-lint
- id: hyperfleet-go-vet
66 changes: 66 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# AGENTS.md — hyperfleet-broker

Go library: unified pub/sub API over RabbitMQ and Google Pub/Sub with CloudEvents. Not a binary — consumed as a dependency by other HyperFleet services. Wraps [Watermill](https://github.com/ThreeDotsLabs/watermill) internally; public API never exposes Watermill types.

## Verification

| Target | What it runs |
|--------|-------------|
| `make lint` | golangci-lint v2.7.0 (bingo-managed, config in `.golangci.yml`) |
| `make fmt` | `golangci-lint fmt` — gofmt + goimports + gci (4-group imports) + golines (120 char wrap) |
| `make test` | Unit tests: `./broker/... ./pkg/...` (timeout 10m) |
| `make test-integration` | Integration tests: `./test/integration/...` (sequential `-p 1`, timeout 10m) |
| `make test-all` | Both unit + integration |
| `make install-hooks` | Install pre-commit hooks (`pre-commit install --install-hooks`) |

### Pre-push order

1. `make fmt` → 2. `make lint` → 3. `make test` → 4. `make test-integration` (if touching broker/ or test/)

## Source of truth

| Topic | Location |
|-------|----------|
| Public API | `broker/broker.go`, `broker/publisher.go`, `broker/subscriber.go`, `broker/metrics.go`, `broker/errors.go` |
| Logger interface | `pkg/logger/logger.go` |
| Config structure + validation | `broker/config.go`, `broker/rabbitmq.go`, `broker/googlepubsub.go` |
| Config fields reference | `example/broker.example.yaml` |
| Linter config | `.golangci.yml` |
| Integration test helpers | `test/integration/common/common.go` |
| Mock logger (unit tests) | `pkg/logger/mock.go` — use `NewMockLogger()` |
| Container setup helpers | `test/integration/rabbitmq/setup.go`, `test/integration/googlepubsub/setup.go` |
| Leak & perf integration tests | `test/integration/broker_leak_test.go`, `test/integration/broker_perf_test.go` |
| Examples (separate go.mod) | `example/go.mod`, `example/cmd/publisher/main.go`, `example/cmd/subscriber/main.go` |
| Docker Compose examples | `example/rabbitmq/`, `example/googlepubsub/` |
| Comprehensive user docs | `README.md` |
Comment thread
coderabbitai[bot] marked this conversation as resolved.

## Architecture context

Only patterns an agent cannot infer from reading the code:

- **Subscription ID** controls messaging pattern: same ID = shared/load-balanced queue, different IDs = fanout. RabbitMQ queue name = `{topic}-{subscriptionID}` (default) or `{queue}-{subscriptionID}` when `broker.rabbitmq.queue` is set. Google Pub/Sub subscription name = `{subscriptionID}`.
- **Config precedence**: programmatic map > env vars > broker.yaml file > defaults. `BROKER_CONFIG_FILE` env var overrides file path. Env vars use underscore for dots (e.g., `BROKER_RABBITMQ_URL`).
- **Health check asymmetry**: RabbitMQ = in-memory connection state check (no network call). Google Pub/Sub = `GetTopic` API probe with 3s timeout on a non-existent topic; `NotFound` = healthy.
- **Config debugging**: `log_config: true` in broker.yaml (or `LOG_CONFIG=true`) logs full config as JSON at creation time. Passwords masked.
- **DLQ topic naming**: DLQ topic is always `{subscriptionID}-dlq` (hardcoded at `googlepubsub.go`). Not configurable.

## Boundaries

### DO

- Keep Watermill as internal implementation detail — never expose Watermill types in public API
- Write integration tests for new broker-level behavior using `test/integration/common/` helpers

### DON'T

- Don't add a third broker backend without updating `validateConfig`, both constructors, health checks, and integration test infrastructure

## Gotchas

- **Google Pub/Sub health check requires `pubsub.topics.get`** — NOT included in `roles/pubsub.publisher`. Grant `roles/pubsub.viewer` or custom role.
- **`subscriber.parallelism` > 1 only needed for RabbitMQ**. Google Pub/Sub handles parallelism internally via `num_goroutines` and `max_outstanding_messages`.
- **Integration tests run sequentially** (`-p 1`) because CI has 1 CPU. Parallel execution causes timeouts.
- **`example/` has its own `go.mod`** — `make test` from root does not test examples. Update `example/go.mod` when changing public API.
- **`error_test.go` "missing rabbitmq url" test is false-passing**: sets `expectError: false` but `validateRabbitMQConfig` rejects empty URLs. Test passes because else branch only asserts when `err == nil`. Do not rely on it as documenting intentional behavior.
- **Google Pub/Sub subscriber auto-creates DLQ topic** (`{subscriptionID}-dlq`) when `create_topic_if_missing` is true — see `googlepubsub.go:188`.
- **Integration tests**: call `common.SetupTestEnvironment()` first in `TestMain`, share one container per package. Topic/subscription name uniqueness is handled internally by `Run*` helper functions in `common/common.go`. Pattern at `test/integration/rabbitmq/rabbitmq_test.go:28`.
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
@AGENTS.md
22 changes: 18 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
include .bingo/Variables.mk

.PHONY: test test-integration test-all lint fmt
.PHONY: test test-integration test-all lint fmt gofmt go-vet vet install-hooks

# Install pre-commit hooks
install-hooks:
pre-commit install --install-hooks

# Run linter
lint: $(GOLANGCI_LINT)
$(GOLANGCI_LINT) run ./...

# Format code
fmt:
gofmt -s -w .
# Run go vet
vet:
go vet ./...

# Alias for vet (used by pre-commit hook)
go-vet: vet

# Alias for fmt (used by pre-commit hook)
gofmt: fmt

# Format code (gofmt + goimports + gci via golangci-lint)
fmt: $(GOLANGCI_LINT)
$(GOLANGCI_LINT) fmt ./...

# Run unit tests
test:
Expand Down
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ broker:
retry_min_backoff: "10s" # 0s to 600s
retry_max_backoff: "600s" # 0s to 600s

# Dead letter settings
dead_letter_topic: "my-dead-letter-topic" # Auto-created if create_topic_if_missing is true
# Dead letter settings (DLQ topic is auto-named {subscriptionID}-dlq)
dead_letter_max_attempts: 5 # 5-100 (default: 5)

# Receive settings (client-side flow control)
Expand Down Expand Up @@ -289,7 +288,7 @@ broker:

The `create_*_if_missing` settings only apply **if you have sufficient permissions** (e.g., `pubsub.topics.create` and/or `pubsub.subscriptions.create` on the GCP project).

If you use **dead letter topics** (`dead_letter_topic`), the library will also auto-create the dead letter topic—as long as `create_topic_if_missing` is enabled.
When `create_topic_if_missing` is enabled, the library also auto-creates the dead letter topic (`{subscriptionID}-dlq`).

**Best practice:**
- Use the auto-create flags in development environments.
Expand Down Expand Up @@ -383,9 +382,7 @@ After setting the maximun number of allowed "in flight" messages, further settin
- Minimum and maximum delay between delivery retries for failed messages.

**Dead Letter Settings:**
- **`broker.googlepubsub.dead_letter_topic`**:
- Topic name for messages that fail delivery after max attempts.
- The dead letter topic is automatically created if `create_topic_if_missing` is true.
- DLQ topic is always named `{subscriptionID}-dlq` (not configurable). Auto-created when `create_topic_if_missing` is true.
- **`broker.googlepubsub.dead_letter_max_attempts`** (5-100, default: 5):
- Maximum number of delivery attempts before sending to the dead letter topic.

Expand Down
41 changes: 32 additions & 9 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/cloudevents/sdk-go/v2/event"

"github.com/openshift-hyperfleet/hyperfleet-broker/pkg/logger"
)

const (
DefaultSubscriberParallelism = 1
DefaultTestParallelism = 1
DefaultShutdownTimeout = 30 * time.Second

// BrokerTypeRabbitMQ is the broker type identifier for RabbitMQ.
BrokerTypeRabbitMQ = "rabbitmq"
// BrokerTypeGooglePubSub is the broker type identifier for Google Pub/Sub.
BrokerTypeGooglePubSub = "googlepubsub"
)

// NewPublisher creates a new publisher with the provided logger, metrics recorder, and optional configuration.
Expand Down Expand Up @@ -67,13 +73,13 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[
var healthCloser io.Closer

switch cfg.Broker.Type {
case "rabbitmq":
case BrokerTypeRabbitMQ:
pub, err = newRabbitMQPublisher(cfg, watermillLogger)
if err != nil {
return nil, fmt.Errorf("failed to create RabbitMQ publisher: %w", err)
}
hc = newRabbitMQHealthCheck(pub)
case "googlepubsub":
case BrokerTypeGooglePubSub:
pub, err = newGooglePubSubPublisher(cfg, watermillLogger)
if err != nil {
return nil, fmt.Errorf("failed to create Google Pub/Sub publisher: %w", err)
Expand All @@ -84,7 +90,11 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[
healthClient, err = pubsub.NewClient(context.Background(), cfg.Broker.GooglePubSub.ProjectID)
if err != nil {
if closeErr := pub.Close(); closeErr != nil {
return nil, fmt.Errorf("failed to create health check client: %w (also failed to close publisher: %v)", err, closeErr)
return nil, fmt.Errorf(
"failed to create health check client: %w (also failed to close publisher: %v)",
err,
closeErr,
)
}
return nil, fmt.Errorf("failed to create health check client: %w", err)
}
Expand All @@ -104,11 +114,17 @@ func NewPublisher(log logger.Logger, metrics *MetricsRecorder, configMap ...map[
}, nil
}

// NewSubscriber creates a new subscriber with the provided logger, subscription ID, metrics recorder, and optional configuration.
// NewSubscriber creates a new subscriber with the provided logger, subscription ID,
// metrics recorder, and optional configuration.
// Usage:
// - NewSubscriber(logger, "id", metrics) - uses provided logger and loads config from file
// - NewSubscriber(logger, "id", metrics, configMap) - uses provided logger with config map
func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRecorder, configMap ...map[string]string) (Subscriber, error) {
func NewSubscriber(
log logger.Logger,
subscriptionID string,
metrics *MetricsRecorder,
configMap ...map[string]string,
) (Subscriber, error) {
if subscriptionID == "" {
return nil, fmt.Errorf("subscriptionID is required")
}
Expand Down Expand Up @@ -149,12 +165,12 @@ func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRec
var sub message.Subscriber

switch cfg.Broker.Type {
case "rabbitmq":
case BrokerTypeRabbitMQ:
sub, err = newRabbitMQSubscriber(cfg, watermillLogger, subscriptionID)
if err != nil {
return nil, fmt.Errorf("failed to create RabbitMQ subscriber: %w", err)
}
case "googlepubsub":
case BrokerTypeGooglePubSub:
sub, err = newGooglePubSubSubscriber(cfg, watermillLogger, subscriptionID)
if err != nil {
return nil, fmt.Errorf("failed to create Google Pub/Sub subscriber: %w", err)
Expand Down Expand Up @@ -182,7 +198,7 @@ func NewSubscriber(log logger.Logger, subscriptionID string, metrics *MetricsRec
func logConfiguration(cfg *config, component string, logger watermill.LoggerAdapter) {
// Create a copy of config with masked password for logging
logCfg := *cfg
if cfg.Broker.Type == "rabbitmq" && cfg.Broker.RabbitMQ.URL != "" {
if cfg.Broker.Type == BrokerTypeRabbitMQ && cfg.Broker.RabbitMQ.URL != "" {
logCfg.Broker.RabbitMQ.URL = maskPassword(cfg.Broker.RabbitMQ.URL)
}

Expand All @@ -196,7 +212,14 @@ func logConfiguration(cfg *config, component string, logger watermill.LoggerAdap
return
}

logger.Info(fmt.Sprintf("=== %s Configuration (JSON) ===\n%s\n========================================", component, string(jsonBytes)), nil)
logger.Info(
fmt.Sprintf(
"=== %s Configuration (JSON) ===\n%s\n========================================",
component,
string(jsonBytes),
),
nil,
)
}

// maskPassword masks passwords in URLs for logging
Expand Down
12 changes: 7 additions & 5 deletions broker/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

func TestMessageToEvent(t *testing.T) {
tests := []struct {
name string
setupMsg func() *message.Message
expectError bool
validate func(*testing.T, *event.Event)
name string
expectError bool
}{
{
name: "valid message with all attributes",
Expand Down Expand Up @@ -74,7 +74,9 @@ func TestMessageToEvent(t *testing.T) {
msg := message.NewMessage("test-uuid", []byte{})
return msg
},
expectError: true, // unmarshal empty bytes usually fails or returns empty struct which might be invalid CloudEvent if required fields missing
// unmarshal empty bytes usually fails or returns empty struct
// which might be invalid CloudEvent if required fields missing
expectError: true,
validate: nil,
},
}
Expand All @@ -99,10 +101,10 @@ func TestMessageToEvent(t *testing.T) {

func TestEventToMessage(t *testing.T) {
tests := []struct {
name string
setupEvent func() *event.Event
expectError bool
validate func(*testing.T, *message.Message)
name string
expectError bool
}{
{
name: "valid event with all attributes",
Expand Down
Loading