From 5e490707ffe7d4e2fb7ed11535d2647af5dd78f6 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 13:18:48 +0530 Subject: [PATCH 01/19] Add Phase 2 metrics design spec: Prometheus instrumentation --- .../2026-06-08-relay-phase2-metrics-design.md | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md diff --git a/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md b/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md new file mode 100644 index 0000000..541272a --- /dev/null +++ b/docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md @@ -0,0 +1,190 @@ +# Relay — Phase 2: Prometheus Metrics + +**Status:** Approved design · **Date:** 2026-06-08 +**Parent spec:** [`2026-06-07-relay-distributed-task-queue-design.md`](2026-06-07-relay-distributed-task-queue-design.md) +**Phase:** 2 (depth) — final sub-project (follows 2a delayed/backoff, 2b priority, 2c idempotency, 2d rate limiting) + +## Purpose + +Make the engine observable. The parent spec calls for "Prometheus counters/gauges (enqueued, +processed, retried, dead, in-flight, latency)." This slice instruments every job-state transition +the broker performs and exposes them in Prometheus exposition format, plus live queue-depth gauges +read straight from Redis at scrape time. It is the last remaining Phase 2 feature; completing it +closes Phase 2. + +## Scope + +In scope: + +- A `Metrics` interface in `broker` (consumer-side contract) with a no-op default, set via a new + `WithMetrics` option. Opt-in, additive, zero behavior change when unused. +- Inline counter/histogram recording at each broker state transition (enqueue, dedup, claim, ack, + retry, dead, reap, promote, latency). +- A pull-based `prometheus.Collector` in `internal/metrics` that reports per-queue depth gauges + (`ready`/`inflight`/`delayed`/`dlq`) by running `ZCARD`/`LLEN` at scrape time. 
+- A `--metrics-addr` flag on `cmd/worker` that, when set, serves `/metrics`. + +Out of scope: the Phase 3 API/dashboard server (`cmd/server`); a metrics endpoint on any process +other than `cmd/worker`; tracing; alerting rules; Grafana dashboards. Phase 2 has no other +remaining features — this slice completes it. + +## Key decisions + +| Decision | Choice | Rationale | +|---|---|---| +| Instrumentation wiring | **Injected `Metrics` interface, no-op default** | Keeps the broker free of a hard prometheus dependency, allows isolated assertion with a fake recorder, and makes metrics purely additive (existing tests/usage untouched). | +| Push vs pull | **Both: counters/histogram pushed inline; depth gauges pulled at scrape** | Counters are event-driven; queue depths are point-in-time facts that should never go stale, so they are read from Redis when Prometheus scrapes. | +| Metric library | **`github.com/prometheus/client_golang`** | Correct exposition/histogram/escaping and a clean custom-collector API. It is an instrumentation library, not a queue library — the "build the queue from scratch" rule is untouched. | +| Latency definition | **End-to-end: `now − created_at`, observed in `Ack`** | Captures total time in system (queue wait + processing). Derivable entirely from the job hash; no new field or claim-time plumbing. | +| Labels | **`queue` on everything; `state` additionally on the depth gauge** | Bounded cardinality (number of queues), and the dimensions an operator actually slices by. | +| Endpoint | **`cmd/worker --metrics-addr`, empty = off** | No metrics process exists until Phase 3; the worker is the natural host. Off by default keeps current behavior byte-identical and avoids binding a port in CI. | +| Registry | **Dedicated `prometheus.Registry` per recorder** | No global mutable state; matches the project's emphasis on isolated, testable units. | + +## Data model + +No new Redis keys and no new job-hash fields. 
Depth gauges are computed from the existing per-queue +keys at scrape time: + +| Gauge sample | Source | +|---|---| +| `relay_queue_depth{queue,state="ready"}` | `ZCARD q:{name}:ready` | +| `relay_queue_depth{queue,state="inflight"}` | `ZCARD q:{name}:inflight` | +| `relay_queue_depth{queue,state="delayed"}` | `ZCARD q:{name}:delayed` | +| `relay_queue_depth{queue,state="dlq"}` | `LLEN q:{name}:dlq` | + +## Components & changes + +### `internal/broker` + +- **`Metrics` interface** (new file `metrics.go`): + + ```go + type Metrics interface { + IncEnqueued(queue string) + IncDeduplicated(queue string) + IncClaimed(queue string) + IncProcessed(queue string) + IncRetried(queue string) + IncDead(queue string) + AddReaped(queue string, n int) + AddPromoted(queue string, n int) + ObserveLatency(queue string, d time.Duration) + } + ``` + +- **`noopMetrics`** — unexported zero-cost implementation; every method empty. `Broker` gains a + `metrics Metrics` field initialised to `noopMetrics{}` in `New`. +- **`WithMetrics(m Metrics) Option`** — overrides the recorder. A `nil` argument is ignored (keeps + the no-op) so callers can't accidentally install a nil and panic. +- **Instrumentation points** (no Lua changes; all signals already available in Go): + - `Enqueue`: on success → `IncEnqueued(j.Queue)`; on `ErrDuplicate` → `IncDeduplicated(j.Queue)` + (recorded before the sentinel is returned). + - `Claim`: when a job is returned (`ok == true`) → `IncClaimed(queue)`. Empty/rate-limited claims + record nothing. + - `Ack`: → `IncProcessed(j.Queue)` and `ObserveLatency(j.Queue, time.Since(j.CreatedAt))`. + - `Nack`: capture `nack.lua`'s existing `"retry"`/`"dead"` return (currently discarded via + `.Err()`) → `IncRetried` / `IncDead`. Still returns its current error contract. + - `Reap`: when `n > 0` → `AddReaped(queue, n)`. + - `Promote`: when `n > 0` → `AddPromoted(queue, n)`. 
+
+  Metrics are recorded only after the underlying Redis operation succeeds, so a failed op never
+  inflates a counter.
+
+### `internal/metrics` (new package)
+
+Depends only on `prometheus/client_golang` and `redis/go-redis` (for the collector) — **not** on
+`broker`. It satisfies `broker.Metrics` structurally.
+
+- **`Recorder`** — implements `broker.Metrics` over a private `*prometheus.Registry`. Holds the
+  counter vecs and histogram vec (all `*prometheus.CounterVec` / `*prometheus.HistogramVec` with a
+  `queue` label). Constructor `NewRecorder() *Recorder` registers them on a fresh registry.
+  `(*Recorder).Registry() *prometheus.Registry` exposes it for the HTTP handler.
+- Metric definitions (namespace `relay`):
+
+  | Metric | Type | Labels |
+  |---|---|---|
+  | `relay_jobs_enqueued_total` | counter | `queue` |
+  | `relay_jobs_deduplicated_total` | counter | `queue` |
+  | `relay_jobs_claimed_total` | counter | `queue` |
+  | `relay_jobs_processed_total` | counter | `queue` |
+  | `relay_jobs_retried_total` | counter | `queue` |
+  | `relay_jobs_dead_total` | counter | `queue` |
+  | `relay_jobs_reaped_total` | counter | `queue` |
+  | `relay_jobs_promoted_total` | counter | `queue` |
+  | `relay_job_latency_seconds` | histogram | `queue` |
+  | `relay_queue_depth` | gauge (via collector) | `queue`, `state` |
+
+  The latency histogram uses buckets spanning roughly 1ms→9min (end-to-end time is much wider than
+  Prometheus' default 5ms→10s), e.g. `prometheus.ExponentialBuckets(0.001, 3, 13)`, whose largest
+  finite bound is 0.001·3^12 ≈ 531s.
+
+- **`DepthCollector`** — implements `prometheus.Collector`. Constructed with the redis client and
+  the queue name(s) to watch (`NewDepthCollector(rdb, queues...)`). `Describe` emits the
+  `relay_queue_depth` descriptor; `Collect` runs `ZCARD`/`LLEN` per queue under a short
+  scrape-scoped `context.Context` (a few seconds) and emits one gauge sample per (queue, state). On
+  a Redis error for a given query it skips that sample rather than emitting a stale or zero value.
+ Registered on the `Recorder`'s registry by the caller. + +### `cmd/worker` + +- New `--metrics-addr` flag (string, default `""`). +- When empty: unchanged — broker built without `WithMetrics`, no HTTP server. +- When set: build a `metrics.NewRecorder()`, pass `broker.WithMetrics(rec)`, register a + `metrics.NewDepthCollector(rdb, *queue)` on `rec.Registry()`, and start an `http.Server` serving + `promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})` at `/metrics`. The server is shut + down (graceful `Shutdown`) alongside the worker pool / reaper / promoter on SIGINT/SIGTERM. + +## Testing + +Real Redis where Redis is needed; skip (not fail) when unreachable. To keep `go test ./...` +parallel-safe, the metrics package uses **Redis DB 13** (broker tests use DB 15, worker tests DB +14, so no `FlushDB` collisions). + +### `internal/broker` + +A fake `Metrics` recorder (records calls/counts in memory) injected via `WithMetrics`, asserted +against real Redis ops: + +- Enqueue (no key) → exactly one `IncEnqueued`, queue correct. +- Enqueue duplicate (same idempotency key within TTL) → one `IncEnqueued` then one + `IncDeduplicated`; ready ZCard reflects only the first. +- Claim of a ready job → one `IncClaimed`; claim of an empty queue → no call. +- Ack → one `IncProcessed` and one `ObserveLatency` with a non-negative duration. +- Nack with retries left → one `IncRetried`, none `IncDead`; Nack with budget spent → one + `IncDead`, none `IncRetried`. +- Reap of N expired → one `AddReaped(queue, N)`; Promote of N due → one `AddPromoted(queue, N)`. +- Default broker (no `WithMetrics`) and `WithMetrics(nil)` → no panic across a full + enqueue/claim/ack cycle (noop path). + +### `internal/metrics` + +- Compile-time assertion `var _ broker.Metrics = (*Recorder)(nil)`. +- Using `prometheus/testutil`: after recording, assert counter/histogram sample values + (`testutil.ToFloat64`, `CollectAndCount`) for representative metrics and the `queue` label. 
+- `DepthCollector` against real Redis (DB 13): seed known members into `ready`/`inflight`/`delayed` + and items into `dlq`, then assert `relay_queue_depth{state=…}` matches via + `testutil.CollectAndCompare` / `GatherAndCompare`. +- Empty/unknown queue → depth samples are `0` (or absent on Redis error); no panic. + +### `cmd/worker` + +Build/vet only (consistent with prior slices); no integration test for the HTTP server. + +## Known limitations + +- **Label cardinality is per queue.** One time series per metric per queue. Fine for the handful of + queues this project targets; a deployment with unbounded queue names would need care. +- **Depth gauges cost a scrape-time round-trip.** Each scrape issues one Redis call per (queue, + state). Cheap at this scale; a very high scrape frequency against many queues would add load. +- **Metrics are per-process.** Each worker process exposes its own counters; cluster-wide totals + come from Prometheus aggregating across scraped targets, as usual. Depth gauges read shared Redis, + so every worker reports the same depths — operators should aggregate with `max`/`avg`, not `sum`. +- **Endpoint only on `cmd/worker`.** The standalone API/dashboard server is Phase 3. + +## Invariants preserved + +- At-least-once delivery — metrics are read-only observation; no job movement changes. +- The atomic claim is sacred — no Lua script is modified; instrumentation is Go-side, after the + atomic op succeeds. +- Crash safety via the reaper is untouched. +- Build the queue from scratch on Redis primitives — `prometheus/client_golang` instruments; it + does not implement any queue mechanics. 
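To make the latency bucket layout from the `internal/metrics` section concrete, the sketch below recomputes the upper bounds that an exponential layout with start 0.001, factor 3, and 13 buckets produces. `exponentialBounds` is a hypothetical helper written only for this illustration. It reproduces the arithmetic start·factor^i and is not the `client_golang` function itself.

```go
package main

import "fmt"

// exponentialBounds is a hypothetical helper for illustration: it returns
// count upper bounds, where bound i equals start * factor^i.
func exponentialBounds(start, factor float64, count int) []float64 {
	bounds := make([]float64, count)
	for i := range bounds {
		bounds[i] = start
		start *= factor
	}
	return bounds
}

func main() {
	// Same parameters as the example in the spec: 1ms start, factor 3, 13 buckets.
	for i, b := range exponentialBounds(0.001, 3, 13) {
		fmt.Printf("bucket %2d: le=%.3fs\n", i, b)
	}
}
```

With these parameters the last finite bound lands near 531 seconds, so any job slower than that falls into the implicit `+Inf` bucket. Dropping the count to 12 would cap the layout at about 177 seconds instead.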
From 2f6d8efd4a7d169c3678bf3ad9100884dbcc5d98 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 13:23:08 +0530 Subject: [PATCH 02/19] Add Phase 2 metrics implementation plan --- .../plans/2026-06-08-relay-phase2-metrics.md | 1269 +++++++++++++++++ 1 file changed, 1269 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md diff --git a/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md b/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md new file mode 100644 index 0000000..3bd458e --- /dev/null +++ b/docs/superpowers/plans/2026-06-08-relay-phase2-metrics.md @@ -0,0 +1,1269 @@ +# Phase 2 Metrics Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Make Relay observable by instrumenting every broker job-state transition with Prometheus counters/histogram and exposing live per-queue depth gauges at a `/metrics` endpoint on `cmd/worker`. + +**Architecture:** A `Metrics` interface in `broker` with a no-op default (opt-in via `WithMetrics`) keeps the broker free of a hard prometheus dependency and makes instrumentation purely additive. Counters/histogram are pushed inline after each Redis op succeeds; per-queue depth gauges are pulled at scrape time by a `prometheus.Collector` running `ZCARD`/`LLEN`. The concrete recorder + collector live in a new `internal/metrics` package that satisfies `broker.Metrics` structurally (no import of `broker`). + +**Tech Stack:** Go, `github.com/redis/go-redis/v9`, `github.com/prometheus/client_golang` (new dependency), real-Redis integration tests. 
+ +**Spec:** [`docs/superpowers/specs/2026-06-08-relay-phase2-metrics-design.md`](../specs/2026-06-08-relay-phase2-metrics-design.md) + +--- + +## File Structure + +- **Create `internal/broker/metrics.go`** — `Metrics` interface, `noopMetrics` default, `WithMetrics` option. Responsibility: the consumer-side instrumentation contract and its no-op. +- **Modify `internal/broker/broker.go`** — add `metrics Metrics` field to `Broker`, initialise to `noopMetrics{}` in `New`, and record at each transition (`Enqueue`, `Claim`, `Ack`, `Nack`, `Reap`, `Promote`). +- **Create `internal/broker/metrics_test.go`** (`package broker`) — internal unit tests for option wiring / noop default. +- **Create `internal/broker/instrumentation_test.go`** (`package broker_test`) — a fake recorder + behavior tests that each transition records the right call against real Redis. +- **Modify `internal/broker/broker_test.go`** — refactor `newTestBroker` to delegate to a new `newTestBrokerWith(t, opts...)` so instrumentation tests can inject `WithMetrics`. +- **Create `internal/metrics/recorder.go`** — `Recorder` (prometheus counters + histogram over a private registry) implementing `broker.Metrics`. +- **Create `internal/metrics/depth.go`** — `DepthCollector` (`prometheus.Collector`) reading `ZCARD`/`LLEN` at scrape time. +- **Create `internal/metrics/recorder_test.go`** — compile assertion + `testutil` value assertions. +- **Create `internal/metrics/depth_test.go`** — `DepthCollector` against real Redis (DB 13). +- **Modify `cmd/worker/main.go`** — `--metrics-addr` flag; when set, build recorder, wire `WithMetrics`, register depth collector, serve `/metrics`, shut down gracefully. +- **Modify `CLAUDE.md`** — flip Phase 2 to complete, mark `internal/metrics` ✅, document `WithMetrics` + the new dependency + known limitations. +- **Modify `go.mod` / `go.sum`** — add `prometheus/client_golang` (happens automatically on first import + `go mod tidy`). 
+
+---
+
+## Task 1: `Metrics` interface, noop default, and `WithMetrics` option
+
+**Files:**
+- Create: `internal/broker/metrics.go`
+- Create: `internal/broker/metrics_test.go` (`package broker`)
+- Modify: `internal/broker/broker.go` (the `Broker` struct + `New`)
+
+- [ ] **Step 1: Write the failing test**
+
+Create `internal/broker/metrics_test.go`:
+
+```go
+package broker
+
+import "testing"
+
+func TestNewDefaultsToNoopMetrics(t *testing.T) {
+	b := New(nil)
+	if b.metrics == nil {
+		t.Fatal("New: metrics field is nil, want noopMetrics default")
+	}
+	if _, ok := b.metrics.(noopMetrics); !ok {
+		t.Fatalf("New: metrics = %T, want noopMetrics", b.metrics)
+	}
+}
+
+// stubMetrics is a distinct Metrics type, so the test can tell an installed
+// recorder apart from the noopMetrics default.
+type stubMetrics struct{ noopMetrics }
+
+func TestWithMetricsInstallsRecorder(t *testing.T) {
+	b := New(nil, WithMetrics(stubMetrics{}))
+	if _, ok := b.metrics.(stubMetrics); !ok {
+		t.Fatalf("WithMetrics: metrics = %T, want stubMetrics", b.metrics)
+	}
+}
+
+func TestWithMetricsNilIsIgnored(t *testing.T) {
+	b := New(nil, WithMetrics(nil))
+	if _, ok := b.metrics.(noopMetrics); !ok {
+		t.Fatalf("WithMetrics(nil): metrics = %T, want noopMetrics retained", b.metrics)
+	}
+}
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `go test ./internal/broker/ -run 'TestNewDefaultsToNoopMetrics|TestWithMetrics' -v`
+Expected: FAIL — compile error, `b.metrics` undefined / `noopMetrics`/`WithMetrics`/`Metrics` undefined.
+
+- [ ] **Step 3: Write minimal implementation**
+
+Create `internal/broker/metrics.go`:
+
+```go
+package broker
+
+import "time"
+
+// Metrics receives a callback for every job-state transition the broker makes.
+// It is the broker's consumer-side instrumentation contract: the broker depends
+// on this small interface, not on any metrics library, so a Prometheus recorder
+// (internal/metrics) — or a test fake — can be plugged in via WithMetrics. The
+// default is noopMetrics, so a broker built without WithMetrics records nothing
+// and behaves exactly as before.
+// +// Every method takes the queue name so the implementation can label its series +// per queue. Reap/Promote add a batch count because one call moves many jobs; +// the rest are single events. ObserveLatency reports a job's end-to-end time in +// the system (enqueue -> ack). +type Metrics interface { + IncEnqueued(queue string) + IncDeduplicated(queue string) + IncClaimed(queue string) + IncProcessed(queue string) + IncRetried(queue string) + IncDead(queue string) + AddReaped(queue string, n int) + AddPromoted(queue string, n int) + ObserveLatency(queue string, d time.Duration) +} + +// noopMetrics is the default recorder: every method does nothing. It lets the +// broker call b.metrics unconditionally without nil checks, and keeps metrics +// entirely opt-in. +type noopMetrics struct{} + +func (noopMetrics) IncEnqueued(string) {} +func (noopMetrics) IncDeduplicated(string) {} +func (noopMetrics) IncClaimed(string) {} +func (noopMetrics) IncProcessed(string) {} +func (noopMetrics) IncRetried(string) {} +func (noopMetrics) IncDead(string) {} +func (noopMetrics) AddReaped(string, int) {} +func (noopMetrics) AddPromoted(string, int) {} +func (noopMetrics) ObserveLatency(string, time.Duration) {} + +// WithMetrics installs a Metrics recorder. A nil recorder is ignored so callers +// cannot accidentally replace the safe no-op with something that panics. +func WithMetrics(m Metrics) Option { + return func(b *Broker) { + if m != nil { + b.metrics = m + } + } +} +``` + +In `internal/broker/broker.go`, add the field to the `Broker` struct (alongside the existing fields): + +```go + metrics Metrics +``` + +And in `New`, set the default BEFORE applying options (so `WithMetrics` can override it). The current `New` builds the struct then ranges over opts; ensure the literal includes `metrics: noopMetrics{}`: + +```go + b := &Broker{ + rdb: rdb, + // ... existing field initialisers unchanged ... 
+ metrics: noopMetrics{}, + } + for _, opt := range opts { + opt(b) + } +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestNewDefaultsToNoopMetrics|TestWithMetrics' -v` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/metrics.go internal/broker/metrics_test.go internal/broker/broker.go +git commit -m "Add broker Metrics interface, noop default, and WithMetrics option" +``` + +--- + +## Task 2: Fake recorder + Enqueue instrumentation + +**Files:** +- Modify: `internal/broker/broker_test.go` (refactor `newTestBroker`) +- Create: `internal/broker/instrumentation_test.go` (`package broker_test`) +- Modify: `internal/broker/broker.go` (`Enqueue`) + +- [ ] **Step 1: Refactor the test harness to accept options** + +In `internal/broker/broker_test.go`, replace the `newTestBroker` body so it delegates to a new variadic helper (keep `newTestBroker` working for all existing callers): + +```go +func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) { + t.Helper() + return newTestBrokerWith(t) +} + +// newTestBrokerWith is newTestBroker with broker options, for tests that need to +// inject a recorder or other configuration. 
+func newTestBrokerWith(t *testing.T, opts ...broker.Option) (*broker.Broker, *redis.Client) { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: testRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + return broker.New(rdb, opts...), rdb +} +``` + +- [ ] **Step 2: Write the failing test (fake recorder + enqueue)** + +Create `internal/broker/instrumentation_test.go`: + +```go +package broker_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" +) + +// fakeMetrics is a Metrics recorder that counts calls per queue in memory, so a +// test can assert exactly which transition the broker recorded. It is safe for +// concurrent use because the broker may record from multiple goroutines. 
+type fakeMetrics struct { + mu sync.Mutex + enqueued map[string]int + deduped map[string]int + claimed map[string]int + processed map[string]int + retried map[string]int + dead map[string]int + reaped map[string]int + promoted map[string]int + latencies []time.Duration +} + +func newFakeMetrics() *fakeMetrics { + return &fakeMetrics{ + enqueued: map[string]int{}, + deduped: map[string]int{}, + claimed: map[string]int{}, + processed: map[string]int{}, + retried: map[string]int{}, + dead: map[string]int{}, + reaped: map[string]int{}, + promoted: map[string]int{}, + } +} + +func (f *fakeMetrics) IncEnqueued(q string) { f.bump(f.enqueued, q) } +func (f *fakeMetrics) IncDeduplicated(q string) { f.bump(f.deduped, q) } +func (f *fakeMetrics) IncClaimed(q string) { f.bump(f.claimed, q) } +func (f *fakeMetrics) IncProcessed(q string) { f.bump(f.processed, q) } +func (f *fakeMetrics) IncRetried(q string) { f.bump(f.retried, q) } +func (f *fakeMetrics) IncDead(q string) { f.bump(f.dead, q) } +func (f *fakeMetrics) AddReaped(q string, n int) { f.add(f.reaped, q, n) } +func (f *fakeMetrics) AddPromoted(q string, n int) { f.add(f.promoted, q, n) } +func (f *fakeMetrics) ObserveLatency(q string, d time.Duration) { + f.mu.Lock() + defer f.mu.Unlock() + f.latencies = append(f.latencies, d) +} + +func (f *fakeMetrics) bump(m map[string]int, q string) { f.add(m, q, 1) } +func (f *fakeMetrics) add(m map[string]int, q string, n int) { + f.mu.Lock() + defer f.mu.Unlock() + m[q] += n +} + +func (f *fakeMetrics) get(m map[string]int, q string) int { + f.mu.Lock() + defer f.mu.Unlock() + return m[q] +} + +func TestEnqueueRecordsEnqueued(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if got := fm.get(fm.enqueued, "emails"); got != 1 { + t.Errorf("enqueued[emails] = %d, want 1", got) + } + if got := 
fm.get(fm.deduped, "emails"); got != 0 { + t.Errorf("deduped[emails] = %d, want 0", got) + } +} + +func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j1 := job.New("emails", []byte("a")) + if err := b.Enqueue(ctx, j1, broker.WithIdempotencyKey("k1")); err != nil { + t.Fatalf("first Enqueue: %v", err) + } + j2 := job.New("emails", []byte("b")) + if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); err != broker.ErrDuplicate { + t.Fatalf("second Enqueue err = %v, want ErrDuplicate", err) + } + if got := fm.get(fm.enqueued, "emails"); got != 1 { + t.Errorf("enqueued[emails] = %d, want 1", got) + } + if got := fm.get(fm.deduped, "emails"); got != 1 { + t.Errorf("deduped[emails] = %d, want 1", got) + } +} +``` + +- [ ] **Step 3: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestEnqueueRecords|TestEnqueueDuplicateRecords' -v` +Expected: FAIL — `enqueued[emails] = 0, want 1` (instrumentation not added yet). Requires Redis; if skipped, note RED cannot be shown without Redis and proceed only once Redis is available. + +- [ ] **Step 4: Add Enqueue instrumentation** + +In `internal/broker/broker.go`, in `Enqueue`, record after the script succeeds (replace the tail `if res == "dup" { return ErrDuplicate }; return nil`): + +```go + if res == "dup" { + b.metrics.IncDeduplicated(j.Queue) + return ErrDuplicate + } + b.metrics.IncEnqueued(j.Queue) + return nil +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestEnqueueRecords|TestEnqueueDuplicateRecords' -v` +Expected: PASS (2 tests). 
+ +- [ ] **Step 6: Commit** + +```bash +git add internal/broker/broker_test.go internal/broker/instrumentation_test.go internal/broker/broker.go +git commit -m "Instrument Enqueue with enqueued/deduplicated metrics" +``` + +--- + +## Task 3: Claim instrumentation + +**Files:** +- Modify: `internal/broker/broker.go` (`Claim`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestClaimRecordsClaimed(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v, want true/nil", ok, err) + } + if got := fm.get(fm.claimed, "emails"); got != 1 { + t.Errorf("claimed[emails] = %d, want 1", got) + } +} + +func TestClaimEmptyQueueRecordsNothing(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || ok { + t.Fatalf("Claim on empty: ok=%v err=%v, want false/nil", ok, err) + } + if got := fm.get(fm.claimed, "emails"); got != 0 { + t.Errorf("claimed[emails] = %d, want 0", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestClaimRecordsClaimed|TestClaimEmptyQueueRecordsNothing' -v` +Expected: FAIL — `claimed[emails] = 0, want 1`. + +- [ ] **Step 3: Add Claim instrumentation** + +In `Claim`, record only on a successful pop. Find the point where the script returned a job and the function is about to return `(j, true, nil)`; add `b.metrics.IncClaimed(queue)` immediately before that successful return. The empty-queue path (returns `ok == false`) must record nothing. 
(The exact return statement is the one returning the decoded job with `true`; do not touch the `redis.Nil` / empty branch.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestClaimRecordsClaimed|TestClaimEmptyQueueRecordsNothing' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Claim with claimed metric" +``` + +--- + +## Task 4: Ack instrumentation (processed + latency) + +**Files:** +- Modify: `internal/broker/broker.go` (`Ack`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestAckRecordsProcessedAndLatency(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + j, ok, err := b.Claim(ctx, "emails", time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + if err := b.Ack(ctx, j); err != nil { + t.Fatalf("Ack: %v", err) + } + if got := fm.get(fm.processed, "emails"); got != 1 { + t.Errorf("processed[emails] = %d, want 1", got) + } + fm.mu.Lock() + n := len(fm.latencies) + var d time.Duration + if n == 1 { + d = fm.latencies[0] + } + fm.mu.Unlock() + if n != 1 { + t.Fatalf("latencies len = %d, want 1", n) + } + if d < 0 { + t.Errorf("latency = %v, want non-negative", d) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestAckRecordsProcessedAndLatency -v` +Expected: FAIL — `processed[emails] = 0, want 1`. 
+ +- [ ] **Step 3: Add Ack instrumentation** + +In `Ack`, after the script `Run(...).Err()` check succeeds and before `return nil`: + +```go + b.metrics.IncProcessed(j.Queue) + b.metrics.ObserveLatency(j.Queue, time.Since(j.CreatedAt)) + return nil +``` + +(`job.Job` has `CreatedAt time.Time`; `time.Since` yields enqueue→ack elapsed. `time` is already imported in broker.go.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestAckRecordsProcessedAndLatency -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Ack with processed counter and end-to-end latency" +``` + +--- + +## Task 5: Nack instrumentation (retried vs dead) + +**Files:** +- Modify: `internal/broker/broker.go` (`Nack`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +// nackTestJob enqueues, claims, and returns a job set up so the next Nack takes +// the requested branch. maxRetries controls retry-vs-dead: with maxRetries=5 the +// first nack retries; with maxRetries=0 the first nack dead-letters. 
+func nackTestJob(t *testing.T, b *broker.Broker, ctx context.Context, maxRetries int) job.Job { + t.Helper() + j := job.New("emails", []byte("x")) + j.MaxRetries = maxRetries + if err := b.Enqueue(ctx, j); err != nil { + t.Fatalf("Enqueue: %v", err) + } + claimed, ok, err := b.Claim(ctx, "emails", time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + return claimed +} + +func TestNackWithRetriesLeftRecordsRetried(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j := nackTestJob(t, b, ctx, 5) // attempts now 1 < 5 -> retry + if err := b.Nack(ctx, j); err != nil { + t.Fatalf("Nack: %v", err) + } + if got := fm.get(fm.retried, "emails"); got != 1 { + t.Errorf("retried[emails] = %d, want 1", got) + } + if got := fm.get(fm.dead, "emails"); got != 0 { + t.Errorf("dead[emails] = %d, want 0", got) + } +} + +func TestNackWithBudgetSpentRecordsDead(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + j := nackTestJob(t, b, ctx, 0) // attempts now 1, max 0 -> dead + if err := b.Nack(ctx, j); err != nil { + t.Fatalf("Nack: %v", err) + } + if got := fm.get(fm.dead, "emails"); got != 1 { + t.Errorf("dead[emails] = %d, want 1", got) + } + if got := fm.get(fm.retried, "emails"); got != 0 { + t.Errorf("retried[emails] = %d, want 0", got) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestNackWithRetriesLeftRecordsRetried|TestNackWithBudgetSpentRecordsDead' -v` +Expected: FAIL — `retried[emails] = 0, want 1` (and dead = 0). + +- [ ] **Step 3: Add Nack instrumentation** + +In `Nack`, the script already returns `"retry"`/`"dead"` but the code currently discards it with `.Err()`. Capture it with `.Text()` and branch. 
Replace the script-run block: + +```go + outcome, err := nackScript.Run(ctx, b.rdb, + []string{inflightKey(j.Queue), delayedKey(j.Queue), dlqKey(j.Queue)}, + j.ID, jobKeyPrefix, readyAt, + ).Text() + if err != nil { + return fmt.Errorf("broker: nacking job %s: %w", j.ID, err) + } + switch outcome { + case "retry": + b.metrics.IncRetried(j.Queue) + case "dead": + b.metrics.IncDead(j.Queue) + } + return nil +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestNackWithRetriesLeftRecordsRetried|TestNackWithBudgetSpentRecordsDead' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Nack with retried/dead metrics from script outcome" +``` + +--- + +## Task 6: Reap + Promote instrumentation + +**Files:** +- Modify: `internal/broker/broker.go` (`Reap`, `Promote`) +- Modify: `internal/broker/instrumentation_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/instrumentation_test.go`: + +```go +func TestReapRecordsReapedCount(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + // Enqueue + claim two jobs with a tiny visibility so they expire immediately. 
+ for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + for i := 0; i < 2; i++ { + if _, ok, err := b.Claim(ctx, "emails", time.Millisecond); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + } + time.Sleep(10 * time.Millisecond) // let the visibility deadline pass + + n, err := b.Reap(ctx, "emails") + if err != nil { + t.Fatalf("Reap: %v", err) + } + if n != 2 { + t.Fatalf("Reap returned %d, want 2", n) + } + if got := fm.get(fm.reaped, "emails"); got != 2 { + t.Errorf("reaped[emails] = %d, want 2", got) + } +} + +func TestPromoteRecordsPromotedCount(t *testing.T) { + fm := newFakeMetrics() + b, _ := newTestBrokerWith(t, broker.WithMetrics(fm)) + ctx := context.Background() + + // Enqueue two delayed jobs whose ready-at is already in the past. + past := time.Now().Add(-time.Second) + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(past)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + } + + n, err := b.Promote(ctx, "emails") + if err != nil { + t.Fatalf("Promote: %v", err) + } + if n != 2 { + t.Fatalf("Promote returned %d, want 2", n) + } + if got := fm.get(fm.promoted, "emails"); got != 2 { + t.Errorf("promoted[emails] = %d, want 2", got) + } +} +``` + +Note on `WithReadyAt(past)`: `Enqueue` routes to the delayed set only when `cfg.readyAt.After(now)`. A past time would route straight to ready and Promote would find nothing. 
If `WithReadyAt` with a past time does NOT create a delayed job in this codebase, instead enqueue with a near-future ready-at and sleep past it: + +```go + soon := time.Now().Add(20 * time.Millisecond) + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(soon)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + } + time.Sleep(40 * time.Millisecond) +``` + +Use whichever form makes the job land in the delayed set; verify by asserting `n == 2` (the test fails loudly if the jobs were not delayed). + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run 'TestReapRecordsReapedCount|TestPromoteRecordsPromotedCount' -v` +Expected: FAIL — `reaped[emails] = 0, want 2` / `promoted[emails] = 0, want 2`. + +- [ ] **Step 3: Add Reap + Promote instrumentation** + +In `Reap`, after a successful run replace `return n, nil` with: + +```go + if n > 0 { + b.metrics.AddReaped(queue, n) + } + return n, nil +``` + +In `Promote`, after a successful run replace `return n, nil` with: + +```go + if n > 0 { + b.metrics.AddPromoted(queue, n) + } + return n, nil +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run 'TestReapRecordsReapedCount|TestPromoteRecordsPromotedCount' -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Full broker suite under race** + +Run: `go test -race ./internal/broker/` +Expected: PASS (all existing + new tests; confirms instrumentation didn't disturb behavior). 
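The `n > 0` guard added to `Reap` and `Promote` can be sketched in isolation. This is a minimal illustration, not the broker's code: `batchMetrics`, `countingMetrics`, `recordReaped`, and `recordPromoted` are hypothetical names, and the in-memory recorder only stands in for the plan's `fakeMetrics`.

```go
package main

import "fmt"

// batchMetrics is a hypothetical, cut-down stand-in for the broker's Metrics
// interface, keeping only the two batch methods.
type batchMetrics interface {
	AddReaped(queue string, n int)
	AddPromoted(queue string, n int)
}

// countingMetrics records per-queue totals and how often it was called.
type countingMetrics struct {
	reaped   map[string]int
	promoted map[string]int
	calls    int
}

func newCountingMetrics() *countingMetrics {
	return &countingMetrics{reaped: map[string]int{}, promoted: map[string]int{}}
}

func (c *countingMetrics) AddReaped(q string, n int)   { c.reaped[q] += n; c.calls++ }
func (c *countingMetrics) AddPromoted(q string, n int) { c.promoted[q] += n; c.calls++ }

// recordReaped mirrors the guard in the plan: only a non-empty batch is
// reported, so an idle pass makes no recorder call at all.
func recordReaped(m batchMetrics, queue string, n int) {
	if n > 0 {
		m.AddReaped(queue, n)
	}
}

// recordPromoted applies the same guard to promoted batches.
func recordPromoted(m batchMetrics, queue string, n int) {
	if n > 0 {
		m.AddPromoted(queue, n)
	}
}

func main() {
	m := newCountingMetrics()
	recordReaped(m, "emails", 0) // empty pass: nothing recorded
	recordReaped(m, "emails", 2)
	recordPromoted(m, "emails", 2)
	fmt.Println(m.reaped["emails"], m.promoted["emails"], m.calls)
}
```

The design choice being illustrated is that one call reports the whole batch, so the recorder sees a count rather than one event per job.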
+ +- [ ] **Step 6: Commit** + +```bash +git add internal/broker/broker.go internal/broker/instrumentation_test.go +git commit -m "Instrument Reap and Promote with batch count metrics" +``` + +--- + +## Task 7: `internal/metrics` Recorder (Prometheus) + +**Files:** +- Create: `internal/metrics/recorder.go` +- Create: `internal/metrics/recorder_test.go` +- Modify: `go.mod`, `go.sum` (via `go mod tidy`) + +- [ ] **Step 1: Write the failing test** + +Create `internal/metrics/recorder_test.go`: + +```go +package metrics_test + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/metrics" +) + +// Compile-time proof the Recorder satisfies the broker's instrumentation contract. +var _ broker.Metrics = (*metrics.Recorder)(nil) + +func TestRecorderCountersIncrement(t *testing.T) { + rec := metrics.NewRecorder() + + rec.IncEnqueued("emails") + rec.IncEnqueued("emails") + rec.IncDeduplicated("emails") + rec.IncClaimed("emails") + rec.IncProcessed("emails") + rec.IncRetried("emails") + rec.IncDead("emails") + rec.AddReaped("emails", 3) + rec.AddPromoted("emails", 4) + + checks := []struct { + name string + metric string + want float64 + }{ + {"enqueued", "relay_jobs_enqueued_total", 2}, + {"deduped", "relay_jobs_deduplicated_total", 1}, + {"claimed", "relay_jobs_claimed_total", 1}, + {"processed", "relay_jobs_processed_total", 1}, + {"retried", "relay_jobs_retried_total", 1}, + {"dead", "relay_jobs_dead_total", 1}, + {"reaped", "relay_jobs_reaped_total", 3}, + {"promoted", "relay_jobs_promoted_total", 4}, + } + for _, c := range checks { + if got := testutil.ToFloat64(rec.CounterForTest(c.name, "emails")); got != c.want { + t.Errorf("%s{queue=emails} = %v, want %v", c.metric, got, c.want) + } + } +} + +func TestRecorderObservesLatency(t *testing.T) { + rec := metrics.NewRecorder() + rec.ObserveLatency("emails", 250*time.Millisecond) + + // One 
observation must be recorded in the histogram for queue=emails. + got := testutil.CollectAndCount(rec.Registry(), "relay_job_latency_seconds") + if got == 0 { + t.Fatal("relay_job_latency_seconds: no series collected after one observation") + } +} +``` + +Helper `CounterForTest` is a small test-only accessor (defined in the impl so the test can fetch the right `prometheus.Counter` child). It returns the per-queue child counter for the named metric. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/metrics/ -run 'TestRecorder' -v` +Expected: FAIL — package `internal/metrics` / `prometheus/client_golang` not found. + +- [ ] **Step 3: Implement the Recorder** + +Create `internal/metrics/recorder.go`: + +```go +// Package metrics provides the Prometheus implementation of the broker's +// instrumentation contract (broker.Metrics) plus a pull-based collector for +// per-queue depth gauges. It deliberately does not import internal/broker: it +// satisfies broker.Metrics structurally, keeping the dependency arrow one-way. +package metrics + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const namespace = "relay" + +// Recorder implements broker.Metrics over a private Prometheus registry. Build +// it with NewRecorder, install it with broker.WithMetrics(rec), and serve +// rec.Registry() over HTTP. Counters and the latency histogram are labelled by +// queue. +type Recorder struct { + reg *prometheus.Registry + + enqueued *prometheus.CounterVec + deduped *prometheus.CounterVec + claimed *prometheus.CounterVec + processed *prometheus.CounterVec + retried *prometheus.CounterVec + dead *prometheus.CounterVec + reaped *prometheus.CounterVec + promoted *prometheus.CounterVec + latency *prometheus.HistogramVec +} + +// NewRecorder builds a Recorder with all metrics registered on a fresh registry. 
+func NewRecorder() *Recorder { + reg := prometheus.NewRegistry() + + counter := func(name, help string) *prometheus.CounterVec { + c := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, Name: name, Help: help, + }, []string{"queue"}) + reg.MustRegister(c) + return c + } + + r := &Recorder{ + reg: reg, + enqueued: counter("jobs_enqueued_total", "Jobs accepted into a queue."), + deduped: counter("jobs_deduplicated_total", "Enqueues dropped as idempotency-key duplicates."), + claimed: counter("jobs_claimed_total", "Jobs claimed by a worker."), + processed: counter("jobs_processed_total", "Jobs acked after successful processing."), + retried: counter("jobs_retried_total", "Failed jobs requeued for retry."), + dead: counter("jobs_dead_total", "Jobs moved to the dead-letter queue."), + reaped: counter("jobs_reaped_total", "Expired in-flight jobs requeued by the reaper."), + promoted: counter("jobs_promoted_total", "Delayed jobs promoted to ready."), + } + r.latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Name: "job_latency_seconds", + Help: "End-to-end time from enqueue to ack.", + // End-to-end latency spans ms to minutes, wider than the default + // 5ms-10s buckets: ~1ms -> ~9min across 13 buckets. + Buckets: prometheus.ExponentialBuckets(0.001, 3, 13), + }, []string{"queue"}) + reg.MustRegister(r.latency) + return r +} + +// Registry exposes the underlying registry for an HTTP handler and for +// registering additional collectors (e.g. DepthCollector). 
+func (r *Recorder) Registry() *prometheus.Registry { return r.reg } + +func (r *Recorder) IncEnqueued(q string) { r.enqueued.WithLabelValues(q).Inc() } +func (r *Recorder) IncDeduplicated(q string) { r.deduped.WithLabelValues(q).Inc() } +func (r *Recorder) IncClaimed(q string) { r.claimed.WithLabelValues(q).Inc() } +func (r *Recorder) IncProcessed(q string) { r.processed.WithLabelValues(q).Inc() } +func (r *Recorder) IncRetried(q string) { r.retried.WithLabelValues(q).Inc() } +func (r *Recorder) IncDead(q string) { r.dead.WithLabelValues(q).Inc() } +func (r *Recorder) AddReaped(q string, n int) { + r.reaped.WithLabelValues(q).Add(float64(n)) +} +func (r *Recorder) AddPromoted(q string, n int) { + r.promoted.WithLabelValues(q).Add(float64(n)) +} +func (r *Recorder) ObserveLatency(q string, d time.Duration) { + r.latency.WithLabelValues(q).Observe(d.Seconds()) +} + +// CounterForTest returns the per-queue child counter for the named metric. It +// exists so tests can read a specific series; "name" is the short key +// (enqueued, deduped, claimed, processed, retried, dead, reaped, promoted). +func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter { + var v *prometheus.CounterVec + switch name { + case "enqueued": + v = r.enqueued + case "deduped": + v = r.deduped + case "claimed": + v = r.claimed + case "processed": + v = r.processed + case "retried": + v = r.retried + case "dead": + v = r.dead + case "reaped": + v = r.reaped + case "promoted": + v = r.promoted + default: + return nil + } + return v.WithLabelValues(queue) +} +``` + +- [ ] **Step 4: Tidy modules and run the test** + +Run: +```bash +go mod tidy +go test ./internal/metrics/ -run 'TestRecorder' -v +``` +Expected: `go mod tidy` adds `github.com/prometheus/client_golang` (and transitive deps) to go.mod/go.sum; tests PASS. 
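The bucket layout chosen in `NewRecorder`, `ExponentialBuckets(0.001, 3, 13)`, can be checked by hand: each upper bound is the previous one times 3, starting at 1ms. The sketch below re-derives the bounds with the standard library only. `exponentialBounds` is a hypothetical helper written for this illustration and is not the `client_golang` implementation.

```go
package main

import "fmt"

// exponentialBounds returns count upper bounds, the first equal to start and
// each later one equal to the previous bound multiplied by factor.
// Hypothetical helper for illustration only.
func exponentialBounds(start, factor float64, count int) []float64 {
	bounds := make([]float64, count)
	for i := range bounds {
		bounds[i] = start
		start *= factor
	}
	return bounds
}

func main() {
	// Same arguments as the latency histogram in the plan.
	for i, b := range exponentialBounds(0.001, 3, 13) {
		fmt.Printf("bucket %2d: le=%.3fs\n", i, b)
	}
}
```

The largest bound is 0.001 × 3^12 ≈ 531s, a little under nine minutes, which matches the "~1ms -> ~9min" comment in the Recorder.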
+ +- [ ] **Step 5: Commit** + +```bash +git add internal/metrics/recorder.go internal/metrics/recorder_test.go go.mod go.sum +git commit -m "Add internal/metrics Recorder implementing broker.Metrics over Prometheus" +``` + +--- + +## Task 8: `internal/metrics` DepthCollector + +**Files:** +- Create: `internal/metrics/depth.go` +- Create: `internal/metrics/depth_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `internal/metrics/depth_test.go`: + +```go +package metrics_test + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/metrics" +) + +// metricsTestRedisDB is this package's dedicated Redis DB. broker tests use 15, +// worker tests 14; metrics claims 13 so parallel `go test ./...` never collides. +const metricsTestRedisDB = 13 + +func newTestRedis(t *testing.T) *redis.Client { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: metricsTestRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + return rdb +} + +func TestDepthCollectorReportsDepths(t *testing.T) { + rdb := newTestRedis(t) + ctx := context.Background() + + // Seed known cardinalities into the per-queue keys the collector reads. 
+ rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}) + rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"}) + rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"}) + rdb.RPush(ctx, "q:emails:dlq", "g") + + c := metrics.NewDepthCollector(rdb, "emails") + + want := ` +# HELP relay_queue_depth Number of jobs in a queue, by state. +# TYPE relay_queue_depth gauge +relay_queue_depth{queue="emails",state="ready"} 2 +relay_queue_depth{queue="emails",state="inflight"} 1 +relay_queue_depth{queue="emails",state="delayed"} 3 +relay_queue_depth{queue="emails",state="dlq"} 1 +` + if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil { + t.Errorf("CollectAndCompare: %v", err) + } +} + +func TestDepthCollectorEmptyQueueReportsZeros(t *testing.T) { + rdb := newTestRedis(t) + c := metrics.NewDepthCollector(rdb, "emails") + + want := ` +# HELP relay_queue_depth Number of jobs in a queue, by state. +# TYPE relay_queue_depth gauge +relay_queue_depth{queue="emails",state="ready"} 0 +relay_queue_depth{queue="emails",state="inflight"} 0 +relay_queue_depth{queue="emails",state="delayed"} 0 +relay_queue_depth{queue="emails",state="dlq"} 0 +` + if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil { + t.Errorf("CollectAndCompare: %v", err) + } +} + +// Ensures the collector can be registered on a registry without panicking. +func TestDepthCollectorRegisters(t *testing.T) { + rdb := newTestRedis(t) + reg := prometheus.NewRegistry() + if err := reg.Register(metrics.NewDepthCollector(rdb, "emails")); err != nil { + t.Fatalf("Register: %v", err) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/metrics/ -run TestDepthCollector -v` +Expected: FAIL — `NewDepthCollector` undefined. 
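The key layout the test seeds implies four Redis lookups per queue on every scrape. A minimal sketch of that mapping follows, assuming the `q:<queue>:<state>` naming used in the test; `depthQuery` and `depthQueries` are hypothetical names for illustration, and the sketch only builds the plan of commands without talking to Redis.

```go
package main

import "fmt"

// depthQuery names the Redis key and command behind one depth sample.
// Hypothetical type for illustration only.
type depthQuery struct {
	State   string
	Key     string
	Command string
}

// depthQueries lists the lookups for one queue, assuming the
// "q:<queue>:<state>" key layout seeded by the test: three sorted sets read
// with ZCARD and one list read with LLEN.
func depthQueries(queue string) []depthQuery {
	states := []struct{ name, cmd string }{
		{"ready", "ZCARD"},
		{"inflight", "ZCARD"},
		{"delayed", "ZCARD"},
		{"dlq", "LLEN"},
	}
	out := make([]depthQuery, 0, len(states))
	for _, s := range states {
		out = append(out, depthQuery{
			State:   s.name,
			Key:     "q:" + queue + ":" + s.name,
			Command: s.cmd,
		})
	}
	return out
}

func main() {
	for _, q := range depthQueries("emails") {
		fmt.Printf("%-8s %s %s\n", q.State, q.Command, q.Key)
	}
}
```

Seen this way, the scrape cost grows linearly with the number of queues passed to the collector.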
+ +- [ ] **Step 3: Implement the DepthCollector** + +Create `internal/metrics/depth.go`: + +```go +package metrics + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/redis/go-redis/v9" +) + +// scrapeTimeout bounds the Redis work done during one Prometheus scrape so a +// slow or unreachable Redis cannot hang the /metrics handler. +const scrapeTimeout = 5 * time.Second + +// DepthCollector reports per-queue depth gauges by reading Redis at scrape time, +// so the values can never go stale between scrapes. It implements +// prometheus.Collector and is registered on the Recorder's registry. +type DepthCollector struct { + rdb *redis.Client + queues []string + desc *prometheus.Desc +} + +// NewDepthCollector builds a collector that reports depths for the given queues. +func NewDepthCollector(rdb *redis.Client, queues ...string) *DepthCollector { + return &DepthCollector{ + rdb: rdb, + queues: queues, + desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "queue_depth"), + "Number of jobs in a queue, by state.", + []string{"queue", "state"}, nil, + ), + } +} + +// Describe sends the single gauge descriptor. (Required by prometheus.Collector.) +func (c *DepthCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect reads ready/inflight/delayed (ZCARD) and dlq (LLEN) for each queue and +// emits one gauge sample per (queue, state). On a Redis error for a given query +// it skips that sample rather than reporting a misleading zero. 
+func (c *DepthCollector) Collect(ch chan<- prometheus.Metric) { + ctx, cancel := context.WithTimeout(context.Background(), scrapeTimeout) + defer cancel() + + for _, q := range c.queues { + c.emit(ctx, ch, q, "ready", c.rdb.ZCard(ctx, "q:"+q+":ready")) + c.emit(ctx, ch, q, "inflight", c.rdb.ZCard(ctx, "q:"+q+":inflight")) + c.emit(ctx, ch, q, "delayed", c.rdb.ZCard(ctx, "q:"+q+":delayed")) + c.emit(ctx, ch, q, "dlq", c.rdb.LLen(ctx, "q:"+q+":dlq")) + } +} + +// emit turns one IntCmd result into a gauge sample, skipping it on error. +func (c *DepthCollector) emit(_ context.Context, ch chan<- prometheus.Metric, queue, state string, cmd *redis.IntCmd) { + n, err := cmd.Result() + if err != nil { + return // skip this sample; do not report a stale/zero depth + } + ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), queue, state) +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/metrics/ -run TestDepthCollector -v` +Expected: PASS (3 tests). The expected text above lists states in emit order (ready, inflight, delayed, dlq), but `CollectAndCompare` sorts internally, so the order in the literal does not matter. If the comparison fails, fix the mismatch by correcting the expected values, not the code. + +- [ ] **Step 5: Full metrics suite under race** + +Run: `go test -race ./internal/metrics/` +Expected: PASS. + +- [ ] **Step 6: Commit** + +```bash +git add internal/metrics/depth.go internal/metrics/depth_test.go +git commit -m "Add DepthCollector reporting per-queue depth gauges at scrape time" +``` + +--- + +## Task 9: Wire `/metrics` into `cmd/worker` + +**Files:** +- Modify: `cmd/worker/main.go` + +- [ ] **Step 1: Add the flag and wiring** + +Read `cmd/worker/main.go` first to match its existing flag/broker/shutdown style. Then: + +1. Add a flag near the others: + +```go + metricsAddr := flag.String("metrics-addr", "", "if set (e.g. 
:9090), serve Prometheus /metrics on this address") +``` + +2. When building the broker, conditionally add the recorder. Build the broker options slice so `WithMetrics` is appended only when metrics are enabled, and keep a handle to the recorder: + +```go + var rec *metrics.Recorder + brokerOpts := []broker.Option{broker.WithBackoff(/* existing args unchanged */)} + // ... keep existing conditional WithRateLimit append ... + if *metricsAddr != "" { + rec = metrics.NewRecorder() + brokerOpts = append(brokerOpts, broker.WithMetrics(rec)) + } + b := broker.New(rdb, brokerOpts...) +``` + +(Adapt to the file's current construction — the key change is appending `broker.WithMetrics(rec)` when `*metricsAddr != ""`. Do not change existing options.) + +3. After the broker is built and before/alongside starting the worker pool, start the metrics HTTP server when enabled: + +```go + var metricsSrv *http.Server + if rec != nil { + rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, *queue)) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})) + metricsSrv = &http.Server{Addr: *metricsAddr, Handler: mux} + go func() { + if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("relay worker: metrics server failed", "err", err) + } + }() + logger.Info("relay worker: serving metrics", "addr", *metricsAddr) + } +``` + +(Use the file's existing logger variable; if it uses `log` rather than `slog`, match that.) + +4. In the shutdown path (after the worker pool stops, where ctx is cancelled on SIGINT/SIGTERM), gracefully close the server: + +```go + if metricsSrv != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := metricsSrv.Shutdown(shutdownCtx); err != nil { + logger.Error("relay worker: metrics shutdown", "err", err) + } + } +``` + +5. 
Add imports: `net/http`, `github.com/prometheus/client_golang/prometheus/promhttp`, `github.com/StrangeNoob/relay/internal/metrics` (and `context`/`time` if not already present). + +- [ ] **Step 2: Build and vet** + +Run: +```bash +go build ./... +go vet ./... +``` +Expected: both clean. + +- [ ] **Step 3: Smoke check the flag (optional, requires Redis)** + +Run (manual, then Ctrl-C): +```bash +go run ./cmd/worker -queue demo -concurrency 1 -metrics-addr :9090 & +sleep 1 && curl -s localhost:9090/metrics | grep relay_ | head +kill %1 +``` +Expected: `relay_*` metric lines printed (depths at least). Skip if no local Redis. + +- [ ] **Step 4: Commit** + +```bash +git add cmd/worker/main.go +git commit -m "Serve Prometheus /metrics from cmd/worker behind --metrics-addr" +``` + +--- + +## Task 10: Update CLAUDE.md and final verification + +**Files:** +- Modify: `CLAUDE.md` + +- [ ] **Step 1: Update CLAUDE.md** + +Make these edits (match exact surrounding wording when editing): + +1. **Status line** — change "Phase 2 nearly complete … only Prometheus metrics remain in Phase 2" to state Phase 2 is complete (metrics shipped). Example: "**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter, retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics are built, tested against a real Redis under `-race`, and CI is green." +2. **Remaining-Phase-2 line** — remove the "(Prometheus metrics)" remaining note; only Phase 3 remains. +3. **Broker options** — add `WithMetrics(m)` to the broker-options sentence in the "What this is" section. +4. **`internal/metrics`** — in the Layout block, change the metrics entry to ✅ built and add `cmd/worker --metrics-addr`. Add `internal/metrics/` (Recorder + DepthCollector) to the built list. +5. **Build order** — Phase 2 line: change "metrics still to do" to metrics ✅; Phase 2 done. +6. 
**Known limitations** — add a metrics bullet: per-queue label cardinality; depth gauges cost one Redis round-trip per (queue,state) per scrape; metrics are per-process (aggregate depth gauges with max/avg, not sum); endpoint only on `cmd/worker` until Phase 3. +7. **Dependencies** — update the "Only dependency" wording to note `github.com/prometheus/client_golang` is a second direct dependency, used purely for metrics instrumentation (not a queue library — the from-scratch rule is intact). +8. **Script/option inventories** — if a broker-options inventory exists, ensure `WithMetrics` is listed. + +- [ ] **Step 2: Full verification** + +Run: +```bash +go build ./... +go test -race ./... +go vet ./... +gofmt -l internal/ cmd/ +``` +Expected: build clean; all tests pass (broker DB 15, worker DB 14, metrics DB 13 — no collisions); vet clean; `gofmt -l` prints nothing. + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "Document Phase 2 completion: Prometheus metrics" +``` + +--- + +## Self-Review (completed during planning) + +- **Spec coverage:** Metrics interface + noop + WithMetrics (Task 1); enqueue/dedup (Task 2); claim (Task 3); ack+latency (Task 4); retry/dead (Task 5); reap/promote (Task 6); Recorder + all metric names/buckets (Task 7); DepthCollector + depth gauges (Task 8); cmd/worker endpoint + graceful shutdown (Task 9); CLAUDE.md + dependency note + known limitations (Task 10). All spec sections mapped. +- **Type consistency:** `Metrics` method set is identical across the interface (Task 1), the fake (Task 2), and the Recorder (Task 7): `IncEnqueued/IncDeduplicated/IncClaimed/IncProcessed/IncRetried/IncDead` (single, queue), `AddReaped/AddPromoted` (queue, int), `ObserveLatency` (queue, time.Duration). Metric names match the spec table. Test DB numbers: broker 15, worker 14, metrics 13. 
+- **No placeholders:** every code step shows complete code; the only "adapt to existing file" note is Task 9 (cmd wiring), which lists the exact additions. +- **Known soft spot:** Task 6's delayed-job setup depends on whether `WithReadyAt(pastTime)` lands a job in the delayed set; the task gives both forms and asserts `n == 2` so the implementer can pick the one that actually delays. From 3ae6112e67fc5d379fe1d6efe7d575e7d256b262 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 13:25:20 +0530 Subject: [PATCH 03/19] Add broker Metrics interface, noop default, and WithMetrics option --- internal/broker/broker.go | 2 ++ internal/broker/metrics.go | 51 +++++++++++++++++++++++++++++++++ internal/broker/metrics_test.go | 29 +++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 internal/broker/metrics.go create mode 100644 internal/broker/metrics_test.go diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 562de1e..0f256fe 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -27,6 +27,7 @@ type Broker struct { backoffMax time.Duration dedupTTL time.Duration rateLimits map[string]rateLimit + metrics Metrics rndMu sync.Mutex rnd *rand.Rand @@ -53,6 +54,7 @@ func New(rdb *redis.Client, opts ...Option) *Broker { backoffBase: time.Second, backoffMax: 10 * time.Minute, dedupTTL: 24 * time.Hour, + metrics: noopMetrics{}, rnd: rand.New(rand.NewSource(time.Now().UnixNano())), } for _, opt := range opts { diff --git a/internal/broker/metrics.go b/internal/broker/metrics.go new file mode 100644 index 0000000..c09b6e0 --- /dev/null +++ b/internal/broker/metrics.go @@ -0,0 +1,51 @@ +package broker + +import "time" + +// Metrics receives a callback for every job-state transition the broker makes. 
+// It is the broker's consumer-side instrumentation contract: the broker depends +// on this small interface, not on any metrics library, so a Prometheus recorder +// (internal/metrics) — or a test fake — can be plugged in via WithMetrics. The +// default is noopMetrics, so a broker built without WithMetrics records nothing +// and behaves exactly as before. +// +// Every method takes the queue name so the implementation can label its series +// per queue. Reap/Promote add a batch count because one call moves many jobs; +// the rest are single events. ObserveLatency reports a job's end-to-end time in +// the system (enqueue -> ack). +type Metrics interface { + IncEnqueued(queue string) + IncDeduplicated(queue string) + IncClaimed(queue string) + IncProcessed(queue string) + IncRetried(queue string) + IncDead(queue string) + AddReaped(queue string, n int) + AddPromoted(queue string, n int) + ObserveLatency(queue string, d time.Duration) +} + +// noopMetrics is the default recorder: every method does nothing. It lets the +// broker call b.metrics unconditionally without nil checks, and keeps metrics +// entirely opt-in. +type noopMetrics struct{} + +func (noopMetrics) IncEnqueued(string) {} +func (noopMetrics) IncDeduplicated(string) {} +func (noopMetrics) IncClaimed(string) {} +func (noopMetrics) IncProcessed(string) {} +func (noopMetrics) IncRetried(string) {} +func (noopMetrics) IncDead(string) {} +func (noopMetrics) AddReaped(string, int) {} +func (noopMetrics) AddPromoted(string, int) {} +func (noopMetrics) ObserveLatency(string, time.Duration) {} + +// WithMetrics installs a Metrics recorder. A nil recorder is ignored so callers +// cannot accidentally replace the safe no-op with something that panics. 
+func WithMetrics(m Metrics) Option { + return func(b *Broker) { + if m != nil { + b.metrics = m + } + } +} diff --git a/internal/broker/metrics_test.go b/internal/broker/metrics_test.go new file mode 100644 index 0000000..6b2a369 --- /dev/null +++ b/internal/broker/metrics_test.go @@ -0,0 +1,29 @@ +package broker + +import "testing" + +func TestNewDefaultsToNoopMetrics(t *testing.T) { + b := New(nil) + if b.metrics == nil { + t.Fatal("New: metrics field is nil, want noopMetrics default") + } + if _, ok := b.metrics.(noopMetrics); !ok { + t.Fatalf("New: metrics = %T, want noopMetrics", b.metrics) + } +} + +func TestWithMetricsInstallsRecorder(t *testing.T) { + rec := noopMetrics{} // any Metrics value; identity is what we check + var custom Metrics = rec + b := New(nil, WithMetrics(custom)) + if b.metrics == nil { + t.Fatal("WithMetrics: metrics is nil") + } +} + +func TestWithMetricsNilIsIgnored(t *testing.T) { + b := New(nil, WithMetrics(nil)) + if _, ok := b.metrics.(noopMetrics); !ok { + t.Fatalf("WithMetrics(nil): metrics = %T, want noopMetrics retained", b.metrics) + } +} From cc7102009aed736bef19e00a520b8697f69d5937 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 13:29:29 +0530 Subject: [PATCH 04/19] Make WithMetrics install test non-vacuous; clarify latency doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the vacuous noopMetrics identity check with a sentinel distinctMetrics pointer so TestWithMetricsInstallsRecorder actually fails when WithMetrics does not install the recorder. Also tighten the ObserveLatency doc comment: "enqueue -> ack" → "creation -> ack" to match the implementation (latency is measured from job.CreatedAt, set in job.New, not from the enqueue call). 
--- internal/broker/metrics.go | 2 +- internal/broker/metrics_test.go | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/broker/metrics.go b/internal/broker/metrics.go index c09b6e0..131f522 100644 --- a/internal/broker/metrics.go +++ b/internal/broker/metrics.go @@ -12,7 +12,7 @@ import "time" // Every method takes the queue name so the implementation can label its series // per queue. Reap/Promote add a batch count because one call moves many jobs; // the rest are single events. ObserveLatency reports a job's end-to-end time in -// the system (enqueue -> ack). +// the system (creation -> ack). type Metrics interface { IncEnqueued(queue string) IncDeduplicated(queue string) diff --git a/internal/broker/metrics_test.go b/internal/broker/metrics_test.go index 6b2a369..e792674 100644 --- a/internal/broker/metrics_test.go +++ b/internal/broker/metrics_test.go @@ -12,12 +12,15 @@ func TestNewDefaultsToNoopMetrics(t *testing.T) { } } +// distinctMetrics is a sentinel recorder with its own identity, so the test can +// prove WithMetrics installed exactly this value (noopMetrics{} has no identity). +type distinctMetrics struct{ noopMetrics } + func TestWithMetricsInstallsRecorder(t *testing.T) { - rec := noopMetrics{} // any Metrics value; identity is what we check - var custom Metrics = rec - b := New(nil, WithMetrics(custom)) - if b.metrics == nil { - t.Fatal("WithMetrics: metrics is nil") + rec := &distinctMetrics{} + b := New(nil, WithMetrics(rec)) + if b.metrics != rec { + t.Fatalf("WithMetrics: metrics = %v, want the installed recorder", b.metrics) } } From 5f5e9deb827b27cd8ddabcd4e2fb178dc2895790 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 13:31:56 +0530 Subject: [PATCH 05/19] Instrument Enqueue with enqueued/deduplicated metrics Add fakeMetrics recorder and newTestBrokerWith helper for option-injecting tests. 
Wire IncEnqueued and IncDeduplicated into broker.Enqueue so every successful enqueue and every dropped duplicate is counted per queue. --- internal/broker/broker.go | 2 + internal/broker/broker_test.go | 9 +- internal/broker/instrumentation_test.go | 104 ++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 internal/broker/instrumentation_test.go diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 0f256fe..71e4ba1 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -199,8 +199,10 @@ func (b *Broker) Enqueue(ctx context.Context, j job.Job, opts ...EnqueueOption) return fmt.Errorf("broker: enqueuing job %s: %w", j.ID, err) } if res == "dup" { + b.metrics.IncDeduplicated(j.Queue) return ErrDuplicate } + b.metrics.IncEnqueued(j.Queue) return nil } diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index 98f7b7f..01ee419 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -30,6 +30,13 @@ const testRedisDB = 15 // unreachable the test is skipped rather than failed, so the suite still runs // off-CI. func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) { + t.Helper() + return newTestBrokerWith(t) +} + +// newTestBrokerWith is newTestBroker with broker options, for tests that need to +// inject a recorder or other configuration. 
+func newTestBrokerWith(t *testing.T, opts ...broker.Option) (*broker.Broker, *redis.Client) { t.Helper() addr := os.Getenv("REDIS_ADDR") if addr == "" { @@ -44,7 +51,7 @@ func newTestBroker(t *testing.T) (*broker.Broker, *redis.Client) { t.Fatalf("flushdb: %v", err) } t.Cleanup(func() { _ = rdb.Close() }) - return broker.New(rdb), rdb + return broker.New(rdb, opts...), rdb } func TestEnqueuePersistsJob(t *testing.T) { diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go new file mode 100644 index 0000000..527ea69 --- /dev/null +++ b/internal/broker/instrumentation_test.go @@ -0,0 +1,104 @@ +package broker_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" +) + +// fakeMetrics is a Metrics recorder that counts calls per queue in memory, so a +// test can assert exactly which transition the broker recorded. It is safe for +// concurrent use because the broker may record from multiple goroutines. 
+type fakeMetrics struct {
+	mu        sync.Mutex
+	enqueued  map[string]int
+	deduped   map[string]int
+	claimed   map[string]int
+	processed map[string]int
+	retried   map[string]int
+	dead      map[string]int
+	reaped    map[string]int
+	promoted  map[string]int
+	latencies []time.Duration
+}
+
+func newFakeMetrics() *fakeMetrics {
+	return &fakeMetrics{
+		enqueued:  map[string]int{},
+		deduped:   map[string]int{},
+		claimed:   map[string]int{},
+		processed: map[string]int{},
+		retried:   map[string]int{},
+		dead:      map[string]int{},
+		reaped:    map[string]int{},
+		promoted:  map[string]int{},
+	}
+}
+
+func (f *fakeMetrics) IncEnqueued(q string)        { f.bump(f.enqueued, q) }
+func (f *fakeMetrics) IncDeduplicated(q string)    { f.bump(f.deduped, q) }
+func (f *fakeMetrics) IncClaimed(q string)         { f.bump(f.claimed, q) }
+func (f *fakeMetrics) IncProcessed(q string)       { f.bump(f.processed, q) }
+func (f *fakeMetrics) IncRetried(q string)         { f.bump(f.retried, q) }
+func (f *fakeMetrics) IncDead(q string)            { f.bump(f.dead, q) }
+func (f *fakeMetrics) AddReaped(q string, n int)   { f.add(f.reaped, q, n) }
+func (f *fakeMetrics) AddPromoted(q string, n int) { f.add(f.promoted, q, n) }
+func (f *fakeMetrics) ObserveLatency(q string, d time.Duration) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.latencies = append(f.latencies, d)
+}
+
+func (f *fakeMetrics) bump(m map[string]int, q string) { f.add(m, q, 1) }
+func (f *fakeMetrics) add(m map[string]int, q string, n int) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	m[q] += n
+}
+
+func (f *fakeMetrics) get(m map[string]int, q string) int {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return m[q]
+}
+
+func TestEnqueueRecordsEnqueued(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if got := fm.get(fm.enqueued, "emails"); got != 1 {
+		t.Errorf("enqueued[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.deduped, "emails"); got != 0 {
+		t.Errorf("deduped[emails] = %d, want 0", got)
+	}
+}
+
+func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j1 := job.New("emails", []byte("a"))
+	if err := b.Enqueue(ctx, j1, broker.WithIdempotencyKey("k1")); err != nil {
+		t.Fatalf("first Enqueue: %v", err)
+	}
+	j2 := job.New("emails", []byte("b"))
+	if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); err != broker.ErrDuplicate {
+		t.Fatalf("second Enqueue err = %v, want ErrDuplicate", err)
+	}
+	if got := fm.get(fm.enqueued, "emails"); got != 1 {
+		t.Errorf("enqueued[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.deduped, "emails"); got != 1 {
+		t.Errorf("deduped[emails] = %d, want 1", got)
+	}
+}

From 4ff3c7fd5c48fa0c56020ee4cd0da7ed83077989 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:35:26 +0530
Subject: [PATCH 06/19] Use errors.Is for ErrDuplicate check, matching file convention

---
 internal/broker/instrumentation_test.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index 527ea69..57e98b9 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -2,6 +2,7 @@ package broker_test
 
 import (
 	"context"
+	"errors"
 	"sync"
 	"testing"
 	"time"
@@ -92,7 +93,7 @@ func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) {
 		t.Fatalf("first Enqueue: %v", err)
 	}
 	j2 := job.New("emails", []byte("b"))
-	if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); err != broker.ErrDuplicate {
+	if err := b.Enqueue(ctx, j2, broker.WithIdempotencyKey("k1")); !errors.Is(err, broker.ErrDuplicate) {
 		t.Fatalf("second Enqueue err = %v, want ErrDuplicate", err)
 	}
 	if got := fm.get(fm.enqueued, "emails"); got != 1 {

From f710400b1a317f42ee1558c2034e9e66d045fcca Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:36:50 +0530
Subject: [PATCH 07/19] Instrument Claim with claimed metric

---
 internal/broker/broker.go               |  1 +
 internal/broker/instrumentation_test.go | 29 +++++++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/internal/broker/broker.go b/internal/broker/broker.go
index 71e4ba1..4cf2c78 100644
--- a/internal/broker/broker.go
+++ b/internal/broker/broker.go
@@ -240,6 +240,7 @@ func (b *Broker) Claim(ctx context.Context, queue string, visibility time.Durati
 	if err != nil {
 		return job.Job{}, false, fmt.Errorf("broker: claiming from %q: %w", queue, err)
 	}
+	b.metrics.IncClaimed(queue)
 	return j, true, nil
 }
 
diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index 57e98b9..7c40d93 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -103,3 +103,32 @@ func TestEnqueueDuplicateRecordsDeduplicated(t *testing.T) {
 		t.Errorf("deduped[emails] = %d, want 1", got)
 	}
 }
+
+func TestClaimRecordsClaimed(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v, want true/nil", ok, err)
+	}
+	if got := fm.get(fm.claimed, "emails"); got != 1 {
+		t.Errorf("claimed[emails] = %d, want 1", got)
+	}
+}
+
+func TestClaimEmptyQueueRecordsNothing(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || ok {
+		t.Fatalf("Claim on empty: ok=%v err=%v, want false/nil", ok, err)
+	}
+	if got := fm.get(fm.claimed, "emails"); got != 0 {
+		t.Errorf("claimed[emails] = %d, want 0", got)
+	}
+}

From eee3fb87aa5d9e70997c925521beaca489b994cf Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:39:40 +0530
Subject: [PATCH 08/19] Instrument Ack with processed counter and end-to-end latency

---
 internal/broker/broker.go               |  2 ++
 internal/broker/instrumentation_test.go | 33 +++++++++++++++++++++++++
 2 files changed, 35 insertions(+)

diff --git a/internal/broker/broker.go b/internal/broker/broker.go
index 4cf2c78..a1b5899 100644
--- a/internal/broker/broker.go
+++ b/internal/broker/broker.go
@@ -254,6 +254,8 @@ func (b *Broker) Ack(ctx context.Context, j job.Job) error {
 	).Err(); err != nil {
 		return fmt.Errorf("broker: acking job %s: %w", j.ID, err)
 	}
+	b.metrics.IncProcessed(j.Queue)
+	b.metrics.ObserveLatency(j.Queue, time.Since(j.CreatedAt))
 	return nil
 }
 
diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index 7c40d93..852456c 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -132,3 +132,36 @@ func TestClaimEmptyQueueRecordsNothing(t *testing.T) {
 		t.Errorf("claimed[emails] = %d, want 0", got)
 	}
 }
+
+func TestAckRecordsProcessedAndLatency(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	j, ok, err := b.Claim(ctx, "emails", time.Minute)
+	if err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v", ok, err)
+	}
+	if err := b.Ack(ctx, j); err != nil {
+		t.Fatalf("Ack: %v", err)
+	}
+	if got := fm.get(fm.processed, "emails"); got != 1 {
+		t.Errorf("processed[emails] = %d, want 1", got)
+	}
+	fm.mu.Lock()
+	n := len(fm.latencies)
+	var d time.Duration
+	if n == 1 {
+		d = fm.latencies[0]
+	}
+	fm.mu.Unlock()
+	if n != 1 {
+		t.Fatalf("latencies len = %d, want 1", n)
+	}
+	if d < 0 {
+		t.Errorf("latency = %v, want non-negative", d)
+	}
+}

From 616e61c8ac2615d288c987f0525ccaa52a625290 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:43:13 +0530
Subject: [PATCH 09/19] Instrument Nack with retried/dead metrics from script outcome

nack.lua already returns "retry" or "dead" to indicate which branch was
taken. Switch Nack from .Err() to .Text() to capture that return value,
then call IncRetried/IncDead on the Metrics interface accordingly. Tests
(TestNackWithRetriesLeftRecordsRetried, TestNackWithBudgetSpentRecordsDead)
were written first and confirmed RED before the one-line behavior change
made them GREEN.
---
 internal/broker/broker.go               | 11 +++++-
 internal/broker/instrumentation_test.go | 51 +++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/internal/broker/broker.go b/internal/broker/broker.go
index a1b5899..5106b3c 100644
--- a/internal/broker/broker.go
+++ b/internal/broker/broker.go
@@ -270,12 +270,19 @@ func (b *Broker) Nack(ctx context.Context, j job.Job) error {
 	b.rndMu.Unlock()
 
 	readyAt := time.Now().Add(delay).UnixMilli()
-	if err := nackScript.Run(ctx, b.rdb,
+	outcome, err := nackScript.Run(ctx, b.rdb,
 		[]string{inflightKey(j.Queue), delayedKey(j.Queue), dlqKey(j.Queue)},
 		j.ID, jobKeyPrefix, readyAt,
-	).Err(); err != nil {
+	).Text()
+	if err != nil {
 		return fmt.Errorf("broker: nacking job %s: %w", j.ID, err)
 	}
+	switch outcome {
+	case "retry":
+		b.metrics.IncRetried(j.Queue)
+	case "dead":
+		b.metrics.IncDead(j.Queue)
+	}
 	return nil
 }
 
diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index 852456c..2a17dd1 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -165,3 +165,54 @@ func TestAckRecordsProcessedAndLatency(t *testing.T) {
 		t.Errorf("latency = %v, want non-negative", d)
 	}
 }
+
+// nackTestJob enqueues, claims, and returns a job set up so the next Nack takes
+// the requested branch. maxRetries controls retry-vs-dead: with maxRetries=5 the
+// first nack retries; with maxRetries=0 the first nack dead-letters.
+func nackTestJob(t *testing.T, b *broker.Broker, ctx context.Context, maxRetries int) job.Job {
+	t.Helper()
+	j := job.New("emails", []byte("x"))
+	j.MaxRetries = maxRetries
+	if err := b.Enqueue(ctx, j); err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	claimed, ok, err := b.Claim(ctx, "emails", time.Minute)
+	if err != nil || !ok {
+		t.Fatalf("Claim: ok=%v err=%v", ok, err)
+	}
+	return claimed
+}
+
+func TestNackWithRetriesLeftRecordsRetried(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j := nackTestJob(t, b, ctx, 5) // attempts now 1 < 5 -> retry
+	if err := b.Nack(ctx, j); err != nil {
+		t.Fatalf("Nack: %v", err)
+	}
+	if got := fm.get(fm.retried, "emails"); got != 1 {
+		t.Errorf("retried[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.dead, "emails"); got != 0 {
+		t.Errorf("dead[emails] = %d, want 0", got)
+	}
+}
+
+func TestNackWithBudgetSpentRecordsDead(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	j := nackTestJob(t, b, ctx, 0) // attempts now 1, max 0 -> dead
+	if err := b.Nack(ctx, j); err != nil {
+		t.Fatalf("Nack: %v", err)
+	}
+	if got := fm.get(fm.dead, "emails"); got != 1 {
+		t.Errorf("dead[emails] = %d, want 1", got)
+	}
+	if got := fm.get(fm.retried, "emails"); got != 0 {
+		t.Errorf("retried[emails] = %d, want 0", got)
+	}
+}

From 6ad96a217f10490f5a980401be043f73bc4894a0 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:45:53 +0530
Subject: [PATCH 10/19] Comment Nack outcome switch's intentional no-match behavior

---
 internal/broker/broker.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/internal/broker/broker.go b/internal/broker/broker.go
index 5106b3c..63ecde0 100644
--- a/internal/broker/broker.go
+++ b/internal/broker/broker.go
@@ -277,6 +277,8 @@ func (b *Broker) Nack(ctx context.Context, j job.Job) error {
 	if err != nil {
 		return fmt.Errorf("broker: nacking job %s: %w", j.ID, err)
 	}
+	// nack.lua returns "retry" or "dead"; any other value intentionally records
+	// nothing rather than mis-attributing a count.
 	switch outcome {
 	case "retry":
 		b.metrics.IncRetried(j.Queue)

From 41fc1234379600207ec326e1a053cd70a6d858fb Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:47:49 +0530
Subject: [PATCH 11/19] Instrument Reap and Promote with batch count metrics

---
 internal/broker/broker.go               |  6 +++
 internal/broker/instrumentation_test.go | 56 +++++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/internal/broker/broker.go b/internal/broker/broker.go
index 63ecde0..2f16832 100644
--- a/internal/broker/broker.go
+++ b/internal/broker/broker.go
@@ -312,6 +312,9 @@ func (b *Broker) Reap(ctx context.Context, queue string) (int, error) {
 	if err != nil {
 		return 0, fmt.Errorf("broker: reaping %q: %w", queue, err)
 	}
+	if n > 0 {
+		b.metrics.AddReaped(queue, n)
+	}
 	return n, nil
 }
 
@@ -327,6 +330,9 @@ func (b *Broker) Promote(ctx context.Context, queue string) (int, error) {
 	if err != nil {
 		return 0, fmt.Errorf("broker: promoting %q: %w", queue, err)
 	}
+	if n > 0 {
+		b.metrics.AddPromoted(queue, n)
+	}
 	return n, nil
 }
 
diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index 2a17dd1..cd81d4d 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -216,3 +216,59 @@ func TestNackWithBudgetSpentRecordsDead(t *testing.T) {
 		t.Errorf("retried[emails] = %d, want 0", got)
 	}
 }
+
+func TestReapRecordsReapedCount(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	// Enqueue + claim two jobs with a tiny visibility so they expire immediately.
+	for i := 0; i < 2; i++ {
+		if err := b.Enqueue(ctx, job.New("emails", []byte("x"))); err != nil {
+			t.Fatalf("Enqueue: %v", err)
+		}
+	}
+	for i := 0; i < 2; i++ {
+		if _, ok, err := b.Claim(ctx, "emails", time.Millisecond); err != nil || !ok {
+			t.Fatalf("Claim: ok=%v err=%v", ok, err)
+		}
+	}
+	time.Sleep(10 * time.Millisecond) // let the visibility deadline pass
+
+	n, err := b.Reap(ctx, "emails")
+	if err != nil {
+		t.Fatalf("Reap: %v", err)
+	}
+	if n != 2 {
+		t.Fatalf("Reap returned %d, want 2", n)
+	}
+	if got := fm.get(fm.reaped, "emails"); got != 2 {
+		t.Errorf("reaped[emails] = %d, want 2", got)
+	}
+}
+
+func TestPromoteRecordsPromotedCount(t *testing.T) {
+	fm := newFakeMetrics()
+	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
+	ctx := context.Background()
+
+	// Enqueue two delayed jobs whose ready-at is just in the future, then wait.
+	soon := time.Now().Add(20 * time.Millisecond)
+	for i := 0; i < 2; i++ {
+		if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(soon)); err != nil {
+			t.Fatalf("Enqueue delayed: %v", err)
+		}
+	}
+	time.Sleep(40 * time.Millisecond)
+
+	n, err := b.Promote(ctx, "emails")
+	if err != nil {
+		t.Fatalf("Promote: %v", err)
+	}
+	if n != 2 {
+		t.Fatalf("Promote returned %d, want 2", n)
+	}
+	if got := fm.get(fm.promoted, "emails"); got != 2 {
+		t.Errorf("promoted[emails] = %d, want 2", got)
+	}
+}

From f62ddd1eeeced84fec0a61897bafaf16b6e87c57 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:50:13 +0530
Subject: [PATCH 12/19] Widen promote metric test timing margin for CI reliability

---
 internal/broker/instrumentation_test.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/internal/broker/instrumentation_test.go b/internal/broker/instrumentation_test.go
index cd81d4d..c7bf0d0 100644
--- a/internal/broker/instrumentation_test.go
+++ b/internal/broker/instrumentation_test.go
@@ -252,14 +252,16 @@ func TestPromoteRecordsPromotedCount(t *testing.T) {
 	b, _ := newTestBrokerWith(t, broker.WithMetrics(fm))
 	ctx := context.Background()
 
-	// Enqueue two delayed jobs whose ready-at is just in the future, then wait.
-	soon := time.Now().Add(20 * time.Millisecond)
+	// Enqueue two delayed jobs whose ready-at is just in the future, then wait
+	// well past it. The sleep margin is generous (>3x) so the test stays reliable
+	// on a loaded CI runner under -race.
+	soon := time.Now().Add(30 * time.Millisecond)
 	for i := 0; i < 2; i++ {
 		if err := b.Enqueue(ctx, job.New("emails", []byte("x")), broker.WithReadyAt(soon)); err != nil {
 			t.Fatalf("Enqueue delayed: %v", err)
 		}
 	}
-	time.Sleep(40 * time.Millisecond)
+	time.Sleep(100 * time.Millisecond)
 
 	n, err := b.Promote(ctx, "emails")
 	if err != nil {

From 1fcfe9b575919f457bc373be80331a84a4deae49 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:52:16 +0530
Subject: [PATCH 13/19] Add internal/metrics Recorder implementing broker.Metrics over Prometheus

---
 go.mod                            |  15 +++-
 go.sum                            |  42 +++++++++--
 internal/metrics/recorder.go      | 114 ++++++++++++++++++++++++++++++
 internal/metrics/recorder_test.go |  58 +++++++++++++++
 4 files changed, 224 insertions(+), 5 deletions(-)
 create mode 100644 internal/metrics/recorder.go
 create mode 100644 internal/metrics/recorder_test.go

diff --git a/go.mod b/go.mod
index ef60965..c81228e 100644
--- a/go.mod
+++ b/go.mod
@@ -4,9 +4,22 @@ go 1.24
 
 toolchain go1.25.11
 
-require github.com/redis/go-redis/v9 v9.20.0
+require (
+	github.com/prometheus/client_golang v1.23.2
+	github.com/redis/go-redis/v9 v9.20.0
+)
 
 require (
+	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
+	github.com/kr/text v0.2.0 // indirect
+	github.com/kylelemons/godebug v1.1.0 // indirect
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+	github.com/prometheus/client_model v0.6.2 // indirect
+	github.com/prometheus/common v0.66.1 // indirect
+	github.com/prometheus/procfs v0.16.1 // indirect
 	go.uber.org/atomic v1.11.0 // indirect
+	go.yaml.in/yaml/v2 v2.4.2 // indirect
+	golang.org/x/sys v0.35.0 // indirect
+	google.golang.org/protobuf v1.36.8 // indirect
 )
diff --git a/go.sum b/go.sum
index 53cee5f..6286537 100644
--- a/go.sum
+++ b/go.sum
@@ -1,22 +1,56 @@
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
 github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
 github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
 github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
 github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
+github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
+github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
+github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
+github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
+github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
+github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
+github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
+github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
+github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
 github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0=
 github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
 github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
 github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
-golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
+go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
+go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
+golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
+golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
+google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/metrics/recorder.go b/internal/metrics/recorder.go
new file mode 100644
index 0000000..75370d5
--- /dev/null
+++ b/internal/metrics/recorder.go
@@ -0,0 +1,114 @@
+// Package metrics provides the Prometheus implementation of the broker's
+// instrumentation contract (broker.Metrics) plus a pull-based collector for
+// per-queue depth gauges. It deliberately does not import internal/broker: it
+// satisfies broker.Metrics structurally, keeping the dependency arrow one-way.
+package metrics
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const namespace = "relay"
+
+// Recorder implements broker.Metrics over a private Prometheus registry. Build
+// it with NewRecorder, install it with broker.WithMetrics(rec), and serve
+// rec.Registry() over HTTP. Counters and the latency histogram are labelled by
+// queue.
+type Recorder struct {
+	reg *prometheus.Registry
+
+	enqueued  *prometheus.CounterVec
+	deduped   *prometheus.CounterVec
+	claimed   *prometheus.CounterVec
+	processed *prometheus.CounterVec
+	retried   *prometheus.CounterVec
+	dead      *prometheus.CounterVec
+	reaped    *prometheus.CounterVec
+	promoted  *prometheus.CounterVec
+	latency   *prometheus.HistogramVec
+}
+
+// NewRecorder builds a Recorder with all metrics registered on a fresh registry.
+func NewRecorder() *Recorder {
+	reg := prometheus.NewRegistry()
+
+	counter := func(name, help string) *prometheus.CounterVec {
+		c := prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace, Name: name, Help: help,
+		}, []string{"queue"})
+		reg.MustRegister(c)
+		return c
+	}
+
+	r := &Recorder{
+		reg:       reg,
+		enqueued:  counter("jobs_enqueued_total", "Jobs accepted into a queue."),
+		deduped:   counter("jobs_deduplicated_total", "Enqueues dropped as idempotency-key duplicates."),
+		claimed:   counter("jobs_claimed_total", "Jobs claimed by a worker."),
+		processed: counter("jobs_processed_total", "Jobs acked after successful processing."),
+		retried:   counter("jobs_retried_total", "Failed jobs requeued for retry."),
+		dead:      counter("jobs_dead_total", "Jobs moved to the dead-letter queue."),
+		reaped:    counter("jobs_reaped_total", "Expired in-flight jobs requeued by the reaper."),
+		promoted:  counter("jobs_promoted_total", "Delayed jobs promoted to ready."),
+	}
+	r.latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: namespace,
+		Name:      "job_latency_seconds",
+		Help:      "End-to-end time from job creation to ack.",
+		// End-to-end latency spans ms to minutes, wider than the default
+		// 5ms-10s buckets: ~1ms -> ~9min across 13 buckets.
+		Buckets: prometheus.ExponentialBuckets(0.001, 3, 13),
+	}, []string{"queue"})
+	reg.MustRegister(r.latency)
+	return r
+}
+
+// Registry exposes the underlying registry for an HTTP handler and for
+// registering additional collectors (e.g. DepthCollector).
+func (r *Recorder) Registry() *prometheus.Registry { return r.reg }
+
+func (r *Recorder) IncEnqueued(q string)     { r.enqueued.WithLabelValues(q).Inc() }
+func (r *Recorder) IncDeduplicated(q string) { r.deduped.WithLabelValues(q).Inc() }
+func (r *Recorder) IncClaimed(q string)      { r.claimed.WithLabelValues(q).Inc() }
+func (r *Recorder) IncProcessed(q string)    { r.processed.WithLabelValues(q).Inc() }
+func (r *Recorder) IncRetried(q string)      { r.retried.WithLabelValues(q).Inc() }
+func (r *Recorder) IncDead(q string)         { r.dead.WithLabelValues(q).Inc() }
+func (r *Recorder) AddReaped(q string, n int) {
+	r.reaped.WithLabelValues(q).Add(float64(n))
+}
+func (r *Recorder) AddPromoted(q string, n int) {
+	r.promoted.WithLabelValues(q).Add(float64(n))
+}
+func (r *Recorder) ObserveLatency(q string, d time.Duration) {
+	r.latency.WithLabelValues(q).Observe(d.Seconds())
+}
+
+// CounterForTest returns the per-queue child counter for the named metric. It
+// exists so tests can read a specific series; "name" is the short key
+// (enqueued, deduped, claimed, processed, retried, dead, reaped, promoted).
+func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter {
+	var v *prometheus.CounterVec
+	switch name {
+	case "enqueued":
+		v = r.enqueued
+	case "deduped":
+		v = r.deduped
+	case "claimed":
+		v = r.claimed
+	case "processed":
+		v = r.processed
+	case "retried":
+		v = r.retried
+	case "dead":
+		v = r.dead
+	case "reaped":
+		v = r.reaped
+	case "promoted":
+		v = r.promoted
+	default:
+		return nil
+	}
+	return v.WithLabelValues(queue)
+}
diff --git a/internal/metrics/recorder_test.go b/internal/metrics/recorder_test.go
new file mode 100644
index 0000000..a71b6ea
--- /dev/null
+++ b/internal/metrics/recorder_test.go
@@ -0,0 +1,58 @@
+package metrics_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus/testutil"
+
+	"github.com/StrangeNoob/relay/internal/broker"
+	"github.com/StrangeNoob/relay/internal/metrics"
+)
+
+// Compile-time proof the Recorder satisfies the broker's instrumentation contract.
+var _ broker.Metrics = (*metrics.Recorder)(nil)
+
+func TestRecorderCountersIncrement(t *testing.T) {
+	rec := metrics.NewRecorder()
+
+	rec.IncEnqueued("emails")
+	rec.IncEnqueued("emails")
+	rec.IncDeduplicated("emails")
+	rec.IncClaimed("emails")
+	rec.IncProcessed("emails")
+	rec.IncRetried("emails")
+	rec.IncDead("emails")
+	rec.AddReaped("emails", 3)
+	rec.AddPromoted("emails", 4)
+
+	checks := []struct {
+		name   string
+		metric string
+		want   float64
+	}{
+		{"enqueued", "relay_jobs_enqueued_total", 2},
+		{"deduped", "relay_jobs_deduplicated_total", 1},
+		{"claimed", "relay_jobs_claimed_total", 1},
+		{"processed", "relay_jobs_processed_total", 1},
+		{"retried", "relay_jobs_retried_total", 1},
+		{"dead", "relay_jobs_dead_total", 1},
+		{"reaped", "relay_jobs_reaped_total", 3},
+		{"promoted", "relay_jobs_promoted_total", 4},
+	}
+	for _, c := range checks {
+		if got := testutil.ToFloat64(rec.CounterForTest(c.name, "emails")); got != c.want {
+			t.Errorf("%s{queue=emails} = %v, want %v", c.metric, got, c.want)
+		}
+	}
+}
+
+func TestRecorderObservesLatency(t *testing.T) {
+	rec := metrics.NewRecorder()
+	rec.ObserveLatency("emails", 250*time.Millisecond)
+
+	got := testutil.CollectAndCount(rec.Registry(), "relay_job_latency_seconds")
+	if got == 0 {
+		t.Fatal("relay_job_latency_seconds: no series collected after one observation")
+	}
+}

From 3e40d388430ba6dcf4c9b90c320d3c6e21dfa76a Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:57:14 +0530
Subject: [PATCH 14/19] Add DepthCollector reporting per-queue depth gauges at scrape time

---
 internal/metrics/depth.go      | 64 +++++++++++++++++++++++++
 internal/metrics/depth_test.go | 85 ++++++++++++++++++++++++++++++++++
 2 files changed, 149 insertions(+)
 create mode 100644 internal/metrics/depth.go
 create mode 100644 internal/metrics/depth_test.go

diff --git a/internal/metrics/depth.go b/internal/metrics/depth.go
new file mode 100644
index 0000000..c4cba93
--- /dev/null
+++ b/internal/metrics/depth.go
@@ -0,0 +1,64 @@
+package metrics
+
+import (
+	"context"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/redis/go-redis/v9"
+)
+
+// scrapeTimeout bounds the Redis work done during one Prometheus scrape so a
+// slow or unreachable Redis cannot hang the /metrics handler.
+const scrapeTimeout = 5 * time.Second
+
+// DepthCollector reports per-queue depth gauges by reading Redis at scrape time,
+// so the values can never go stale between scrapes. It implements
+// prometheus.Collector and is registered on the Recorder's registry.
+type DepthCollector struct {
+	rdb    *redis.Client
+	queues []string
+	desc   *prometheus.Desc
+}
+
+// NewDepthCollector builds a collector that reports depths for the given queues.
+func NewDepthCollector(rdb *redis.Client, queues ...string) *DepthCollector {
+	return &DepthCollector{
+		rdb:    rdb,
+		queues: queues,
+		desc: prometheus.NewDesc(
+			prometheus.BuildFQName(namespace, "", "queue_depth"),
+			"Number of jobs in a queue, by state.",
+			[]string{"queue", "state"}, nil,
+		),
+	}
+}
+
+// Describe sends the single gauge descriptor. (Required by prometheus.Collector.)
+func (c *DepthCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- c.desc
+}
+
+// Collect reads ready/inflight/delayed (ZCARD) and dlq (LLEN) for each queue and
+// emits one gauge sample per (queue, state). On a Redis error for a given query
+// it skips that sample rather than reporting a misleading zero.
+func (c *DepthCollector) Collect(ch chan<- prometheus.Metric) {
+	ctx, cancel := context.WithTimeout(context.Background(), scrapeTimeout)
+	defer cancel()
+
+	for _, q := range c.queues {
+		c.emit(ch, q, "ready", c.rdb.ZCard(ctx, "q:"+q+":ready"))
+		c.emit(ch, q, "inflight", c.rdb.ZCard(ctx, "q:"+q+":inflight"))
+		c.emit(ch, q, "delayed", c.rdb.ZCard(ctx, "q:"+q+":delayed"))
+		c.emit(ch, q, "dlq", c.rdb.LLen(ctx, "q:"+q+":dlq"))
+	}
+}
+
+// emit turns one IntCmd result into a gauge sample, skipping it on error.
+func (c *DepthCollector) emit(ch chan<- prometheus.Metric, queue, state string, cmd *redis.IntCmd) {
+	n, err := cmd.Result()
+	if err != nil {
+		return // skip this sample; do not report a stale/zero depth
+	}
+	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), queue, state)
+}
diff --git a/internal/metrics/depth_test.go b/internal/metrics/depth_test.go
new file mode 100644
index 0000000..8fa4bf1
--- /dev/null
+++ b/internal/metrics/depth_test.go
@@ -0,0 +1,85 @@
+package metrics_test
+
+import (
+	"context"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/redis/go-redis/v9"
+
+	"github.com/StrangeNoob/relay/internal/metrics"
+)
+
+// metricsTestRedisDB is this package's dedicated Redis DB. broker tests use 15,
+// worker tests 14; metrics claims 13 so parallel `go test ./...` never collides.
+const metricsTestRedisDB = 13
+
+func newTestRedis(t *testing.T) *redis.Client {
+	t.Helper()
+	addr := os.Getenv("REDIS_ADDR")
+	if addr == "" {
+		addr = "localhost:6379"
+	}
+	rdb := redis.NewClient(&redis.Options{Addr: addr, DB: metricsTestRedisDB})
+	ctx := context.Background()
+	if err := rdb.Ping(ctx).Err(); err != nil {
+		t.Skipf("redis not available at %s: %v", addr, err)
+	}
+	if err := rdb.FlushDB(ctx).Err(); err != nil {
+		t.Fatalf("flushdb: %v", err)
+	}
+	t.Cleanup(func() { _ = rdb.Close() })
+	return rdb
+}
+
+func TestDepthCollectorReportsDepths(t *testing.T) {
+	rdb := newTestRedis(t)
+	ctx := context.Background()
+
+	rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"})
+	rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"})
+	rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"})
+	rdb.RPush(ctx, "q:emails:dlq", "g")
+
+	c := metrics.NewDepthCollector(rdb, "emails")
+
+	want := `
+# HELP relay_queue_depth Number of jobs in a queue, by state.
+# TYPE relay_queue_depth gauge
+relay_queue_depth{queue="emails",state="ready"} 2
+relay_queue_depth{queue="emails",state="inflight"} 1
+relay_queue_depth{queue="emails",state="delayed"} 3
+relay_queue_depth{queue="emails",state="dlq"} 1
+`
+	if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil {
+		t.Errorf("CollectAndCompare: %v", err)
+	}
+}
+
+func TestDepthCollectorEmptyQueueReportsZeros(t *testing.T) {
+	rdb := newTestRedis(t)
+	c := metrics.NewDepthCollector(rdb, "emails")
+
+	want := `
+# HELP relay_queue_depth Number of jobs in a queue, by state.
+# TYPE relay_queue_depth gauge
+relay_queue_depth{queue="emails",state="ready"} 0
+relay_queue_depth{queue="emails",state="inflight"} 0
+relay_queue_depth{queue="emails",state="delayed"} 0
+relay_queue_depth{queue="emails",state="dlq"} 0
+`
+	if err := testutil.CollectAndCompare(c, strings.NewReader(want), "relay_queue_depth"); err != nil {
+		t.Errorf("CollectAndCompare: %v", err)
+	}
+}
+
+func TestDepthCollectorRegisters(t *testing.T) {
+	rdb := newTestRedis(t)
+	reg := prometheus.NewRegistry()
+	if err := reg.Register(metrics.NewDepthCollector(rdb, "emails")); err != nil {
+		t.Fatalf("Register: %v", err)
+	}
+}

From d2639b46092e902ef167c84dac4dc570e446d3b2 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 13:59:28 +0530
Subject: [PATCH 15/19] Check Redis seed errors in depth collector test

---
 internal/metrics/depth_test.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/internal/metrics/depth_test.go b/internal/metrics/depth_test.go
index 8fa4bf1..2c95b76 100644
--- a/internal/metrics/depth_test.go
+++ b/internal/metrics/depth_test.go
@@ -35,14 +35,24 @@ func newTestRedis(t *testing.T) *redis.Client {
 	return rdb
 }
 
+// mustSeed fails the test immediately if seeding a fixture into Redis errored.
+func mustSeed(t *testing.T, err error) {
+	t.Helper()
+	if err != nil {
+		t.Fatalf("seed redis: %v", err)
+	}
+}
+
 func TestDepthCollectorReportsDepths(t *testing.T) {
 	rdb := newTestRedis(t)
 	ctx := context.Background()
 
-	rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"})
-	rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"})
-	rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"})
-	rdb.RPush(ctx, "q:emails:dlq", "g")
+	// Seed known cardinalities; fail loudly if seeding itself errors so a
+	// comparison mismatch can never be misread as a collector bug.
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:ready", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}).Err())
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:inflight", redis.Z{Score: 1, Member: "c"}).Err())
+	mustSeed(t, rdb.ZAdd(ctx, "q:emails:delayed", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"}, redis.Z{Score: 3, Member: "f"}).Err())
+	mustSeed(t, rdb.RPush(ctx, "q:emails:dlq", "g").Err())
 
 	c := metrics.NewDepthCollector(rdb, "emails")
 

From 006d45f0282415e16ad7aa4eee5ad0f8da8a810b Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 19:00:24 +0530
Subject: [PATCH 16/19] Serve Prometheus /metrics from cmd/worker behind --metrics-addr

---
 cmd/worker/main.go | 40 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 6da2bc6..c2a8ee6 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -12,16 +12,19 @@ import (
 	"fmt"
 	"log/slog"
 	"math/rand"
+	"net/http"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/redis/go-redis/v9"
 
 	"github.com/StrangeNoob/relay/internal/broker"
 	"github.com/StrangeNoob/relay/internal/job"
+	"github.com/StrangeNoob/relay/internal/metrics"
 	"github.com/StrangeNoob/relay/internal/worker"
 )
 
@@ -36,6 +39,7 @@ func main() {
 	backoffMax := flag.Duration("backoff-max", 10*time.Minute, "retry backoff ceiling")
 	rate := flag.Float64("rate", 0, "max claims/second for this queue (0 = unlimited)")
 	burst := flag.Int("burst", 0, "rate-limit burst capacity (defaults to 1 when --rate is set)")
+	metricsAddr := flag.String("metrics-addr", "", "address to serve Prometheus /metrics on (e.g. :9090); empty = disabled")
 	flag.Parse()
 
 	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
@@ -61,7 +65,32 @@ func main() {
 		}
 		brokerOpts = append(brokerOpts, broker.WithRateLimit(*queue, *rate, *burst))
 	}
+
+	// Metrics: build recorder and register a depth collector only when
+	// --metrics-addr is set; otherwise rec stays nil and all metric paths
+	// are skipped, preserving byte-identical behaviour to before.
+	var rec *metrics.Recorder
+	if *metricsAddr != "" {
+		rec = metrics.NewRecorder()
+		rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, *queue))
+		brokerOpts = append(brokerOpts, broker.WithMetrics(rec))
+	}
+
 	b := broker.New(rdb, brokerOpts...)
+	// Start the Prometheus metrics HTTP server when --metrics-addr is set.
+	var metricsSrv *http.Server
+	if rec != nil {
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{}))
+		metricsSrv = &http.Server{Addr: *metricsAddr, Handler: mux}
+		go func() {
+			logger.Info("metrics server listening", "addr", *metricsAddr)
+			if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				logger.Error("metrics server error", "err", err)
+			}
+		}()
+	}
+
 	handler := demoHandler(*failRate, logger)
 
 	var wg sync.WaitGroup
@@ -98,6 +127,17 @@ func main() {
 	<-ctx.Done()
 	logger.Info("shutdown signal received, draining in-flight jobs")
 	wg.Wait()
+
+	// Shut down the metrics server after all workers have drained, so the
+	// final scrape can still observe counters from the last batch of jobs.
+ if metricsSrv != nil { + shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutCancel() + if err := metricsSrv.Shutdown(shutCtx); err != nil { + logger.Error("metrics server shutdown error", "err", err) + } + } + logger.Info("relay worker stopped cleanly") } From 26ab09e70b0e3457f8a6067123f688a199f656bb Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 19:04:03 +0530 Subject: [PATCH 17/19] Document Phase 2 completion: Prometheus metrics --- CLAUDE.md | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f2fc62a..497f218 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,25 +10,30 @@ project: the point is to *prove understanding of queue internals*, not to wrap a library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery, etc.) — the mechanics are the deliverable. -**Status: Phase 1 complete; Phase 2 nearly complete.** The core engine plus delayed jobs, the -promoter, retry backoff, priority, idempotency enforcement, and per-queue rate limiting are built, -tested against a real Redis under `-race`, and CI is green (only Prometheus metrics remain in -Phase 2). Repo: . What exists today: +**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter, +retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics +are built, tested against a real Redis under `-race`, and CI is green. Phase 3 (API/dashboard) is +next. Repo: . What exists today: - `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`). 
- `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`, `Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), with Lua under `internal/broker/scripts/`: `enqueue.lua`, `claim.lua`, `ack.lua`, `nack.lua`, `reaper.lua`, `promote.lua`, `heartbeat.lua`. Broker options: `WithBackoff`, `WithDedupTTL`, - `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash). + `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash), + `WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op). +- `internal/metrics` — Prometheus `Recorder` (implements `broker.Metrics`; counters + `relay_jobs_*_total`, histogram `relay_job_latency_seconds`, all labelled by queue) and + `DepthCollector` (a `prometheus.Collector` reporting `relay_queue_depth{queue,state}` gauges + by reading ZCARD/LLEN at scrape time). - `internal/worker` — `Worker` (claim loop, dispatch, heartbeat, graceful shutdown), plus `Reaper` and `Promoter` background loops sharing one `runDrainLoop` helper. - `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load - generator with `--delay`). + generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set, + serves `/metrics` and registers the depth collector with graceful shutdown. - `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint`. -Remaining Phase 2 (Prometheus metrics) and Phase 3 (API/dashboard/`cmd/server`, docker-compose, -deploy) are **not** built yet. +Phase 3 (API/dashboard/`cmd/server`, docker-compose, deploy) is **not** built yet. ## Source of truth @@ -65,6 +70,7 @@ spec disagree, the spec wins until the spec is deliberately updated. (`broker.WithBackoff`). 
- **Idempotency is enqueue-only, TTL-window.** A keyed duplicate is dropped within the dedup TTL (default 24h, `WithDedupTTL`); the key is not released on completion. Delivery remains at-least-once — consumers needing exactly-once effects still dedup on the key. - **Rate-limit config is per-worker, not stored in Redis.** All workers on a queue must register the same `WithRateLimit` (they share one Redis bucket and pass rate/burst on every claim); mismatched configs refill inconsistently. A rate-limited claim is indistinguishable from an empty queue to the worker (it polls again). +- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. The endpoint lives on `cmd/worker` until the Phase 3 server exists. ## Redis data model & job lifecycle (the architecture in brief) @@ -111,7 +117,8 @@ internal/job/ # ✅ job model + hash encoding internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat (go:embed) internal/worker/ # ✅ Worker + Reaper + Promoter runtime -internal/{client,api,metrics}/ # ◻ producer SDK / HTTP API / Prometheus (Phase 2–3) +internal/metrics/ # ✅ Prometheus Recorder + DepthCollector +internal/{client,api}/ # ◻ producer SDK / HTTP API (Phase 3) web/ # ◻ embedded dashboard assets (Phase 3) deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3) .github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint @@ -123,7 +130,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold ## Build order (do not jump ahead) 1. 
**Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first. -2. **Phase 2 — depth (in progress):** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics still to do. +2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅. 3. **Phase 3 — polish:** dashboard; docker-compose demo; deployed demo; README + diagram. 4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox. @@ -144,8 +151,10 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold - **Module:** `github.com/StrangeNoob/relay`. - **Toolchain:** `go 1.24` with `toolchain go1.25.11` pinned in `go.mod` (go-redis v9 needs ≥1.24). If a `go1.24` toolchain download fails, the pin makes the build use the already-cached 1.25.x. -- **Only dependency:** `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library; it - does not violate the "no queue dependency" rule. The queue logic is ours. +- **Direct dependencies (two):** + - `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library. + - `github.com/prometheus/client_golang` — a metrics instrumentation library, not a queue library. + Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours. - **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). They use **DB 15** and `FlushDB` per test, and **skip** (not fail) when Redis is unreachable — so a green local run with no Redis means the broker/worker suites were skipped. CI provides a Redis service. 
From 80b6c9e938b302150e85260ad489244fda3a4822 Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 19:07:30 +0530
Subject: [PATCH 18/19] Move test-only CounterForTest accessor into export_test.go

---
 internal/metrics/export_test.go | 33 +++++++++++++++++++++++++++++++++
 internal/metrics/recorder.go    | 28 ----------------------------
 2 files changed, 33 insertions(+), 28 deletions(-)
 create mode 100644 internal/metrics/export_test.go

diff --git a/internal/metrics/export_test.go b/internal/metrics/export_test.go
new file mode 100644
index 0000000..6ea9ddc
--- /dev/null
+++ b/internal/metrics/export_test.go
@@ -0,0 +1,33 @@
+package metrics
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// CounterForTest returns the per-queue child counter for the named metric so
+// tests in the external metrics_test package can read a specific series. It
+// lives in an _test.go file so it never ships in the production build. "name" is
+// the short key (enqueued, deduped, claimed, processed, retried, dead, reaped,
+// promoted); an unknown name returns nil.
+func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter {
+	var v *prometheus.CounterVec
+	switch name {
+	case "enqueued":
+		v = r.enqueued
+	case "deduped":
+		v = r.deduped
+	case "claimed":
+		v = r.claimed
+	case "processed":
+		v = r.processed
+	case "retried":
+		v = r.retried
+	case "dead":
+		v = r.dead
+	case "reaped":
+		v = r.reaped
+	case "promoted":
+		v = r.promoted
+	default:
+		return nil
+	}
+	return v.WithLabelValues(queue)
+}
diff --git a/internal/metrics/recorder.go b/internal/metrics/recorder.go
index 75370d5..3dd3508 100644
--- a/internal/metrics/recorder.go
+++ b/internal/metrics/recorder.go
@@ -84,31 +84,3 @@ func (r *Recorder) AddPromoted(q string, n int) {
 func (r *Recorder) ObserveLatency(q string, d time.Duration) {
 	r.latency.WithLabelValues(q).Observe(d.Seconds())
 }
-
-// CounterForTest returns the per-queue child counter for the named metric. It
-// exists so tests can read a specific series; "name" is the short key
-// (enqueued, deduped, claimed, processed, retried, dead, reaped, promoted).
-func (r *Recorder) CounterForTest(name, queue string) prometheus.Counter {
-	var v *prometheus.CounterVec
-	switch name {
-	case "enqueued":
-		v = r.enqueued
-	case "deduped":
-		v = r.deduped
-	case "claimed":
-		v = r.claimed
-	case "processed":
-		v = r.processed
-	case "retried":
-		v = r.retried
-	case "dead":
-		v = r.dead
-	case "reaped":
-		v = r.reaped
-	case "promoted":
-		v = r.promoted
-	default:
-		return nil
-	}
-	return v.WithLabelValues(queue)
-}

From c3236396c3f879df91682b3be240a1f10f3b7eef Mon Sep 17 00:00:00 2001
From: StrangeNoob
Date: Mon, 8 Jun 2026 19:31:10 +0530
Subject: [PATCH 19/19] Document per-package test Redis DB isolation (broker 15 / worker 14 / metrics 13)

---
 CLAUDE.md | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/CLAUDE.md b/CLAUDE.md
index 497f218..01e5e94 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -155,9 +155,11 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold
   - `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library.
   - `github.com/prometheus/client_golang` — a metrics instrumentation library, not a queue library.
   Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours.
-- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). They use **DB 15**
-  and `FlushDB` per test, and **skip** (not fail) when Redis is unreachable — so a green local run
-  with no Redis means the broker/worker suites were skipped. CI provides a Redis service.
+- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using
+  package claims its own logical DB so `go test ./...` runs them in parallel without flushing each
+  other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**; a new one picks another), with
+  `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable — so a green local run
+  with no Redis means those suites were skipped. CI provides a Redis service.
 
 ```sh
 go build ./...