diff --git a/CLAUDE.md b/CLAUDE.md index f2fc62a..01e5e94 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,25 +10,30 @@ project: the point is to *prove understanding of queue internals*, not to wrap a library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery, etc.) — the mechanics are the deliverable. -**Status: Phase 1 complete; Phase 2 nearly complete.** The core engine plus delayed jobs, the -promoter, retry backoff, priority, idempotency enforcement, and per-queue rate limiting are built, -tested against a real Redis under `-race`, and CI is green (only Prometheus metrics remain in -Phase 2). Repo: . What exists today: +**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter, +retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics +are built, tested against a real Redis under `-race`, and CI is green. Phase 3 (API/dashboard) is +next. Repo: . What exists today: - `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`). - `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`, `Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), with Lua under `internal/broker/scripts/`: `enqueue.lua`, `claim.lua`, `ack.lua`, `nack.lua`, `reaper.lua`, `promote.lua`, `heartbeat.lua`. Broker options: `WithBackoff`, `WithDedupTTL`, - `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash). + `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash), + `WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op). 
+- `internal/metrics` — Prometheus `Recorder` (implements `broker.Metrics`; counters + `relay_jobs_*_total`, histogram `relay_job_latency_seconds`, all labelled by queue) and + `DepthCollector` (a `prometheus.Collector` reporting `relay_queue_depth{queue,state}` gauges + by reading ZCARD/LLEN at scrape time). - `internal/worker` — `Worker` (claim loop, dispatch, heartbeat, graceful shutdown), plus `Reaper` and `Promoter` background loops sharing one `runDrainLoop` helper. - `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load - generator with `--delay`). + generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set, + serves `/metrics` and registers the depth collector with graceful shutdown. - `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint`. -Remaining Phase 2 (Prometheus metrics) and Phase 3 (API/dashboard/`cmd/server`, docker-compose, -deploy) are **not** built yet. +Phase 3 (API/dashboard/`cmd/server`, docker-compose, deploy) is **not** built yet. ## Source of truth @@ -65,6 +70,7 @@ spec disagree, the spec wins until the spec is deliberately updated. (`broker.WithBackoff`). - **Idempotency is enqueue-only, TTL-window.** A keyed duplicate is dropped within the dedup TTL (default 24h, `WithDedupTTL`); the key is not released on completion. Delivery remains at-least-once — consumers needing exactly-once effects still dedup on the key. - **Rate-limit config is per-worker, not stored in Redis.** All workers on a queue must register the same `WithRateLimit` (they share one Redis bucket and pass rate/burst on every claim); mismatched configs refill inconsistently. A rate-limited claim is indistinguishable from an empty queue to the worker (it polls again). +- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. 
Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. The endpoint lives on `cmd/worker` until the Phase 3 server exists. ## Redis data model & job lifecycle (the architecture in brief) @@ -111,7 +117,8 @@ internal/job/ # ✅ job model + hash encoding internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat (go:embed) internal/worker/ # ✅ Worker + Reaper + Promoter runtime -internal/{client,api,metrics}/ # ◻ producer SDK / HTTP API / Prometheus (Phase 2–3) +internal/metrics/ # ✅ Prometheus Recorder + DepthCollector +internal/{client,api}/ # ◻ producer SDK / HTTP API (Phase 3) web/ # ◻ embedded dashboard assets (Phase 3) deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3) .github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint @@ -123,7 +130,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold ## Build order (do not jump ahead) 1. **Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first. -2. **Phase 2 — depth (in progress):** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics still to do. +2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅. 3. **Phase 3 — polish:** dashboard; docker-compose demo; deployed demo; README + diagram. 4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox. @@ -144,11 +151,15 @@ Use `internal/` for everything not meant as a public import surface. 
`cmd/` hold - **Module:** `github.com/StrangeNoob/relay`. - **Toolchain:** `go 1.24` with `toolchain go1.25.11` pinned in `go.mod` (go-redis v9 needs ≥1.24). If a `go1.24` toolchain download fails, the pin makes the build use the already-cached 1.25.x. -- **Only dependency:** `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library; it - does not violate the "no queue dependency" rule. The queue logic is ours. -- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). They use **DB 15** - and `FlushDB` per test, and **skip** (not fail) when Redis is unreachable — so a green local run - with no Redis means the broker/worker suites were skipped. CI provides a Redis service. +- **Direct dependencies (two):** + - `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library. + - `github.com/prometheus/client_golang` — a metrics instrumentation library, not a queue library. + Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours. +- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using + package claims its own logical DB so `go test ./...` runs them in parallel without flushing each + other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**; a new one picks another), with + `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable — so a green local run + with no Redis means those suites were skipped. CI provides a Redis service. ```sh go build ./... 
diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 6da2bc6..c2a8ee6 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -12,16 +12,19 @@ import ( "fmt" "log/slog" "math/rand" + "net/http" "os" "os/signal" "sync" "syscall" "time" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/redis/go-redis/v9" "github.com/StrangeNoob/relay/internal/broker" "github.com/StrangeNoob/relay/internal/job" + "github.com/StrangeNoob/relay/internal/metrics" "github.com/StrangeNoob/relay/internal/worker" ) @@ -36,6 +39,7 @@ func main() { backoffMax := flag.Duration("backoff-max", 10*time.Minute, "retry backoff ceiling") rate := flag.Float64("rate", 0, "max claims/second for this queue (0 = unlimited)") burst := flag.Int("burst", 0, "rate-limit burst capacity (defaults to 1 when --rate is set)") + metricsAddr := flag.String("metrics-addr", "", "address to serve Prometheus /metrics on (e.g. :9090); empty = disabled") flag.Parse() logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) @@ -61,7 +65,32 @@ func main() { } brokerOpts = append(brokerOpts, broker.WithRateLimit(*queue, *rate, *burst)) } + + // Metrics: build recorder and register a depth collector only when + // --metrics-addr is set; otherwise rec stays nil and all metric paths + // are skipped, preserving byte-identical behaviour to before. + var rec *metrics.Recorder + if *metricsAddr != "" { + rec = metrics.NewRecorder() + rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, *queue)) + brokerOpts = append(brokerOpts, broker.WithMetrics(rec)) + } + b := broker.New(rdb, brokerOpts...) + // Start the Prometheus metrics HTTP server when --metrics-addr is set. 
+ var metricsSrv *http.Server + if rec != nil { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})) + metricsSrv = &http.Server{Addr: *metricsAddr, Handler: mux} + go func() { + logger.Info("metrics server listening", "addr", *metricsAddr) + if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("metrics server error", "err", err) + } + }() + } + handler := demoHandler(*failRate, logger) var wg sync.WaitGroup @@ -98,6 +127,17 @@ func main() { <-ctx.Done() logger.Info("shutdown signal received, draining in-flight jobs") wg.Wait() + + // Shut down the metrics server after all workers have drained, so the + // final scrape can still observe counters from the last batch of jobs. + if metricsSrv != nil { + shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutCancel() + if err := metricsSrv.Shutdown(shutCtx); err != nil { + logger.Error("metrics server shutdown error", "err", err) + } + } + logger.Info("relay worker stopped cleanly") } diff --git a/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md b/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md new file mode 100644 index 0000000..3bd458e --- /dev/null +++ b/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md @@ -0,0 +1,1269 @@ +# Phase 2 Metrics Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make Relay observable by instrumenting every broker job-state transition with Prometheus counters/histogram and exposing live per-queue depth gauges at a `/metrics` endpoint on `cmd/worker`. 
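The "live per-queue depth gauges" in the goal are pulled, not pushed: nothing is stored between scrapes, and each scrape reads the current sizes. A minimal, stdlib-only sketch of that idea follows. The names `depthReader`, `depthSample`, and `collectDepths`, the state names, and the `"<queue>:<state>"` key layout are all hypothetical stand-ins and are not taken from the Relay code; a real collector would issue `ZCARD`/`LLEN` against Redis instead of reading a map.

```go
package main

import "fmt"

// depthReader is a hypothetical stand-in for the cardinality reads the plan
// names (ZCARD / LLEN): it returns the current size of one key.
type depthReader func(key string) (int64, error)

// depthSample is one gauge value, labelled by queue and state.
type depthSample struct {
	Queue string
	State string
	Depth int64
}

// collectDepths is meant to be called once per scrape. It caches nothing, so
// every call reflects the store as it is at that moment. The key layout
// "<queue>:<state>" is an assumption made for illustration only.
func collectDepths(read depthReader, queue string, states []string) []depthSample {
	out := make([]depthSample, 0, len(states))
	for _, st := range states {
		n, err := read(queue + ":" + st)
		if err != nil {
			// Omit this series for this scrape rather than report a stale value.
			continue
		}
		out = append(out, depthSample{Queue: queue, State: st, Depth: n})
	}
	return out
}

func main() {
	// An in-memory map plays the role of the shared store.
	store := map[string]int64{"emails:ready": 3, "emails:delayed": 1}
	read := func(key string) (int64, error) { return store[key], nil }
	states := []string{"ready", "delayed"}

	fmt.Println(collectDepths(read, "emails", states))
	store["emails:ready"] = 0 // jobs drained between two scrapes
	fmt.Println(collectDepths(read, "emails", states))
}
```

Because every process reads the same shared store, each one reports the same depth for a given queue and state, which is why these gauges are aggregated with max or avg rather than sum.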
+ +**Architecture:** A `Metrics` interface in `broker` with a no-op default (opt-in via `WithMetrics`) keeps the broker free of a hard prometheus dependency and makes instrumentation purely additive. Counters/histogram are pushed inline after each Redis op succeeds; per-queue depth gauges are pulled at scrape time by a `prometheus.Collector` running `ZCARD`/`LLEN`. The concrete recorder + collector live in a new `internal/metrics` package that satisfies `broker.Metrics` structurally (no import of `broker`). + +**Tech Stack:** Go, `github.com/redis/go-redis/v9`, `github.com/prometheus/client_golang` (new dependency), real-Redis integration tests. + +**Spec:** [`docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md`](../specs/2026-06-08-relay-phase2-metrics-design.md) + +--- + +## File Structure + +- **Create `internal/broker/metrics.go`** — `Metrics` interface, `noopMetrics` default, `WithMetrics` option. Responsibility: the consumer-side instrumentation contract and its no-op. +- **Modify `internal/broker/broker.go`** — add `metrics Metrics` field to `Broker`, initialise to `noopMetrics{}` in `New`, and record at each transition (`Enqueue`, `Claim`, `Ack`, `Nack`, `Reap`, `Promote`). +- **Create `internal/broker/metrics_test.go`** (`package broker`) — internal unit tests for option wiring / noop default. +- **Create `internal/broker/instrumentation_test.go`** (`package broker_test`) — a fake recorder + behavior tests that each transition records the right call against real Redis. +- **Modify `internal/broker/broker_test.go`** — refactor `newTestBroker` to delegate to a new `newTestBrokerWith(t, opts...)` so instrumentation tests can inject `WithMetrics`. +- **Create `internal/metrics/recorder.go`** — `Recorder` (prometheus counters + histogram over a private registry) implementing `broker.Metrics`. +- **Create `internal/metrics/depth.go`** — `DepthCollector` (`prometheus.Collector`) reading `ZCARD`/`LLEN` at scrape time. 
+- **Create `internal/metrics/recorder_test.go`** — compile assertion + `testutil` value assertions. +- **Create `internal/metrics/depth_test.go`** — `DepthCollector` against real Redis (DB 13). +- **Modify `cmd/worker/main.go`** — `--metrics-addr` flag; when set, build recorder, wire `WithMetrics`, register depth collector, serve `/metrics`, shut down gracefully. +- **Modify `CLAUDE.md`** — flip Phase 2 to complete, mark `internal/metrics` ✅, document `WithMetrics` + the new dependency + known limitations. +- **Modify `go.mod` / `go.sum`** — add `prometheus/client_golang` (happens automatically on first import + `go mod tidy`). + +--- + +## Task 1: `Metrics` interface, noop default, and `WithMetrics` option + +**Files:** +- Create: `internal/broker/metrics.go` +- Create: `internal/broker/metrics_test.go` (`package broker`) +- Modify: `internal/broker/broker.go` (the `Broker` struct + `New`) + +- [ ] **Step 1: Write the failing test** + +Create `internal/broker/metrics_test.go`: + +```go +package broker + +import "testing" + +func TestNewDefaultsToNoopMetrics(t *testing.T) { + b := New(nil) + if b.metrics == nil { + t.Fatal("New: metrics field is nil, want noopMetrics default") + } + if _, ok := b.metrics.(noopMetrics); !ok { + t.Fatalf("New: metrics = %T, want noopMetrics", b.metrics) + } +} + +func TestWithMetricsInstallsRecorder(t *testing.T) { + rec := noopMetrics{} // any Metrics value; identity is what we check + var custom Metrics = rec + b := New(nil, WithMetrics(custom)) + if b.metrics == nil { + t.Fatal("WithMetrics: metrics is nil") + } +} + +func TestWithMetricsNilIsIgnored(t *testing.T) { + b := New(nil, WithMetrics(nil)) + if _, ok := b.metrics.(noopMetrics); !ok { + t.Fatalf("WithMetrics(nil): metrics = %T, want noopMetrics retained", b.metrics) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestNewDefaultsToNoopMetrics|TestWithMetrics' -v` +Expected: FAIL — compile error, `b.metrics` undefined 
/ `noopMetrics`/`WithMetrics`/`Metrics` undefined. + +- [ ] **Step 3: Write minimal implementation** + +Create `internal/broker/metrics.go`: + +```go +package broker + +import "time" + +// Metrics receives a callback for every job-state transition the broker makes. +// It is the broker's consumer-side instrumentation contract: the broker depends +// on this small interface, not on any metrics library, so a Prometheus recorder +// (internal/metrics) — or a test fake — can be plugged in via WithMetrics. The +// default is noopMetrics, so a broker built without WithMetrics records nothing +// and behaves exactly as before. +// +// Every method takes the queue name so the implementation can label its series +// per queue. Reap/Promote add a batch count because one call moves many jobs; +// the rest are single events. ObserveLatency reports a job's end-to-end time in +// the system (enqueue -> ack). +type Metrics interface { + IncEnqueued(queue string) + IncDeduplicated(queue string) + IncClaimed(queue string) + IncProcessed(queue string) + IncRetried(queue string) + IncDead(queue string) + AddReaped(queue string, n int) + AddPromoted(queue string, n int) + ObserveLatency(queue string, d time.Duration) +} + +// noopMetrics is the default recorder: every method does nothing. It lets the +// broker call b.metrics unconditionally without nil checks, and keeps metrics +// entirely opt-in. +type noopMetrics struct{} + +func (noopMetrics) IncEnqueued(string) {} +func (noopMetrics) IncDeduplicated(string) {} +func (noopMetrics) IncClaimed(string) {} +func (noopMetrics) IncProcessed(string) {} +func (noopMetrics) IncRetried(string) {} +func (noopMetrics) IncDead(string) {} +func (noopMetrics) AddReaped(string, int) {} +func (noopMetrics) AddPromoted(string, int) {} +func (noopMetrics) ObserveLatency(string, time.Duration) {} + +// WithMetrics installs a Metrics recorder. 
A nil recorder is ignored so callers +// cannot accidentally replace the safe no-op with something that panics. +func WithMetrics(m Metrics) Option { + return func(b *Broker) { + if m != nil { + b.metrics = m + } + } +} +``` + +In `internal/broker/broker.go`, add the field to the `Broker` struct (alongside the existing fields): + +```go + metrics Metrics +``` + +And in `New`, set the default BEFORE applying options (so `WithMetrics` can override it). The current `New` builds the struct then ranges over opts; ensure the literal includes `metrics: noopMetrics{}`: + +```go + b := &Broker{ + rdb: rdb, + // ... existing field initialisers unchanged ... + metrics: noopMetrics{}, + } + for _, opt := range opts { + opt(b) + } +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestNewDefaultsToNoopMetrics|TestWithMetrics' -v` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/metrics.go internal/broker/metrics_test.go internal/broker/broker.go +git commit -m "Add broker Metrics interface, noop default, and WithMetrics option" +``` + +--- + +## Task 2: Fake recorder + Enqueue instrumentation + +**Files:** +- Modify: `internal/broker/broker_test.go` (refactor `newTestBroker`) +- Create: `internal/broker/instrumentation_test.go` (`package broker_test`) +- Modify: `internal/broker/broker.go` (`Enqueue`) + +- [ ] **Step 1: Refactor the test harness to accept options** + +In `internal/broker/broker_test.go`, replace the `newTestBroker` body so it delegates to a new variadic helper (keep `newTestBroker` working for all existing callers): + +```go +func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) { + t.Helper() + return newTestBrokerWith(t) +} + +// newTestBrokerWith is newTestBroker with broker options, for tests that need to +// inject a recorder or other configuration. 
+func newTestBrokerWith(t *testing.T, opts ...broker.Option) (*broker.Broker, *redis.Client) { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: testRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + return broker.New(rdb, opts...), rdb +} +``` + +- [ ] **Step 2: Write the failing test (fake recorder + enqueue)** + +Create `internal/broker/instrumentation_test.go`: + +```go +package broker_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" +) + +// fakeMetrics is a Metrics recorder that counts calls per queue in memory, so a +// test can assert exactly which transition the broker recorded. It is safe for +// concurrent use because the broker may record from multiple goroutines. 
+type fakeMetrics struct { + mu sync.Mutex + enqueued map[string]int + deduped map[string]int + claimed map[string]int + processed map[string]int + retried map[string]int + dead map[string]int + reaped map[string]int + promoted map[string]int + latencies []time.Duration +} + +func newFakeMetrics() *fakeMetrics { + return &fakeMetrics{ + enqueued: map[string]int{}, + deduped: map[string]int{}, + claimed: map[string]int{}, + processed: map[string]int{}, + retried: map[string]int{}, + dead: map[string]int{}, + reaped: map[string]int{}, + promoted: map[string]int{}, + } +} + +func (f *fakeMetrics) IncEnqueued(q string) { f.bump(f.enqueued, q) } +func (f *fakeMetrics) IncDeduplicated(q string) { f.bump(f.deduped, q) } +func (f *fakeMetrics) IncClaimed(q string) { f.bump(f.claimed, q) } +func (f *fakeMetrics) IncProcessed(q string) { f.bump(f.processed, q) } +func (f *fakeMetrics) IncRetried(q string) { f.bump(f.retried, q) } +func (f *fakeMetrics) IncDead(q string) { f.bump(f.dead, q) } +func (f *fakeMetrics) AddReaped(q string, n int) { f.add(f.reaped, q, n) } +func (f *fakeMetrics) AddPromoted(q string, n int) { f.add(f.promoted, q, n) } +func (f *fakeMetrics) ObserveLatency(q string, d time.Duration) { + f.mu.Lock() + defer f.mu.Unlock() + f.latencies = append(f.latencies, d) +} + +func (f *fakeMetrics) bump(m map[string]int, q string) { f.add(m, q, 1) } +func (f *fakeMetrics) add(m map[string]int, q string, n int) { + f.mu.Lock() + defer f.mu.Unlock() + m[q] += n +} + +func (f *fakeMetrics) get(m map[string]int, q string) int { + f.mu.Lock() + defer f.mu.Unlock() + return m[q] +} + +func TestEnqueueRecordsEnqueued(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if got := fm.get(fm.enqueued, "emails"); got != 1 { + t.Errorf("enqueued[emails] = %d, want 1", got) + } + if got := 
fm.get(fm.deduped, "emails"); got != 0 { + t.Errorf("deduped[emails] = %d, want 0", got) + } +} + +func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j1 := job.New("emails", []byte("a")) + if err := b.Enqueue(ctx, j1, broker.WithIdempotencyKey("k1")); err != nil { + t.Fatalf("first Enqueue: %v", err) + } + j2 := job.New("emails", []byte("b")) + if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); err != broker.ErrDuplicate { + t.Fatalf("second Enqueue err = %v, want ErrDuplicate", err) + } + if got := fm.get(fm.enqueued, "emails"); got != 1 { + t.Errorf("enqueued[emails] = %d, want 1", got) + } + if got := fm.get(fm.deduped, "emails"); got != 1 { + t.Errorf("deduped[emails] = %d, want 1", got) + } +} +``` + +- [ ] **Step 3: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestEnqueueRecords|TestEnqueueDuplicateRecords' -v` +Expected: FAIL — `enqueued[emails] = 0, want 1` (instrumentation not added yet). Requires Redis; if skipped, note RED cannot be shown without Redis and proceed only once Redis is available. + +- [ ] **Step 4: Add Enqueue instrumentation** + +In `internal/broker/broker.go`, in `Enqueue`, record after the script succeeds (replace the tail `if res == "dup" { return ErrDuplicate }; return nil`): + +```go + if res == "dup" { + b.metrics.IncDeduplicated(j.Queue) + return ErrDuplicate + } + b.metrics.IncEnqueued(j.Queue) + return nil +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestEnqueueRecords|TestEnqueueDuplicateRecords' -v` +Expected: PASS (2 tests). 
+ +- [ ] **Step 6: Commit** + +```bash +git add internal/broker/broker_test.go internal/broker/instrumentation_test.go internal/broker/broker.go +git commit -m "Instrument Enqueue with enqueued/deduplicated metrics" +``` + +--- + +## Task 3: Claim instrumentation + +**Files:** +- Modify: `internal/broker/broker.go` (`Claim`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestClaimRecordsClaimed(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v, want true/nil", ok, err) + } + if got := fm.get(fm.claimed, "emails"); got != 1 { + t.Errorf("claimed[emails] = %d, want 1", got) + } +} + +func TestClaimEmptyQueueRecordsNothing(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || ok { + t.Fatalf("Claim on empty: ok=%v err=%v, want false/nil", ok, err) + } + if got := fm.get(fm.claimed, "emails"); got != 0 { + t.Errorf("claimed[emails] = %d, want 0", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestClaimRecordsClaimed|TestClaimEmptyQueueRecordsNothing' -v` +Expected: FAIL — `claimed[emails] = 0, want 1`. + +- [ ] **Step 3: Add Claim instrumentation** + +In `Claim`, record only on a successful pop. Find the point where the script returned a job and the function is about to return `(j, true, nil)`; add `b.metrics.IncClaimed(queue)` immediately before that successful return. The empty-queue path (returns `ok == false`) must record nothing. 
(The exact return statement is the one returning the decoded job with `true`; do not touch the `redis.Nil` / empty branch.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestClaimRecordsClaimed|TestClaimEmptyQueueRecordsNothing' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Claim with claimed metric" +``` + +--- + +## Task 4: Ack instrumentation (processed + latency) + +**Files:** +- Modify: `internal/broker/broker.go` (`Ack`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestAckRecordsProcessedAndLatency(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + j, ok, err := b.Claim(ctx, "emails", time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + if err := b.Ack(ctx, j); err != nil { + t.Fatalf("Ack: %v", err) + } + if got := fm.get(fm.processed, "emails"); got != 1 { + t.Errorf("processed[emails] = %d, want 1", got) + } + fm.mu.Lock() + n := len(fm.latencies) + var d time.Duration + if n == 1 { + d = fm.latencies[0] + } + fm.mu.Unlock() + if n != 1 { + t.Fatalf("latencies len = %d, want 1", n) + } + if d < 0 { + t.Errorf("latency = %v, want non-negative", d) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestAckRecordsProcessedAndLatency -v` +Expected: FAIL — `processed[emails] = 0, want 1`. 
+ +- [ ] **Step 3: Add Ack instrumentation** + +In `Ack`, after the script `Run(...).Err()` check succeeds and before `return nil`: + +```go + b.metrics.IncProcessed(j.Queue) + b.metrics.ObserveLatency(j.Queue, time.Since(j.CreatedAt)) + return nil +``` + +(`job.Job` has `CreatedAt time.Time`; `time.Since` yields enqueue→ack elapsed. `time` is already imported in broker.go.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestAckRecordsProcessedAndLatency -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Ack with processed counter and end-to-end latency" +``` + +--- + +## Task 5: Nack instrumentation (retried vs dead) + +**Files:** +- Modify: `internal/broker/broker.go` (`Nack`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +// nackTestJob enqueues, claims, and returns a job set up so the next Nack takes +// the requested branch. maxRetries controls retry-vs-dead: with maxRetries=5 the +// first nack retries; with maxRetries=0 the first nack dead-letters. 
+func nackTestJob(t *testing.T, b *broker.Broker, ctx context.Context, maxRetries int) job.Job { + t.Helper() + j := job.New("emails", []byte("x")) + j.MaxRetries = maxRetries + if err := b.Enqueue(ctx, j); err != nil { + t.Fatalf("Enqueue: %v", err) + } + claimed, ok, err := b.Claim(ctx, "emails", time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + return claimed +} + +func TestNackWithRetriesLeftRecordsRetried(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j := nackTestJob(t, b, ctx, 5) // attempts now 1 < 5 -> retry + if err := b.Nack(ctx, j); err != nil { + t.Fatalf("Nack: %v", err) + } + if got := fm.get(fm.retried, "emails"); got != 1 { + t.Errorf("retried[emails] = %d, want 1", got) + } + if got := fm.get(fm.dead, "emails"); got != 0 { + t.Errorf("dead[emails] = %d, want 0", got) + } +} + +func TestNackWithBudgetSpentRecordsDead(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j := nackTestJob(t, b, ctx, 0) // attempts now 1, max 0 -> dead + if err := b.Nack(ctx, j); err != nil { + t.Fatalf("Nack: %v", err) + } + if got := fm.get(fm.dead, "emails"); got != 1 { + t.Errorf("dead[emails] = %d, want 1", got) + } + if got := fm.get(fm.retried, "emails"); got != 0 { + t.Errorf("retried[emails] = %d, want 0", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestNackWithRetriesLeftRecordsRetried|TestNackWithBudgetSpentRecordsDead' -v` +Expected: FAIL — `retried[emails] = 0, want 1` (and dead = 0). + +- [ ] **Step 3: Add Nack instrumentation** + +In `Nack`, the script already returns `"retry"`/`"dead"` but the code currently discards it with `.Err()`. Capture it with `.Text()` and branch. 
Replace the script-run block: + +```go + outcome, err := nackScript.Run(ctx, b.rdb, + []string{inflightKey(j.Queue), delayedKey(j.Queue), dlqKey(j.Queue)}, + j.ID, jobKeyPrefix, readyAt, + ).Text() + if err != nil { + return fmt.Errorf("broker: nacking job %s: %w", j.ID, err) + } + switch outcome { + case "retry": + b.metrics.IncRetried(j.Queue) + case "dead": + b.metrics.IncDead(j.Queue) + } + return nil +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestNackWithRetriesLeftRecordsRetried|TestNackWithBudgetSpentRecordsDead' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Nack with retried/dead metrics from script outcome" +``` + +--- + +## Task 6: Reap + Promote instrumentation + +**Files:** +- Modify: `internal/broker/broker.go` (`Reap`, `Promote`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestReapRecordsReapedCount(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + // Enqueue + claim two jobs with a tiny visibility so they expire immediately. 
+ for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + for i := 0; i < 2; i++ { + if _, ok, err := b.Claim(ctx, "emails", time.Millisecond); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + } + time.Sleep(10 * time.Millisecond) // let the visibility deadline pass + + n, err := b.Reap(ctx, "emails") + if err != nil { + t.Fatalf("Reap: %v", err) + } + if n != 2 { + t.Fatalf("Reap returned %d, want 2", n) + } + if got := fm.get(fm.reaped, "emails"); got != 2 { + t.Errorf("reaped[emails] = %d, want 2", got) + } +} + +func TestPromoteRecordsPromotedCount(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + // Enqueue two delayed jobs whose ready-at is already in the past. + past := time.Now().Add(-time.Second) + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(past)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + } + + n, err := b.Promote(ctx, "emails") + if err != nil { + t.Fatalf("Promote: %v", err) + } + if n != 2 { + t.Fatalf("Promote returned %d, want 2", n) + } + if got := fm.get(fm.promoted, "emails"); got != 2 { + t.Errorf("promoted[emails] = %d, want 2", got) + } +} +``` + +Note on `WithReadyAt(past)`: `Enqueue` routes to the delayed set only when `cfg.readyAt.After(now)`. A past time would route straight to ready and Promote would find nothing. 
If `WithReadyAt` with a past time does NOT create a delayed job in this codebase, instead enqueue with a near-future ready-at and sleep past it: + +```go + soon := time.Now().Add(20 * time.Millisecond) + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(soon)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + } + time.Sleep(40 * time.Millisecond) +``` + +Use whichever form makes the job land in the delayed set; verify by asserting `n == 2` (the test fails loudly if the jobs were not delayed). + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestReapRecordsReapedCount|TestPromoteRecordsPromotedCount' -v` +Expected: FAIL — `reaped[emails] = 0, want 2` / `promoted[emails] = 0, want 2`. + +- [ ] **Step 3: Add Reap + Promote instrumentation** + +In `Reap`, after a successful run replace `return n, nil` with: + +```go + if n > 0 { + b.metrics.AddReaped(queue, n) + } + return n, nil +``` + +In `Promote`, after a successful run replace `return n, nil` with: + +```go + if n > 0 { + b.metrics.AddPromoted(queue, n) + } + return n, nil +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestReapRecordsReapedCount|TestPromoteRecordsPromotedCount' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Full broker suite under race** + +Run: `go test -race ./internal/broker/` +Expected: PASS (all existing + new tests; confirms instrumentation didn't disturb behavior). 
+ +- [ ] **Step 6: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Reap and Promote with batch count metrics" +``` + +--- + +## Task 7: `internal/metrics` Recorder (Prometheus) + +**Files:** +- Create: `internal/metrics/recorder.go` +- Create: `internal/metrics/recorder_test.go` +- Modify: `go.mod`, `go.sum` (via `go mod tidy`) + +- [ ] **Step 1: Write the failing test** + +Create `internal/metrics/recorder_test.go`: + +```go +package metrics_test + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/metrics" +) + +// Compile-time proof the Recorder satisfies the broker's instrumentation contract. +var _ broker.Metrics = (*metrics.Recorder)(nil) + +func TestRecorderCountersIncrement(t *testing.T) { + rec := metrics.NewRecorder() + + rec.IncEnqueued("emails") + rec.IncEnqueued("emails") + rec.IncDeduplicated("emails") + rec.IncClaimed("emails") + rec.IncProcessed("emails") + rec.IncRetried("emails") + rec.IncDead("emails") + rec.AddReaped("emails", 3) + rec.AddPromoted("emails", 4) + + checks := []struct { + name string + metric string + want float64 + }{ + {"enqueued", "relay_jobs_enqueued_total", 2}, + {"deduped", "relay_jobs_deduplicated_total", 1}, + {"claimed", "relay_jobs_claimed_total", 1}, + {"processed", "relay_jobs_processed_total", 1}, + {"retried", "relay_jobs_retried_total", 1}, + {"dead", "relay_jobs_dead_total", 1}, + {"reaped", "relay_jobs_reaped_total", 3}, + {"promoted", "relay_jobs_promoted_total", 4}, + } + for _, c := range checks { + if got := testutil.ToFloat64(rec.CounterForTest(c.name, "emails")); got != c.want { + t.Errorf("%s{queue=emails} = %v, want %v", c.metric, got, c.want) + } + } +} + +func TestRecorderObservesLatency(t *testing.T) { + rec := metrics.NewRecorder() + rec.ObserveLatency("emails", 250*time.Millisecond) + + // One 
observation must be recorded in the histogram for queue=emails. + got := testutil.CollectAndCount(rec.Registry(), "relay_job_latency_seconds") + if got == 0 { + t.Fatal("relay_job_latency_seconds: no series collected after one observation") + } +} +``` + +Helper `CounterForTest` is a small test-only accessor (defined in the impl so the test can fetch the right `prometheus.Counter` child). It returns the per-queue child counter for the named metric. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/metrics/ -run 'TestRecorder' -v` +Expected: FAIL — package `internal/metrics` / `prometheus/client_golang` not found. + +- [ ] **Step 3: Implement the Recorder** + +Create `internal/metrics/recorder.go`: + +```go +// Package metrics provides the Prometheus implementation of the broker's +// instrumentation contract (broker.Metrics) plus a pull-based collector for +// per-queue depth gauges. It deliberately does not import internal/broker: it +// satisfies broker.Metrics structurally, keeping the dependency arrow one-way. +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const namespace = "relay" + +// Recorder implements broker.Metrics over a private Prometheus registry. Build +// it with NewRecorder, install it with broker.WithMetrics(rec), and serve +// rec.Registry() over HTTP. Counters and the latency histogram are labelled by +// queue. +type Recorder struct { + reg *prometheus.Registry + + enqueued *prometheus.CounterVec + deduped *prometheus.CounterVec + claimed *prometheus.CounterVec + processed *prometheus.CounterVec + retried *prometheus.CounterVec + dead *prometheus.CounterVec + reaped *prometheus.CounterVec + promoted *prometheus.CounterVec + latency *prometheus.HistogramVec +} + +// NewRecorder builds a Recorder with all metrics registered on a fresh registry. 
+func NewRecorder() *Recorder { + reg := prometheus.NewRegistry() + + counter := func(name, help string) *prometheus.CounterVec { + c := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, Name: name, Help: help, + }, []string{"queue"}) + reg.MustRegister(c) + return c + } + + r := &Recorder{ + reg: reg, + enqueued: counter("jobs_enqueued_total", "Jobs accepted into a queue."), + deduped: counter("jobs_deduplicated_total", "Enqueues dropped as idempotency-key duplicates."), + claimed: counter("jobs_claimed_total", "Jobs claimed by a worker."), + processed: counter("jobs_processed_total", "Jobs acked after successful processing."), + retried: counter("jobs_retried_total", "Failed jobs requeued for retry."), + dead: counter("jobs_dead_total", "Jobs moved to the dead-letter queue."), + reaped: counter("jobs_reaped_total", "Expired in-flight jobs requeued by the reaper."), + promoted: counter("jobs_promoted_total", "Delayed jobs promoted to ready."), + } + r.latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "job_latency_seconds", + Help: "End-to-end time from enqueue to ack.", + // End-to-end latency spans ms to minutes, wider than the default + // 5ms-10s buckets: ~1ms -> ~9min across 13 buckets. + Buckets: prometheus.ExponentialBuckets(0.001, 3, 13), + }, []string{"queue"}) + reg.MustRegister(r.latency) + return r +} + +// Registry exposes the underlying registry for an HTTP handler and for +// registering additional collectors (e.g. DepthCollector). 
+func (r *Recorder) Registry() *prometheus.Registry { return r.reg } + +func (r *Recorder) IncEnqueued(q string) { r.enqueued.WithLabelValues(q).Inc() } +func (r *Recorder) IncDeduplicated(q string) { r.deduped.WithLabelValues(q).Inc() } +func (r *Recorder) IncClaimed(q string) { r.claimed.WithLabelValues(q).Inc() } +func (r *Recorder) IncProcessed(q string) { r.processed.WithLabelValues(q).Inc() } +func (r *Recorder) IncRetried(q string) { r.retried.WithLabelValues(q).Inc() } +func (r *Recorder) IncDead(q string) { r.dead.WithLabelValues(q).Inc() } +func (r *Recorder) AddReaped(q string, n int) { + r.reaped.WithLabelValues(q).Add(float64(n)) +} +func (r *Recorder) AddPromoted(q string, n int) { + r.promoted.WithLabelValues(q).Add(float64(n)) +} +func (r *Recorder) ObserveLatency(q string, d time.Duration) { + r.latency.WithLabelValues(q).Observe(d.Seconds()) +} + +// CounterForTest returns the per-queue child counter for the named metric. It +// exists so tests can read a specific series; "name" is the short key +// (enqueued, deduped, claimed, processed, retried, dead, reaped, promoted). +func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter { + var v *prometheus.CounterVec + switch name { + case "enqueued": + v = r.enqueued + case "deduped": + v = r.deduped + case "claimed": + v = r.claimed + case "processed": + v = r.processed + case "retried": + v = r.retried + case "dead": + v = r.dead + case "reaped": + v = r.reaped + case "promoted": + v = r.promoted + default: + return nil + } + return v.WithLabelValues(queue) +} +``` + +- [ ] **Step 4: Tidy modules and run the test** + +Run: +```bash +go mod tidy +go test ./internal/metrics/ -run 'TestRecorder' -v +``` +Expected: `go mod tidy` adds `github.com/prometheus/client_golang` (and transitive deps) to go.mod/go.sum; tests PASS. 
+ +- [ ] **Step 5: Commit** + +```bash +git add internal/metrics/recorder.go internal/metrics/recorder_test.go go.mod go.sum +git commit -m "Add internal/metrics Recorder implementing broker.Metrics over Prometheus" +``` + +--- + +## Task 8: `internal/metrics` DepthCollector + +**Files:** +- Create: `internal/metrics/depth.go` +- Create: `internal/metrics/depth_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `internal/metrics/depth_test.go`: + +```go +package metrics_test + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/metrics" +) + +// metricsTestRedisDB is this package's dedicated Redis DB. broker tests use 15, +// worker tests 14; metrics claims 13 so parallel `go test ./...` never collides. +const metricsTestRedisDB = 13 + +func newTestRedis(t *testing.T) *redis.Client { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: metricsTestRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + return rdb +} + +func TestDepthCollectorReportsDepths(t *testing.T) { + rdb := newTestRedis(t) + ctx := context.Background() + + // Seed known cardinalities into the per-queue keys the collector reads. 
+ rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}) + rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"}) + rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"}) + rdb.RPush(ctx, "q:emails:dlq", "g") + + c := metrics.NewDepthCollector(rdb, "emails") + + want := ` +# HELP relay_queue_depth Number of jobs in a queue, by state. +# TYPE relay_queue_depth gauge +relay_queue_depth{queue="emails",state="ready"} 2 +relay_queue_depth{queue="emails",state="inflight"} 1 +relay_queue_depth{queue="emails",state="delayed"} 3 +relay_queue_depth{queue="emails",state="dlq"} 1 +` + if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil { + t.Errorf("CollectAndCompare: %v", err) + } +} + +func TestDepthCollectorEmptyQueueReportsZeros(t *testing.T) { + rdb := newTestRedis(t) + c := metrics.NewDepthCollector(rdb, "emails") + + want := ` +# HELP relay_queue_depth Number of jobs in a queue, by state. +# TYPE relay_queue_depth gauge +relay_queue_depth{queue="emails",state="ready"} 0 +relay_queue_depth{queue="emails",state="inflight"} 0 +relay_queue_depth{queue="emails",state="delayed"} 0 +relay_queue_depth{queue="emails",state="dlq"} 0 +` + if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil { + t.Errorf("CollectAndCompare: %v", err) + } +} + +// Ensures the collector can be registered on a registry without panicking. +func TestDepthCollectorRegisters(t *testing.T) { + rdb := newTestRedis(t) + reg := prometheus.NewRegistry() + if err := reg.Register(metrics.NewDepthCollector(rdb, "emails")); err != nil { + t.Fatalf("Register: %v", err) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/metrics/ -run TestDepthCollector -v` +Expected: FAIL — `NewDepthCollector` undefined. 
+ +- [ ] **Step 3: Implement the DepthCollector** + +Create `internal/metrics/depth.go`: + +```go +package metrics + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/redis/go-redis/v9" +) + +// scrapeTimeout bounds the Redis work done during one Prometheus scrape so a +// slow or unreachable Redis cannot hang the /metrics handler. +const scrapeTimeout = 5 * time.Second + +// DepthCollector reports per-queue depth gauges by reading Redis at scrape time, +// so the values can never go stale between scrapes. It implements +// prometheus.Collector and is registered on the Recorder's registry. +type DepthCollector struct { + rdb *redis.Client + queues []string + desc *prometheus.Desc +} + +// NewDepthCollector builds a collector that reports depths for the given queues. +func NewDepthCollector(rdb *redis.Client, queues ...string) *DepthCollector { + return &DepthCollector{ + rdb: rdb, + queues: queues, + desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "queue_depth"), + "Number of jobs in a queue, by state.", + []string{"queue", "state"}, nil, + ), + } +} + +// Describe sends the single gauge descriptor. (Required by prometheus.Collector.) +func (c *DepthCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect reads ready/inflight/delayed (ZCARD) and dlq (LLEN) for each queue and +// emits one gauge sample per (queue, state). On a Redis error for a given query +// it skips that sample rather than reporting a misleading zero. 
+func (c *DepthCollector) Collect(ch chan<- prometheus.Metric) { + ctx, cancel := context.WithTimeout(context.Background(), scrapeTimeout) + defer cancel() + + for _, q := range c.queues { + c.emit(ctx, ch, q, "ready", c.rdb.ZCard(ctx, "q:"+q+":ready")) + c.emit(ctx, ch, q, "inflight", c.rdb.ZCard(ctx, "q:"+q+":inflight")) + c.emit(ctx, ch, q, "delayed", c.rdb.ZCard(ctx, "q:"+q+":delayed")) + c.emit(ctx, ch, q, "dlq", c.rdb.LLen(ctx, "q:"+q+":dlq")) + } +} + +// emit turns one IntCmd result into a gauge sample, skipping it on error. +func (c *DepthCollector) emit(_ context.Context, ch chan<- prometheus.Metric, queue, state string, cmd *redis.IntCmd) { + n, err := cmd.Result() + if err != nil { + return // skip this sample; do not report a stale/zero depth + } + ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), queue, state) +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/metrics/ -run TestDepthCollector -v` +Expected: PASS (3 tests). The expected text lists states in emit order (ready, inflight, delayed, dlq), but `CollectAndCompare` sorts the samples on both sides before comparing, so the order in the literal does not matter. If the comparison still fails, the mismatch is in the values: correct the expected values, not the code. + +- [ ] **Step 5: Full metrics suite under race** + +Run: `go test -race ./internal/metrics/` +Expected: PASS. + +- [ ] **Step 6: Commit** + +```bash +git add internal/metrics/depth.go internal/metrics/depth_test.go +git commit -m "Add DepthCollector reporting per-queue depth gauges at scrape time" +``` + +--- + +## Task 9: Wire `/metrics` into `cmd/worker` + +**Files:** +- Modify: `cmd/worker/main.go` + +- [ ] **Step 1: Add the flag and wiring** + +Read `cmd/worker/main.go` first to match its existing flag/broker/shutdown style. Then: + +1. Add a flag near the others: + +```go + metricsAddr := flag.String("metrics-addr", "", "if set (e.g.
:9090), serve Prometheus /metrics on this address") +``` + +2. When building the broker, conditionally add the recorder. Build the broker options slice so `WithMetrics` is appended only when metrics are enabled, and keep a handle to the recorder: + +```go + var rec *metrics.Recorder + brokerOpts := []broker.Option{broker.WithBackoff(/* existing args unchanged */)} + // ... keep existing conditional WithRateLimit append ... + if *metricsAddr != "" { + rec = metrics.NewRecorder() + brokerOpts = append(brokerOpts, broker.WithMetrics(rec)) + } + b := broker.New(rdb, brokerOpts...) +``` + +(Adapt to the file's current construction — the key change is appending `broker.WithMetrics(rec)` when `*metricsAddr != ""`. Do not change existing options.) + +3. After the broker is built and before/alongside starting the worker pool, start the metrics HTTP server when enabled: + +```go + var metricsSrv *http.Server + if rec != nil { + rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, *queue)) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})) + metricsSrv = &http.Server{Addr: *metricsAddr, Handler: mux} + go func() { + if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("relay worker: metrics server failed", "err", err) + } + }() + logger.Info("relay worker: serving metrics", "addr", *metricsAddr) + } +``` + +(Use the file's existing logger variable; if it uses `log` rather than `slog`, match that.) + +4. In the shutdown path (after the worker pool stops, where ctx is cancelled on SIGINT/SIGTERM), gracefully close the server: + +```go + if metricsSrv != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsSrv.Shutdown(shutdownCtx); err != nil { + logger.Error("relay worker: metrics shutdown", "err", err) + } + } +``` + +5. 
Add imports: `net/http`, `github.com/prometheus/client_golang/prometheus/promhttp`, `github.com/StrangeNoob/relay/internal/metrics` (and `context`/`time` if not already present). + +- [ ] **Step 2: Build and vet** + +Run: +```bash +go build ./... +go vet ./... +``` +Expected: both clean. + +- [ ] **Step 3: Smoke check the flag (optional, requires Redis)** + +Run (manual, then Ctrl-C): +```bash +go run ./cmd/worker -queue demo -concurrency 1 -metrics-addr :9090 & +sleep 1 && curl -s localhost:9090/metrics | grep relay_ | head +kill %1 +``` +Expected: `relay_*` metric lines printed (depths at least). Skip if no local Redis. + +- [ ] **Step 4: Commit** + +```bash +git add cmd/worker/main.go +git commit -m "Serve Prometheus /metrics from cmd/worker behind --metrics-addr" +``` + +--- + +## Task 10: Update CLAUDE.md and final verification + +**Files:** +- Modify: `CLAUDE.md` + +- [ ] **Step 1: Update CLAUDE.md** + +Make these edits (match exact surrounding wording when editing): + +1. **Status line** — change "Phase 2 nearly complete … only Prometheus metrics remain in Phase 2" to state Phase 2 is complete (metrics shipped). Example: "**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter, retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics are built, tested against a real Redis under `-race`, and CI is green." +2. **Remaining-Phase-2 line** — remove the "(Prometheus metrics)" remaining note; only Phase 3 remains. +3. **Broker options** — add `WithMetrics(m)` to the broker-options sentence in the "What this is" section. +4. **`internal/metrics`** — in the Layout block, change the metrics entry to ✅ built and add `cmd/worker --metrics-addr`. Add `internal/metrics/` (Recorder + DepthCollector) to the built list. +5. **Build order** — Phase 2 line: change "metrics still to do" to metrics ✅; Phase 2 done. +6. 
**Known limitations** — add a metrics bullet: per-queue label cardinality; depth gauges cost one Redis round-trip per (queue,state) per scrape; metrics are per-process (aggregate depth gauges with max/avg, not sum); endpoint only on `cmd/worker` until Phase 3. +7. **Dependencies** — update the "Only dependency" wording to note `github.com/prometheus/client_golang` is a second direct dependency, used purely for metrics instrumentation (not a queue library — the from-scratch rule is intact). +8. **Script/option inventories** — if a broker-options inventory exists, ensure `WithMetrics` is listed. + +- [ ] **Step 2: Full verification** + +Run: +```bash +go build ./... +go test -race ./... +go vet ./... +gofmt -l internal/ cmd/ +``` +Expected: build clean; all tests pass (broker DB 15, worker DB 14, metrics DB 13 — no collisions); vet clean; `gofmt -l` prints nothing. + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "Document Phase 2 completion: Prometheus metrics" +``` + +--- + +## Self-Review (completed during planning) + +- **Spec coverage:** Metrics interface + noop + WithMetrics (Task 1); enqueue/dedup (Task 2); claim (Task 3); ack+latency (Task 4); retry/dead (Task 5); reap/promote (Task 6); Recorder + all metric names/buckets (Task 7); DepthCollector + depth gauges (Task 8); cmd/worker endpoint + graceful shutdown (Task 9); CLAUDE.md + dependency note + known limitations (Task 10). All spec sections mapped. +- **Type consistency:** `Metrics` method set is identical across the interface (Task 1), the fake (Task 2), and the Recorder (Task 7): `IncEnqueued/IncDeduplicated/IncClaimed/IncProcessed/IncRetried/IncDead` (single, queue), `AddReaped/AddPromoted` (queue, int), `ObserveLatency` (queue, time.Duration). Metric names match the spec table. Test DB numbers: broker 15, worker 14, metrics 13. 
+- **No placeholders:** every code step shows complete code; the only "adapt to existing file" note is Task 9 (cmd wiring), which lists the exact additions. +- **Known soft spot:** Task 6's delayed-job setup depends on whether `WithReadyAt(pastTime)` lands a job in the delayed set; the task gives both forms and asserts `n == 2` so the implementer can pick the one that actually delays. diff --git a/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md b/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md new file mode 100644 index 0000000..541272a --- /dev/null +++ b/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md @@ -0,0 +1,190 @@ +# Relay — Phase 2: Prometheus Metrics + +**Status:** Approved design · **Date:** 2026-06-08 +**Parent spec:** [`2026-06-07-relay-distributed-task-queue-design.md`](2026-06-07-relay-distributed-task-queue-design.md) +**Phase:** 2 (depth) — final sub-project (follows 2a delayed/backoff, 2b priority, 2c idempotency, 2d rate limiting) + +## Purpose + +Make the engine observable. The parent spec calls for "Prometheus counters/gauges (enqueued, +processed, retried, dead, in-flight, latency)." This slice instruments every job-state transition +the broker performs and exposes them in Prometheus exposition format, plus live queue-depth gauges +read straight from Redis at scrape time. It is the last remaining Phase 2 feature; completing it +closes Phase 2. + +## Scope + +In scope: + +- A `Metrics` interface in `broker` (consumer-side contract) with a no-op default, set via a new + `WithMetrics` option. Opt-in, additive, zero behavior change when unused. +- Inline counter/histogram recording at each broker state transition (enqueue, dedup, claim, ack, + retry, dead, reap, promote, latency). +- A pull-based `prometheus.Collector` in `internal/metrics` that reports per-queue depth gauges + (`ready`/`inflight`/`delayed`/`dlq`) by running `ZCARD`/`LLEN` at scrape time. 
+- A `--metrics-addr` flag on `cmd/worker` that, when set, serves `/metrics`. + +Out of scope: the Phase 3 API/dashboard server (`cmd/server`); a metrics endpoint on any process +other than `cmd/worker`; tracing; alerting rules; Grafana dashboards. Phase 2 has no other +remaining features — this slice completes it. + +## Key decisions + +| Decision | Choice | Rationale | +|---|---|---| +| Instrumentation wiring | **Injected `Metrics` interface, no-op default** | Keeps the broker free of a hard prometheus dependency, allows isolated assertion with a fake recorder, and makes metrics purely additive (existing tests/usage untouched). | +| Push vs pull | **Both: counters/histogram pushed inline; depth gauges pulled at scrape** | Counters are event-driven; queue depths are point-in-time facts that should never go stale, so they are read from Redis when Prometheus scrapes. | +| Metric library | **`github.com/prometheus/client_golang`** | Correct exposition/histogram/escaping and a clean custom-collector API. It is an instrumentation library, not a queue library — the "build the queue from scratch" rule is untouched. | +| Latency definition | **End-to-end: `now − created_at`, observed in `Ack`** | Captures total time in system (queue wait + processing). Derivable entirely from the job hash; no new field or claim-time plumbing. | +| Labels | **`queue` on everything; `state` additionally on the depth gauge** | Bounded cardinality (number of queues), and the dimensions an operator actually slices by. | +| Endpoint | **`cmd/worker --metrics-addr`, empty = off** | No metrics process exists until Phase 3; the worker is the natural host. Off by default keeps current behavior byte-identical and avoids binding a port in CI. | +| Registry | **Dedicated `prometheus.Registry` per recorder** | No global mutable state; matches the project's emphasis on isolated, testable units. | + +## Data model + +No new Redis keys and no new job-hash fields. 
Depth gauges are computed from the existing per-queue +keys at scrape time: + +| Gauge sample | Source | +|---|---| +| `relay_queue_depth{queue,state="ready"}` | `ZCARD q:{name}:ready` | +| `relay_queue_depth{queue,state="inflight"}` | `ZCARD q:{name}:inflight` | +| `relay_queue_depth{queue,state="delayed"}` | `ZCARD q:{name}:delayed` | +| `relay_queue_depth{queue,state="dlq"}` | `LLEN q:{name}:dlq` | + +## Components & changes + +### `internal/broker` + +- **`Metrics` interface** (new file `metrics.go`): + + ```go + type Metrics interface { + IncEnqueued(queue string) + IncDeduplicated(queue string) + IncClaimed(queue string) + IncProcessed(queue string) + IncRetried(queue string) + IncDead(queue string) + AddReaped(queue string, n int) + AddPromoted(queue string, n int) + ObserveLatency(queue string, d time.Duration) + } + ``` + +- **`noopMetrics`** — unexported zero-cost implementation; every method empty. `Broker` gains a + `metrics Metrics` field initialised to `noopMetrics{}` in `New`. +- **`WithMetrics(m Metrics) Option`** — overrides the recorder. A `nil` argument is ignored (keeps + the no-op) so callers can't accidentally install a nil and panic. +- **Instrumentation points** (no Lua changes; all signals already available in Go): + - `Enqueue`: on success → `IncEnqueued(j.Queue)`; on `ErrDuplicate` → `IncDeduplicated(j.Queue)` + (recorded before the sentinel is returned). + - `Claim`: when a job is returned (`ok == true`) → `IncClaimed(queue)`. Empty/rate-limited claims + record nothing. + - `Ack`: → `IncProcessed(j.Queue)` and `ObserveLatency(j.Queue, time.Since(j.CreatedAt))`. + - `Nack`: capture `nack.lua`'s existing `"retry"`/`"dead"` return (currently discarded via + `.Err()`) → `IncRetried` / `IncDead`. Still returns its current error contract. + - `Reap`: when `n > 0` → `AddReaped(queue, n)`. + - `Promote`: when `n > 0` → `AddPromoted(queue, n)`. 
+ + Metrics are recorded only after the underlying Redis operation succeeds, so a failed op never + inflates a counter. + +### `internal/metrics` (new package) + +Depends only on `prometheus/client_golang` and `redis/go-redis` (for the collector) — **not** on +`broker`. It satisfies `broker.Metrics` structurally. + +- **`Recorder`** — implements `broker.Metrics` over a private `*prometheus.Registry`. Holds the + counter vecs and histogram vec (all `*prometheus.CounterVec` / `*prometheus.HistogramVec` with a + `queue` label). Constructor `NewRecorder() *Recorder` registers them on a fresh registry. + `(*Recorder).Registry() *prometheus.Registry` exposes it for the HTTP handler. +- Metric definitions (namespace `relay`): + + | Metric | Type | Labels | + |---|---|---| + | `relay_jobs_enqueued_total` | counter | `queue` | + | `relay_jobs_deduplicated_total` | counter | `queue` | + | `relay_jobs_claimed_total` | counter | `queue` | + | `relay_jobs_processed_total` | counter | `queue` | + | `relay_jobs_retried_total` | counter | `queue` | + | `relay_jobs_dead_total` | counter | `queue` | + | `relay_jobs_reaped_total` | counter | `queue` | + | `relay_jobs_promoted_total` | counter | `queue` | + | `relay_job_latency_seconds` | histogram | `queue` | + | `relay_queue_depth` | gauge (via collector) | `queue`, `state` | + + The latency histogram uses buckets spanning roughly 1ms→9min (end-to-end time is much wider than + Prometheus's default 5ms→10s), e.g. `prometheus.ExponentialBuckets(0.001, 3, 13)`, whose largest + finite bound is 0.001 × 3^12 ≈ 531s. + +- **`DepthCollector`** — implements `prometheus.Collector`. Constructed with the redis client and + the queue name(s) to watch (`NewDepthCollector(rdb, queues...)`). `Describe` emits the + `relay_queue_depth` descriptor; `Collect` runs `ZCARD`/`LLEN` per queue under a short + scrape-scoped `context.Context` (a few seconds) and emits one gauge sample per (queue, state). On + a Redis error for a given query it skips that sample rather than emitting a stale or zero value.
+ Registered on the `Recorder`'s registry by the caller. + +### `cmd/worker` + +- New `--metrics-addr` flag (string, default `""`). +- When empty: unchanged — broker built without `WithMetrics`, no HTTP server. +- When set: build a `metrics.NewRecorder()`, pass `broker.WithMetrics(rec)`, register a + `metrics.NewDepthCollector(rdb, *queue)` on `rec.Registry()`, and start an `http.Server` serving + `promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})` at `/metrics`. The server is shut + down (graceful `Shutdown`) alongside the worker pool / reaper / promoter on SIGINT/SIGTERM. + +## Testing + +Real Redis where Redis is needed; skip (not fail) when unreachable. To keep `go test ./...` +parallel-safe, the metrics package uses **Redis DB 13** (broker tests use DB 15, worker tests DB +14, so no `FlushDB` collisions). + +### `internal/broker` + +A fake `Metrics` recorder (records calls/counts in memory) injected via `WithMetrics`, asserted +against real Redis ops: + +- Enqueue (no key) → exactly one `IncEnqueued`, queue correct. +- Enqueue duplicate (same idempotency key within TTL) → one `IncEnqueued` then one + `IncDeduplicated`; ready ZCard reflects only the first. +- Claim of a ready job → one `IncClaimed`; claim of an empty queue → no call. +- Ack → one `IncProcessed` and one `ObserveLatency` with a non-negative duration. +- Nack with retries left → one `IncRetried`, none `IncDead`; Nack with budget spent → one + `IncDead`, none `IncRetried`. +- Reap of N expired → one `AddReaped(queue, N)`; Promote of N due → one `AddPromoted(queue, N)`. +- Default broker (no `WithMetrics`) and `WithMetrics(nil)` → no panic across a full + enqueue/claim/ack cycle (noop path). + +### `internal/metrics` + +- Compile-time assertion `var _ broker.Metrics = (*Recorder)(nil)`. +- Using `prometheus/testutil`: after recording, assert counter/histogram sample values + (`testutil.ToFloat64`, `CollectAndCount`) for representative metrics and the `queue` label. 
+- `DepthCollector` against real Redis (DB 13): seed known members into `ready`/`inflight`/`delayed` + and items into `dlq`, then assert `relay_queue_depth{state=…}` matches via + `testutil.CollectAndCompare` / `GatherAndCompare`. +- Empty/unknown queue → depth samples are `0` (or absent on Redis error); no panic. + +### `cmd/worker` + +Build/vet only (consistent with prior slices); no integration test for the HTTP server. + +## Known limitations + +- **Label cardinality is per queue.** One time series per metric per queue. Fine for the handful of + queues this project targets; a deployment with unbounded queue names would need care. +- **Depth gauges cost a scrape-time round-trip.** Each scrape issues one Redis call per (queue, + state). Cheap at this scale; a very high scrape frequency against many queues would add load. +- **Metrics are per-process.** Each worker process exposes its own counters; cluster-wide totals + come from Prometheus aggregating across scraped targets, as usual. Depth gauges read shared Redis, + so every worker reports the same depths — operators should aggregate with `max`/`avg`, not `sum`. +- **Endpoint only on `cmd/worker`.** The standalone API/dashboard server is Phase 3. + +## Invariants preserved + +- At-least-once delivery — metrics are read-only observation; no job movement changes. +- The atomic claim is sacred — no Lua script is modified; instrumentation is Go-side, after the + atomic op succeeds. +- Crash safety via the reaper is untouched. +- Build the queue from scratch on Redis primitives — `prometheus/client_golang` instruments; it + does not implement any queue mechanics. 
diff --git a/go.mod b/go.mod index ef60965..c81228e 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,22 @@ go 1.24 toolchain go1.25.11 -require github.com/redis/go-redis/v9 v9.20.0 +require ( + github.com/prometheus/client_golang v1.23.2 + github.com/redis/go-redis/v9 v9.20.0 +) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect go.uber.org/atomic v1.11.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect ) diff --git a/go.sum b/go.sum index 53cee5f..6286537 100644 --- a/go.sum +++ b/go.sum @@ -1,22 +1,56 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= 
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0= 
github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 
v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 562de1e..2f16832 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -27,6 +27,7 @@ type Broker struct { backoffMax time.Duration dedupTTL time.Duration rateLimits map[string]rateLimit + metrics Metrics rndMu sync.Mutex rnd *rand.Rand @@ -53,6 +54,7 @@ func New(rdb *redis.Client, opts ...Option) *Broker { backoffBase: time.Second, backoffMax: 10 * time.Minute, dedupTTL: 24 * time.Hour, + metrics: noopMetrics{}, rnd: rand.New(rand.NewSource(time.Now().UnixNano())), } for _, opt := range opts { @@ -197,8 +199,10 @@ func (b *Broker) Enqueue(ctx context.Context, j job.Job, opts ...EnqueueOption) return fmt.Errorf("broker: enqueuing job %s: %w", j.ID, err) } if res == "dup" { + b.metrics.IncDeduplicated(j.Queue) return ErrDuplicate } + b.metrics.IncEnqueued(j.Queue) return nil } @@ -236,6 +240,7 @@ func (b *Broker) Claim(ctx context.Context, queue string, visibility time.Durati if err != nil { return job.Job{}, false, fmt.Errorf("broker: claiming from %q: %w", queue, err) } + b.metrics.IncClaimed(queue) return j, true, nil } @@ -249,6 +254,8 @@ func (b *Broker) Ack(ctx context.Context, j job.Job) error { ).Err(); err != nil { return fmt.Errorf("broker: acking job %s: %w", j.ID, err) } + b.metrics.IncProcessed(j.Queue) + b.metrics.ObserveLatency(j.Queue, time.Since(j.CreatedAt)) return nil } @@ -263,12 +270,21 @@ func (b *Broker) Nack(ctx context.Context, j job.Job) error { b.rndMu.Unlock() readyAt := time.Now().Add(delay).UnixMilli() - if err := nackScript.Run(ctx, b.rdb, + outcome, err := nackScript.Run(ctx, b.rdb, []string{inflightKey(j.Queue), delayedKey(j.Queue), dlqKey(j.Queue)}, j.ID, jobKeyPrefix, readyAt, - ).Err(); err != nil { + 
).Text() + if err != nil { return fmt.Errorf("broker: nacking job %s: %w", j.ID, err) } + // nack.lua returns "retry" or "dead"; any other value intentionally records + // nothing rather than mis-attributing a count. + switch outcome { + case "retry": + b.metrics.IncRetried(j.Queue) + case "dead": + b.metrics.IncDead(j.Queue) + } return nil } @@ -296,6 +312,9 @@ func (b *Broker) Reap(ctx context.Context, queue string) (int, error) { if err != nil { return 0, fmt.Errorf("broker: reaping %q: %w", queue, err) } + if n > 0 { + b.metrics.AddReaped(queue, n) + } return n, nil } @@ -311,6 +330,9 @@ func (b *Broker) Promote(ctx context.Context, queue string) (int, error) { if err != nil { return 0, fmt.Errorf("broker: promoting %q: %w", queue, err) } + if n > 0 { + b.metrics.AddPromoted(queue, n) + } return n, nil } diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index 98f7b7f..01ee419 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -30,6 +30,13 @@ const testRedisDB = 15 // unreachable the test is skipped rather than failed, so the suite still runs // off-CI. func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) { + t.Helper() + return newTestBrokerWith(t) +} + +// newTestBrokerWith is newTestBroker with broker options, for tests that need to +// inject a recorder or other configuration. 
+func newTestBrokerWith(t *testing.T, opts ...broker.Option) (*broker.Broker, *redis.Client) {
 	t.Helper()
 	addr := os.Getenv("REDIS_ADDR")
 	if addr == "" {
@@ -44,7 +51,7 @@ func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) {
 		t.Fatalf("flushdb: %v", err)
 	}
 	t.Cleanup(func() { _ = rdb.Close() })
-	return broker.New(rdb), rdb
+	return broker.New(rdb, opts...), rdb
 }
 
 func TestEnqueuePersistsJob(t *testing.T) {
diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
new file mode 100644
index 0000000..c7bf0d0
--- /dev/null
+++ b/internal/broker/instrumentation_test.go
@@ -0,0 +1,276 @@
+package broker_test
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/StrangeNoob/relay/internal/broker"
+	"github.com/StrangeNoob/relay/internal/job"
+)
+
+// fakeMetrics is a Metrics recorder that counts calls per queue in memory, so a
+// test can assert exactly which transition the broker recorded. It is safe for
+// concurrent use because the broker may record from multiple goroutines.
+type fakeMetrics struct {
+	mu        sync.Mutex
+	enqueued  map[string]int
+	deduped   map[string]int
+	claimed   map[string]int
+	processed map[string]int
+	retried   map[string]int
+	dead      map[string]int
+	reaped    map[string]int
+	promoted  map[string]int
+	latencies []time.Duration
+}
+
+func newFakeMetrics() *fakeMetrics {
+	return &fakeMetrics{
+		enqueued:  map[string]int{},
+		deduped:   map[string]int{},
+		claimed:   map[string]int{},
+		processed: map[string]int{},
+		retried:   map[string]int{},
+		dead:      map[string]int{},
+		reaped:    map[string]int{},
+		promoted:  map[string]int{},
+	}
+}
+
+func (f *fakeMetrics) IncEnqueued(q string)        { f.bump(f.enqueued, q) }
+func (f *fakeMetrics) IncDeduplicated(q string)    { f.bump(f.deduped, q) }
+func (f *fakeMetrics) IncClaimed(q string)         { f.bump(f.claimed, q) }
+func (f *fakeMetrics) IncProcessed(q string)       { f.bump(f.processed, q) }
+func (f *fakeMetrics) IncRetried(q string)         { f.bump(f.retried, q) }
+func (f *fakeMetrics) IncDead(q string)            { f.bump(f.dead, q) }
+func (f *fakeMetrics) AddReaped(q string, n int)   { f.add(f.reaped, q, n) }
+func (f *fakeMetrics) AddPromoted(q string, n int) { f.add(f.promoted, q, n) }
+func (f *fakeMetrics) ObserveLatency(q string, d time.Duration) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.latencies = append(f.latencies, d)
+}
+
+func (f *fakeMetrics) bump(m map[string]int, q string) { f.add(m, q, 1) }
+func (f *fakeMetrics) add(m map[string]int, q string, n int) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	m[q] += n
+}
+
+func (f *fakeMetrics) get(m map[string]int, q string) int {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return m[q]
+}
+
+func TestEnqueueRecordsEnqueued(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if got := fm.get(fm.enqueued, "emails"); got != 1 {
+		t.Errorf("enqueued[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.deduped, "emails"); got != 0 {
+		t.Errorf("deduped[emails] = %d, want 0", got)
+	}
+}
+
+func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j1 := job.New("emails", []byte("a"))
+	if err := b.Enqueue(ctx, j1, broker.WithIdempotencyKey("k1")); err != nil {
+		t.Fatalf("first Enqueue: %v", err)
+	}
+	j2 := job.New("emails", []byte("b"))
+	if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); !errors.Is(err, broker.ErrDuplicate) {
+		t.Fatalf("second Enqueue err = %v, want ErrDuplicate", err)
+	}
+	if got := fm.get(fm.enqueued, "emails"); got != 1 {
+		t.Errorf("enqueued[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.deduped, "emails"); got != 1 {
+		t.Errorf("deduped[emails] = %d, want 1", got)
+	}
+}
+
+func TestClaimRecordsClaimed(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v, want true/nil", ok, err)
+	}
+	if got := fm.get(fm.claimed, "emails"); got != 1 {
+		t.Errorf("claimed[emails] = %d, want 1", got)
+	}
+}
+
+func TestClaimEmptyQueueRecordsNothing(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || ok {
+		t.Fatalf("Claim on empty: ok=%v err=%v, want false/nil", ok, err)
+	}
+	if got := fm.get(fm.claimed, "emails"); got != 0 {
+		t.Errorf("claimed[emails] = %d, want 0", got)
+	}
+}
+
+func TestAckRecordsProcessedAndLatency(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	j, ok, err := b.Claim(ctx, "emails", time.Minute)
+	if err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v", ok, err)
+	}
+	if err := b.Ack(ctx, j); err != nil {
+		t.Fatalf("Ack: %v", err)
+	}
+	if got := fm.get(fm.processed, "emails"); got != 1 {
+		t.Errorf("processed[emails] = %d, want 1", got)
+	}
+	fm.mu.Lock()
+	n := len(fm.latencies)
+	var d time.Duration
+	if n == 1 {
+		d = fm.latencies[0]
+	}
+	fm.mu.Unlock()
+	if n != 1 {
+		t.Fatalf("latencies len = %d, want 1", n)
+	}
+	if d < 0 {
+		t.Errorf("latency = %v, want non-negative", d)
+	}
+}
+
+// nackTestJob enqueues, claims, and returns a job set up so the next Nack takes
+// the requested branch. maxRetries controls retry-vs-dead: with maxRetries=5 the
+// first nack retries; with maxRetries=0 the first nack dead-letters.
+func nackTestJob(t *testing.T, b *broker.Broker, ctx context.Context, maxRetries int) job.Job {
+	t.Helper()
+	j := job.New("emails", []byte("x"))
+	j.MaxRetries = maxRetries
+	if err := b.Enqueue(ctx, j); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	claimed, ok, err := b.Claim(ctx, "emails", time.Minute)
+	if err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v", ok, err)
+	}
+	return claimed
+}
+
+func TestNackWithRetriesLeftRecordsRetried(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j := nackTestJob(t, b, ctx, 5) // attempts now 1 < 5 -> retry
+	if err := b.Nack(ctx, j); err != nil {
+		t.Fatalf("Nack: %v", err)
+	}
+	if got := fm.get(fm.retried, "emails"); got != 1 {
+		t.Errorf("retried[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.dead, "emails"); got != 0 {
+		t.Errorf("dead[emails] = %d, want 0", got)
+	}
+}
+
+func TestNackWithBudgetSpentRecordsDead(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j := nackTestJob(t, b, ctx, 0) // attempts now 1, max 0 -> dead
+	if err := b.Nack(ctx, j); err != nil {
+		t.Fatalf("Nack: %v", err)
+	}
+	if got := fm.get(fm.dead, "emails"); got != 1 {
+		t.Errorf("dead[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.retried, "emails"); got != 0 {
+		t.Errorf("retried[emails] = %d, want 0", got)
+	}
+}
+
+func TestReapRecordsReapedCount(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	// Enqueue + claim two jobs with a tiny visibility so they expire immediately.
+	for i := 0; i < 2; i++ {
+		if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+			t.Fatalf("Enqueue: %v", err)
+		}
+	}
+	for i := 0; i < 2; i++ {
+		if _, ok, err := b.Claim(ctx, "emails", time.Millisecond); err != nil || !ok {
+			t.Fatalf("Claim: ok=%v err=%v", ok, err)
+		}
+	}
+	time.Sleep(10 * time.Millisecond) // let the visibility deadline pass
+
+	n, err := b.Reap(ctx, "emails")
+	if err != nil {
+		t.Fatalf("Reap: %v", err)
+	}
+	if n != 2 {
+		t.Fatalf("Reap returned %d, want 2", n)
+	}
+	if got := fm.get(fm.reaped, "emails"); got != 2 {
+		t.Errorf("reaped[emails] = %d, want 2", got)
+	}
+}
+
+func TestPromoteRecordsPromotedCount(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	// Enqueue two delayed jobs whose ready-at is just in the future, then wait
+	// well past it. The sleep margin is generous (>3x) so the test stays reliable
+	// on a loaded CI runner under -race.
+	soon := time.Now().Add(30 * time.Millisecond)
+	for i := 0; i < 2; i++ {
+		if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(soon)); err != nil {
+			t.Fatalf("Enqueue delayed: %v", err)
+		}
+	}
+	time.Sleep(100 * time.Millisecond)
+
+	n, err := b.Promote(ctx, "emails")
+	if err != nil {
+		t.Fatalf("Promote: %v", err)
+	}
+	if n != 2 {
+		t.Fatalf("Promote returned %d, want 2", n)
+	}
+	if got := fm.get(fm.promoted, "emails"); got != 2 {
+		t.Errorf("promoted[emails] = %d, want 2", got)
+	}
+}
diff --git a/internal/broker/metrics.go b/internal/broker/metrics.go
new file mode 100644
index 0000000..131f522
--- /dev/null
+++ b/internal/broker/metrics.go
@@ -0,0 +1,51 @@
+package broker
+
+import "time"
+
+// Metrics receives a callback for every job-state transition the broker makes.
+// It is the broker's consumer-side instrumentation contract: the broker depends
+// on this small interface, not on any metrics library, so a Prometheus recorder
+// (internal/metrics) — or a test fake — can be plugged in via WithMetrics. The
+// default is noopMetrics, so a broker built without WithMetrics records nothing
+// and behaves exactly as before.
+//
+// Every method takes the queue name so the implementation can label its series
+// per queue. Reap/Promote add a batch count because one call moves many jobs;
+// the rest are single events. ObserveLatency reports a job's end-to-end time in
+// the system (creation -> ack).
+type Metrics interface {
+	IncEnqueued(queue string)
+	IncDeduplicated(queue string)
+	IncClaimed(queue string)
+	IncProcessed(queue string)
+	IncRetried(queue string)
+	IncDead(queue string)
+	AddReaped(queue string, n int)
+	AddPromoted(queue string, n int)
+	ObserveLatency(queue string, d time.Duration)
+}
+
+// noopMetrics is the default recorder: every method does nothing. It lets the
+// broker call b.metrics unconditionally without nil checks, and keeps metrics
+// entirely opt-in.
+type noopMetrics struct{}
+
+func (noopMetrics) IncEnqueued(string)                   {}
+func (noopMetrics) IncDeduplicated(string)               {}
+func (noopMetrics) IncClaimed(string)                    {}
+func (noopMetrics) IncProcessed(string)                  {}
+func (noopMetrics) IncRetried(string)                    {}
+func (noopMetrics) IncDead(string)                       {}
+func (noopMetrics) AddReaped(string, int)                {}
+func (noopMetrics) AddPromoted(string, int)              {}
+func (noopMetrics) ObserveLatency(string, time.Duration) {}
+
+// WithMetrics installs a Metrics recorder. A nil recorder is ignored so callers
+// cannot accidentally replace the safe no-op with something that panics.
+func WithMetrics(m Metrics) Option {
+	return func(b *Broker) {
+		if m != nil {
+			b.metrics = m
+		}
+	}
+}
diff --git a/internal/broker/metrics_test.go b/internal/broker/metrics_test.go
new file mode 100644
index 0000000..e792674
--- /dev/null
+++ b/internal/broker/metrics_test.go
@@ -0,0 +1,32 @@
+package broker
+
+import "testing"
+
+func TestNewDefaultsToNoopMetrics(t *testing.T) {
+	b := New(nil)
+	if b.metrics == nil {
+		t.Fatal("New: metrics field is nil, want noopMetrics default")
+	}
+	if _, ok := b.metrics.(noopMetrics); !ok {
+		t.Fatalf("New: metrics = %T, want noopMetrics", b.metrics)
+	}
+}
+
+// distinctMetrics is a sentinel recorder with its own identity, so the test can
+// prove WithMetrics installed exactly this value (noopMetrics{} has no identity).
+type distinctMetrics struct{ noopMetrics }
+
+func TestWithMetricsInstallsRecorder(t *testing.T) {
+	rec := &distinctMetrics{}
+	b := New(nil, WithMetrics(rec))
+	if b.metrics != rec {
+		t.Fatalf("WithMetrics: metrics = %v, want the installed recorder", b.metrics)
+	}
+}
+
+func TestWithMetricsNilIsIgnored(t *testing.T) {
+	b := New(nil, WithMetrics(nil))
+	if _, ok := b.metrics.(noopMetrics); !ok {
+		t.Fatalf("WithMetrics(nil): metrics = %T, want noopMetrics retained", b.metrics)
+	}
+}
diff --git a/internal/metrics/depth.go b/internal/metrics/depth.go
new file mode 100644
index 0000000..c4cba93
--- /dev/null
+++ b/internal/metrics/depth.go
@@ -0,0 +1,64 @@
+package metrics
+
+import (
+	"context"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/redis/go-redis/v9"
+)
+
+// scrapeTimeout bounds the Redis work done during one Prometheus scrape so a
+// slow or unreachable Redis cannot hang the /metrics handler.
+const scrapeTimeout = 5 * time.Second
+
+// DepthCollector reports per-queue depth gauges by reading Redis at scrape time,
+// so the values can never go stale between scrapes. It implements
+// prometheus.Collector and is registered on the Recorder's registry.
+type DepthCollector struct {
+	rdb    *redis.Client
+	queues []string
+	desc   *prometheus.Desc
+}
+
+// NewDepthCollector builds a collector that reports depths for the given queues.
+func NewDepthCollector(rdb *redis.Client, queues ...string) *DepthCollector {
+	return &DepthCollector{
+		rdb:    rdb,
+		queues: queues,
+		desc: prometheus.NewDesc(
+			prometheus.BuildFQName(namespace, "", "queue_depth"),
+			"Number of jobs in a queue, by state.",
+			[]string{"queue", "state"}, nil,
+		),
+	}
+}
+
+// Describe sends the single gauge descriptor. (Required by prometheus.Collector.)
+func (c *DepthCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- c.desc
+}
+
+// Collect reads ready/inflight/delayed (ZCARD) and dlq (LLEN) for each queue and
+// emits one gauge sample per (queue, state). On a Redis error for a given query
+// it skips that sample rather than reporting a misleading zero.
+func (c *DepthCollector) Collect(ch chan<- prometheus.Metric) {
+	ctx, cancel := context.WithTimeout(context.Background(), scrapeTimeout)
+	defer cancel()
+
+	for _, q := range c.queues {
+		c.emit(ch, q, "ready", c.rdb.ZCard(ctx, "q:"+q+":ready"))
+		c.emit(ch, q, "inflight", c.rdb.ZCard(ctx, "q:"+q+":inflight"))
+		c.emit(ch, q, "delayed", c.rdb.ZCard(ctx, "q:"+q+":delayed"))
+		c.emit(ch, q, "dlq", c.rdb.LLen(ctx, "q:"+q+":dlq"))
+	}
+}
+
+// emit turns one IntCmd result into a gauge sample, skipping it on error.
+func (c *DepthCollector) emit(ch chan<- prometheus.Metric, queue, state string, cmd *redis.IntCmd) {
+	n, err := cmd.Result()
+	if err != nil {
+		return // skip this sample; do not report a stale/zero depth
+	}
+	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), queue, state)
+}
diff --git a/internal/metrics/depth_test.go b/internal/metrics/depth_test.go
new file mode 100644
index 0000000..2c95b76
--- /dev/null
+++ b/internal/metrics/depth_test.go
@@ -0,0 +1,95 @@
+package metrics_test
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/redis/go-redis/v9"
+
+	"github.com/StrangeNoob/relay/internal/metrics"
+)
+
+// metricsTestRedisDB is this package's dedicated Redis DB. broker tests use 15,
+// worker tests 14; metrics claims 13 so parallel `go test ./...` never collides.
+const metricsTestRedisDB = 13
+
+func newTestRedis(t *testing.T) *redis.Client {
+	t.Helper()
+	addr := os.Getenv("REDIS_ADDR")
+	if addr == "" {
+		addr = "localhost:6379"
+	}
+	rdb := redis.NewClient(&redis.Options{Addr: addr, DB: metricsTestRedisDB})
+	ctx := context.Background()
+	if err := rdb.Ping(ctx).Err(); err != nil {
+		t.Skipf("redis not available at %s: %v", addr, err)
+	}
+	if err := rdb.FlushDB(ctx).Err(); err != nil {
+		t.Fatalf("flushdb: %v", err)
+	}
+	t.Cleanup(func() { _ = rdb.Close() })
+	return rdb
+}
+
+// mustSeed fails the test immediately if seeding a fixture into Redis errored.
+func mustSeed(t *testing.T, err error) {
+	t.Helper()
+	if err != nil {
+		t.Fatalf("seed redis: %v", err)
+	}
+}
+
+func TestDepthCollectorReportsDepths(t *testing.T) {
+	rdb := newTestRedis(t)
+	ctx := context.Background()
+
+	// Seed known cardinalities; fail loudly if seeding itself errors so a
+	// comparison mismatch can never be misread as a collector bug.
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}).Err())
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"}).Err())
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"}).Err())
+	mustSeed(t, rdb.RPush(ctx, "q:emails:dlq", "g").Err())
+
+	c := metrics.NewDepthCollector(rdb, "emails")
+
+	want := `
+# HELP relay_queue_depth Number of jobs in a queue, by state.
+# TYPE relay_queue_depth gauge
+relay_queue_depth{queue="emails",state="ready"} 2
+relay_queue_depth{queue="emails",state="inflight"} 1
+relay_queue_depth{queue="emails",state="delayed"} 3
+relay_queue_depth{queue="emails",state="dlq"} 1
+`
+	if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil {
+		t.Errorf("CollectAndCompare: %v", err)
+	}
+}
+
+func TestDepthCollectorEmptyQueueReportsZeros(t *testing.T) {
+	rdb := newTestRedis(t)
+	c := metrics.NewDepthCollector(rdb, "emails")
+
+	want := `
+# HELP relay_queue_depth Number of jobs in a queue, by state.
+# TYPE relay_queue_depth gauge
+relay_queue_depth{queue="emails",state="ready"} 0
+relay_queue_depth{queue="emails",state="inflight"} 0
+relay_queue_depth{queue="emails",state="delayed"} 0
+relay_queue_depth{queue="emails",state="dlq"} 0
+`
+	if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil {
+		t.Errorf("CollectAndCompare: %v", err)
+	}
+}
+
+func TestDepthCollectorRegisters(t *testing.T) {
+	rdb := newTestRedis(t)
+	reg := prometheus.NewRegistry()
+	if err := reg.Register(metrics.NewDepthCollector(rdb, "emails")); err != nil {
+		t.Fatalf("Register: %v", err)
+	}
+}
diff --git a/internal/metrics/export_test.go b/internal/metrics/export_test.go
new file mode 100644
index 0000000..6ea9ddc
--- /dev/null
+++ b/internal/metrics/export_test.go
@@ -0,0 +1,33 @@
+package metrics
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// CounterForTest returns the per-queue child counter for the named metric so
+// tests in the external metrics_test package can read a specific series. It
+// lives in an _test.go file so it never ships in the production build. "name" is
+// the short key (enqueued, deduped, claimed, processed, retried, dead, reaped,
+// promoted); an unknown name returns nil.
+func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter {
+	var v *prometheus.CounterVec
+	switch name {
+	case "enqueued":
+		v = r.enqueued
+	case "deduped":
+		v = r.deduped
+	case "claimed":
+		v = r.claimed
+	case "processed":
+		v = r.processed
+	case "retried":
+		v = r.retried
+	case "dead":
+		v = r.dead
+	case "reaped":
+		v = r.reaped
+	case "promoted":
+		v = r.promoted
+	default:
+		return nil
+	}
+	return v.WithLabelValues(queue)
+}
diff --git a/internal/metrics/recorder.go b/internal/metrics/recorder.go
new file mode 100644
index 0000000..3dd3508
--- /dev/null
+++ b/internal/metrics/recorder.go
@@ -0,0 +1,86 @@
+// Package metrics provides the Prometheus implementation of the broker's
+// instrumentation contract (broker.Metrics) plus a pull-based collector for
+// per-queue depth gauges. It deliberately does not import internal/broker: it
+// satisfies broker.Metrics structurally, keeping the dependency arrow one-way.
+package metrics
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const namespace = "relay"
+
+// Recorder implements broker.Metrics over a private Prometheus registry. Build
+// it with NewRecorder, install it with broker.WithMetrics(rec), and serve
+// rec.Registry() over HTTP. Counters and the latency histogram are labelled by
+// queue.
+type Recorder struct {
+	reg *prometheus.Registry
+
+	enqueued  *prometheus.CounterVec
+	deduped   *prometheus.CounterVec
+	claimed   *prometheus.CounterVec
+	processed *prometheus.CounterVec
+	retried   *prometheus.CounterVec
+	dead      *prometheus.CounterVec
+	reaped    *prometheus.CounterVec
+	promoted  *prometheus.CounterVec
+	latency   *prometheus.HistogramVec
+}
+
+// NewRecorder builds a Recorder with all metrics registered on a fresh registry.
+func NewRecorder() *Recorder {
+	reg := prometheus.NewRegistry()
+
+	counter := func(name, help string) *prometheus.CounterVec {
+		c := prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace, Name: name, Help: help,
+		}, []string{"queue"})
+		reg.MustRegister(c)
+		return c
+	}
+
+	r := &Recorder{
+		reg:       reg,
+		enqueued:  counter("jobs_enqueued_total", "Jobs accepted into a queue."),
+		deduped:   counter("jobs_deduplicated_total", "Enqueues dropped as idempotency-key duplicates."),
+		claimed:   counter("jobs_claimed_total", "Jobs claimed by a worker."),
+		processed: counter("jobs_processed_total", "Jobs acked after successful processing."),
+		retried:   counter("jobs_retried_total", "Failed jobs requeued for retry."),
+		dead:      counter("jobs_dead_total", "Jobs moved to the dead-letter queue."),
+		reaped:    counter("jobs_reaped_total", "Expired in-flight jobs requeued by the reaper."),
+		promoted:  counter("jobs_promoted_total", "Delayed jobs promoted to ready."),
+	}
+	r.latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: namespace,
+		Name:      "job_latency_seconds",
+		Help:      "End-to-end time from job creation to ack.",
+		// End-to-end latency spans ms to minutes, wider than the default
+		// 5ms-10s buckets: ~1ms -> ~9min across 13 buckets.
+		Buckets: prometheus.ExponentialBuckets(0.001, 3, 13),
+	}, []string{"queue"})
+	reg.MustRegister(r.latency)
+	return r
+}
+
+// Registry exposes the underlying registry for an HTTP handler and for
+// registering additional collectors (e.g. DepthCollector).
+func (r *Recorder) Registry() *prometheus.Registry { return r.reg }
+
+func (r *Recorder) IncEnqueued(q string)     { r.enqueued.WithLabelValues(q).Inc() }
+func (r *Recorder) IncDeduplicated(q string) { r.deduped.WithLabelValues(q).Inc() }
+func (r *Recorder) IncClaimed(q string)      { r.claimed.WithLabelValues(q).Inc() }
+func (r *Recorder) IncProcessed(q string)    { r.processed.WithLabelValues(q).Inc() }
+func (r *Recorder) IncRetried(q string)      { r.retried.WithLabelValues(q).Inc() }
+func (r *Recorder) IncDead(q string)         { r.dead.WithLabelValues(q).Inc() }
+func (r *Recorder) AddReaped(q string, n int) {
+	r.reaped.WithLabelValues(q).Add(float64(n))
+}
+func (r *Recorder) AddPromoted(q string, n int) {
+	r.promoted.WithLabelValues(q).Add(float64(n))
+}
+func (r *Recorder) ObserveLatency(q string, d time.Duration) {
+	r.latency.WithLabelValues(q).Observe(d.Seconds())
+}
diff --git a/internal/metrics/recorder_test.go b/internal/metrics/recorder_test.go
new file mode 100644
index 0000000..a71b6ea
--- /dev/null
+++ b/internal/metrics/recorder_test.go
@@ -0,0 +1,58 @@
+package metrics_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+
+	"github.com/StrangeNoob/relay/internal/broker"
+	"github.com/StrangeNoob/relay/internal/metrics"
+)
+
+// Compile-time proof the Recorder satisfies the broker's instrumentation contract.
+var _ broker.Metrics = (*metrics.Recorder)(nil)
+
+func TestRecorderCountersIncrement(t *testing.T) {
+	rec := metrics.NewRecorder()
+
+	rec.IncEnqueued("emails")
+	rec.IncEnqueued("emails")
+	rec.IncDeduplicated("emails")
+	rec.IncClaimed("emails")
+	rec.IncProcessed("emails")
+	rec.IncRetried("emails")
+	rec.IncDead("emails")
+	rec.AddReaped("emails", 3)
+	rec.AddPromoted("emails", 4)
+
+	checks := []struct {
+		name   string
+		metric string
+		want   float64
+	}{
+		{"enqueued", "relay_jobs_enqueued_total", 2},
+		{"deduped", "relay_jobs_deduplicated_total", 1},
+		{"claimed", "relay_jobs_claimed_total", 1},
+		{"processed", "relay_jobs_processed_total", 1},
+		{"retried", "relay_jobs_retried_total", 1},
+		{"dead", "relay_jobs_dead_total", 1},
+		{"reaped", "relay_jobs_reaped_total", 3},
+		{"promoted", "relay_jobs_promoted_total", 4},
+	}
+	for _, c := range checks {
+		if got := testutil.ToFloat64(rec.CounterForTest(c.name, "emails")); got != c.want {
+			t.Errorf("%s{queue=emails} = %v, want %v", c.metric, got, c.want)
+		}
+	}
+}
+
+func TestRecorderObservesLatency(t *testing.T) {
+	rec := metrics.NewRecorder()
+	rec.ObserveLatency("emails", 250*time.Millisecond)
+
+	got := testutil.CollectAndCount(rec.Registry(), "relay_job_latency_seconds")
+	if got == 0 {
+		t.Fatal("relay_job_latency_seconds: no series collected after one observation")
+	}
+}
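Reviewer note on the histogram layout in `internal/metrics/recorder.go`: the comment there says the buckets run from about 1ms to about 9min across 13 buckets. The sketch below reproduces that arithmetic with the standard library only. `exponentialBounds` is a hypothetical helper written for this note; it follows the documented behaviour of `prometheus.ExponentialBuckets(start, factor, count)` (the first bound is `start`, each later bound is the previous one times `factor`) and is not the library's code.

```go
package main

import (
	"fmt"
	"time"
)

// exponentialBounds returns count upper bounds: the first is start and each
// following bound is the previous one multiplied by factor.
func exponentialBounds(start, factor float64, count int) []float64 {
	bounds := make([]float64, count)
	for i := range bounds {
		bounds[i] = start
		start *= factor
	}
	return bounds
}

func main() {
	// Same arguments as the relay_job_latency_seconds histogram in this diff.
	bounds := exponentialBounds(0.001, 3, 13)
	for i, b := range bounds {
		// Bounds are in seconds; convert to a Duration for readable output.
		d := time.Duration(b * float64(time.Second))
		fmt.Printf("bucket %2d: le=%v\n", i, d)
	}
}
```

The last bound is 0.001 × 3^12 = 531.441 seconds, a little under 9 minutes, which matches the comment. Jobs that take longer than that to reach ack land only in the implicit `+Inf` bucket, so a queue with long delays or many retries may want a wider range.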