Merged
19 commits
5e49070
Add Phase 2 metrics design spec: Prometheus instrumentation
StrangeNoob Jun 8, 2026
2f6d8ef
Add Phase 2 metrics implementation plan
StrangeNoob Jun 8, 2026
3ae6112
Add broker Metrics interface, noop default, and WithMetrics option
StrangeNoob Jun 8, 2026
cc71020
Make WithMetrics install test non-vacuous; clarify latency doc
StrangeNoob Jun 8, 2026
5f5e9de
Instrument Enqueue with enqueued/deduplicated metrics
StrangeNoob Jun 8, 2026
4ff3c7f
Use errors.Is for ErrDuplicate check, matching file convention
StrangeNoob Jun 8, 2026
f710400
Instrument Claim with claimed metric
StrangeNoob Jun 8, 2026
eee3fb8
Instrument Ack with processed counter and end-to-end latency
StrangeNoob Jun 8, 2026
616e61c
Instrument Nack with retried/dead metrics from script outcome
StrangeNoob Jun 8, 2026
6ad96a2
Comment Nack outcome switch's intentional no-match behavior
StrangeNoob Jun 8, 2026
41fc123
Instrument Reap and Promote with batch count metrics
StrangeNoob Jun 8, 2026
f62ddd1
Widen promote metric test timing margin for CI reliability
StrangeNoob Jun 8, 2026
1fcfe9b
Add internal/metrics Recorder implementing broker.Metrics over Promet…
StrangeNoob Jun 8, 2026
3e40d38
Add DepthCollector reporting per-queue depth gauges at scrape time
StrangeNoob Jun 8, 2026
d2639b4
Check Redis seed errors in depth collector test
StrangeNoob Jun 8, 2026
006d45f
Serve Prometheus /metrics from cmd/worker behind --metrics-addr
StrangeNoob Jun 8, 2026
26ab09e
Document Phase 2 completion: Prometheus metrics
StrangeNoob Jun 8, 2026
80b6c9e
Move test-only CounterForTest accessor into export_test.go
StrangeNoob Jun 8, 2026
c323639
Document per-package test Redis DB isolation (broker 15 / worker 14 /…
StrangeNoob Jun 8, 2026
41 changes: 26 additions & 15 deletions CLAUDE.md
@@ -10,25 +10,30 @@ project: the point is to *prove understanding of queue internals*, not to wrap a
library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery, etc.) — the
mechanics are the deliverable.

-**Status: Phase 1 complete; Phase 2 nearly complete.** The core engine plus delayed jobs, the
-promoter, retry backoff, priority, idempotency enforcement, and per-queue rate limiting are built,
-tested against a real Redis under `-race`, and CI is green (only Prometheus metrics remain in
-Phase 2). Repo: <https://github.com/StrangeNoob/relay>. What exists today:
+**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter,
+retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics
+are built, tested against a real Redis under `-race`, and CI is green. Phase 3 (API/dashboard) is
+next. Repo: <https://github.com/StrangeNoob/relay>. What exists today:

- `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`).
- `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`,
`Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), with
Lua under `internal/broker/scripts/`: `enqueue.lua`, `claim.lua`, `ack.lua`, `nack.lua`,
`reaper.lua`, `promote.lua`, `heartbeat.lua`. Broker options: `WithBackoff`, `WithDedupTTL`,
-`WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash).
+`WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash),
+`WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op).
+- `internal/metrics` — Prometheus `Recorder` (implements `broker.Metrics`; counters
+`relay_jobs_*_total`, histogram `relay_job_latency_seconds`, all labelled by queue) and
+`DepthCollector` (a `prometheus.Collector` reporting `relay_queue_depth{queue,state}` gauges
+by reading ZCARD/LLEN at scrape time).
- `internal/worker` — `Worker` (claim loop, dispatch, heartbeat, graceful shutdown), plus `Reaper`
and `Promoter` background loops sharing one `runDrainLoop` helper.
- `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load
-generator with `--delay`).
+generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set,
+serves `/metrics` and registers the depth collector with graceful shutdown.
- `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint`.

-Remaining Phase 2 (Prometheus metrics) and Phase 3 (API/dashboard/`cmd/server`, docker-compose,
-deploy) are **not** built yet.
+Phase 3 (API/dashboard/`cmd/server`, docker-compose, deploy) is **not** built yet.
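
As a rough illustration of the `WithMetrics(m)` option and its no-op default described in the list above, the pattern can be sketched with the standard library alone. The method names on `Metrics` (`JobEnqueued`, `JobProcessed`), the `Broker` fields, and the `countingMetrics` test double are hypothetical; this diff does not show the real `broker.Metrics` method set.

```go
package main

import (
	"fmt"
	"time"
)

// Metrics is a hypothetical stand-in for broker.Metrics; the real
// interface's methods are not visible in this diff.
type Metrics interface {
	JobEnqueued(queue string)
	JobProcessed(queue string, latency time.Duration)
}

// noopMetrics discards every observation, so an uninstrumented broker
// never has to nil-check before recording.
type noopMetrics struct{}

func (noopMetrics) JobEnqueued(string)                 {}
func (noopMetrics) JobProcessed(string, time.Duration) {}

// Broker holds only what this sketch needs.
type Broker struct {
	metrics Metrics
}

// Option mutates a Broker during construction.
type Option func(*Broker)

// WithMetrics installs m. Ignoring a nil interface value is an
// assumption of this sketch, not confirmed behaviour of the real option.
func WithMetrics(m Metrics) Option {
	return func(b *Broker) {
		if m != nil {
			b.metrics = m
		}
	}
}

// New applies options on top of the no-op default.
func New(opts ...Option) *Broker {
	b := &Broker{metrics: noopMetrics{}}
	for _, opt := range opts {
		opt(b)
	}
	return b
}

// Enqueue stands in for the Redis-backed call and only records the metric.
func (b *Broker) Enqueue(queue string) {
	b.metrics.JobEnqueued(queue)
}

// countingMetrics is a hypothetical in-memory recorder for the demo.
type countingMetrics struct {
	enqueued map[string]int
}

func (c *countingMetrics) JobEnqueued(queue string)           { c.enqueued[queue]++ }
func (c *countingMetrics) JobProcessed(string, time.Duration) {}

func main() {
	New().Enqueue("emails") // no-op default: nothing is recorded

	rec := &countingMetrics{enqueued: map[string]int{}}
	b := New(WithMetrics(rec))
	b.Enqueue("emails")
	b.Enqueue("emails")
	fmt.Println(rec.enqueued["emails"]) // 2
}
```

Keeping the interface in the broker package and the Prometheus-backed implementation elsewhere means the broker itself never imports the metrics library, which matches the `internal/broker` / `internal/metrics` split listed above.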

## Source of truth

@@ -65,6 +70,7 @@ spec disagree, the spec wins until the spec is deliberately updated.
(`broker.WithBackoff`).
- **Idempotency is enqueue-only, TTL-window.** A keyed duplicate is dropped within the dedup TTL (default 24h, `WithDedupTTL`); the key is not released on completion. Delivery remains at-least-once — consumers needing exactly-once effects still dedup on the key.
- **Rate-limit config is per-worker, not stored in Redis.** All workers on a queue must register the same `WithRateLimit` (they share one Redis bucket and pass rate/burst on every claim); mismatched configs refill inconsistently. A rate-limited claim is indistinguishable from an empty queue to the worker (it polls again).
+- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. The endpoint lives on `cmd/worker` until the Phase 3 server exists.
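
The "max/avg, not sum" advice for depth gauges can be checked with a little arithmetic. The sketch below assumes three worker processes scraping the same Redis key; the `depthSample` type, the instance names, and the depth of 120 are illustrative values, not data from the project.

```go
package main

import "fmt"

// depthSample is one scraped relay_queue_depth value for a single
// queue/state pair, as reported by one worker process.
type depthSample struct {
	instance string // hypothetical scrape-target label
	depth    int
}

// sumDepth adds the samples, which overcounts when every worker
// reports the same shared Redis value.
func sumDepth(samples []depthSample) int {
	total := 0
	for _, s := range samples {
		total += s.depth
	}
	return total
}

// maxDepth returns the largest sample, or 0 for an empty slice.
func maxDepth(samples []depthSample) int {
	best := 0
	for _, s := range samples {
		if s.depth > best {
			best = s.depth
		}
	}
	return best
}

func main() {
	// Three workers read the same Redis structure, so each reports 120.
	samples := []depthSample{
		{"worker-a", 120}, {"worker-b", 120}, {"worker-c", 120},
	}
	fmt.Printf("sum=%d max=%d\n", sumDepth(samples), maxDepth(samples)) // sum=360 max=120
}
```

Counters behave the other way round: each process counts only the jobs it handled, so summing across workers is the correct aggregation for them.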

## Redis data model & job lifecycle (the architecture in brief)

@@ -111,7 +117,8 @@ internal/job/ # ✅ job model + hash encoding
internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend
internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat (go:embed)
internal/worker/ # ✅ Worker + Reaper + Promoter runtime
-internal/{client,api,metrics}/ # ◻ producer SDK / HTTP API / Prometheus (Phase 2–3)
+internal/metrics/ # ✅ Prometheus Recorder + DepthCollector
+internal/{client,api}/ # ◻ producer SDK / HTTP API (Phase 3)
web/ # ◻ embedded dashboard assets (Phase 3)
deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3)
.github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint
@@ -123,7 +130,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold
## Build order (do not jump ahead)

1. **Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first.
-2. **Phase 2 — depth (in progress):** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics still to do.
+2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅.
3. **Phase 3 — polish:** dashboard; docker-compose demo; deployed demo; README + diagram.
4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox.

@@ -144,11 +151,15 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold
- **Module:** `github.com/StrangeNoob/relay`.
- **Toolchain:** `go 1.24` with `toolchain go1.25.11` pinned in `go.mod` (go-redis v9 needs ≥1.24).
If a `go1.24` toolchain download fails, the pin makes the build use the already-cached 1.25.x.
-- **Only dependency:** `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library; it
-does not violate the "no queue dependency" rule. The queue logic is ours.
-- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). They use **DB 15**
-and `FlushDB` per test, and **skip** (not fail) when Redis is unreachable — so a green local run
-with no Redis means the broker/worker suites were skipped. CI provides a Redis service.
+- **Direct dependencies (two):**
+  - `github.com/redis/go-redis/v9` — a Redis *driver*, not a queue library.
+  - `github.com/prometheus/client_golang` — a metrics instrumentation library, not a queue library.
+  Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours.
+- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using
+package claims its own logical DB so `go test ./...` runs them in parallel without flushing each
+other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**; a new one picks another), with
+`FlushDB` per test, and they **skip** (not fail) when Redis is unreachable — so a green local run
+with no Redis means those suites were skipped. CI provides a Redis service.
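
The per-package DB convention in the last bullet could be captured in a small lookup, sketched here with the standard library only. The helper names (`redisAddr`, `testDB`, `checkUnique`) and the idea of a shared table are hypothetical; the diff only states the DB numbers and the `REDIS_ADDR` override, not how each package's tests select them.

```go
package main

import (
	"fmt"
	"os"
)

// testDBs mirrors the assignments stated above:
// broker -> 15, worker -> 14, metrics -> 13.
var testDBs = map[string]int{"broker": 15, "worker": 14, "metrics": 13}

// redisAddr returns REDIS_ADDR when set, else the documented default.
func redisAddr() string {
	if addr := os.Getenv("REDIS_ADDR"); addr != "" {
		return addr
	}
	return "localhost:6379"
}

// testDB returns the logical DB claimed by pkg, and false if pkg has
// not claimed one yet.
func testDB(pkg string) (int, bool) {
	db, ok := testDBs[pkg]
	return db, ok
}

// checkUnique reports an error when two packages share a logical DB,
// which would let one package's FlushDB wipe another's test data.
func checkUnique(dbs map[string]int) error {
	seen := map[int]string{}
	for pkg, db := range dbs {
		if other, dup := seen[db]; dup {
			return fmt.Errorf("packages %q and %q both use DB %d", other, pkg, db)
		}
		seen[db] = pkg
	}
	return nil
}

func main() {
	fmt.Println("addr:", redisAddr())
	for _, pkg := range []string{"broker", "worker", "metrics"} {
		db, _ := testDB(pkg)
		fmt.Printf("%s uses DB %d\n", pkg, db)
	}
	fmt.Println("collision-free:", checkUnique(testDBs) == nil)
}
```

A uniqueness check of this kind is one way to catch a new package reusing an existing DB number before parallel test runs start flushing each other.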

```sh
go build ./...
40 changes: 40 additions & 0 deletions cmd/worker/main.go
@@ -12,16 +12,19 @@ import (
	"fmt"
	"log/slog"
	"math/rand"
+	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

+	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/redis/go-redis/v9"

	"github.com/StrangeNoob/relay/internal/broker"
	"github.com/StrangeNoob/relay/internal/job"
+	"github.com/StrangeNoob/relay/internal/metrics"
	"github.com/StrangeNoob/relay/internal/worker"
)

@@ -36,6 +39,7 @@ func main() {
	backoffMax := flag.Duration("backoff-max", 10*time.Minute, "retry backoff ceiling")
	rate := flag.Float64("rate", 0, "max claims/second for this queue (0 = unlimited)")
	burst := flag.Int("burst", 0, "rate-limit burst capacity (defaults to 1 when --rate is set)")
+	metricsAddr := flag.String("metrics-addr", "", "address to serve Prometheus /metrics on (e.g. :9090); empty = disabled")
	flag.Parse()

	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
@@ -61,7 +65,32 @@ func main() {
		}
		brokerOpts = append(brokerOpts, broker.WithRateLimit(*queue, *rate, *burst))
	}
+
+	// Metrics: build recorder and register a depth collector only when
+	// --metrics-addr is set; otherwise rec stays nil and all metric paths
+	// are skipped, preserving byte-identical behaviour to before.
+	var rec *metrics.Recorder
+	if *metricsAddr != "" {
+		rec = metrics.NewRecorder()
+		rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, *queue))
+		brokerOpts = append(brokerOpts, broker.WithMetrics(rec))
+	}
+
	b := broker.New(rdb, brokerOpts...)
+	// Start the Prometheus metrics HTTP server when --metrics-addr is set.
+	var metricsSrv *http.Server
+	if rec != nil {
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{}))
+		metricsSrv = &http.Server{Addr: *metricsAddr, Handler: mux}
+		go func() {
+			logger.Info("metrics server listening", "addr", *metricsAddr)
+			if err := metricsSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+				logger.Error("metrics server error", "err", err)
+			}
+		}()
+	}
+
	handler := demoHandler(*failRate, logger)

	var wg sync.WaitGroup
@@ -98,6 +127,17 @@ func main() {
	<-ctx.Done()
	logger.Info("shutdown signal received, draining in-flight jobs")
	wg.Wait()
+
+	// Shut down the metrics server after all workers have drained, so the
+	// final scrape can still observe counters from the last batch of jobs.
+	if metricsSrv != nil {
+		shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer shutCancel()
+		if err := metricsSrv.Shutdown(shutCtx); err != nil {
+			logger.Error("metrics server shutdown error", "err", err)
+		}
+	}
+
	logger.Info("relay worker stopped cleanly")
}
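
One reason to append `broker.WithMetrics(rec)` only inside the `*metricsAddr != ""` branch, rather than passing a possibly nil `*metrics.Recorder` unconditionally, is Go's typed-nil behaviour: a nil pointer stored in an interface is not equal to `nil`. The sketch below uses hypothetical `Metrics` and `Recorder` types; whether the real `WithMetrics` guards against nil is not shown in this diff.

```go
package main

import "fmt"

// Metrics is a hypothetical one-method stand-in for broker.Metrics.
type Metrics interface {
	JobEnqueued(queue string)
}

// Recorder is a hypothetical stand-in for metrics.Recorder.
type Recorder struct {
	enqueued int
}

// JobEnqueued dereferences r, so calling it on a nil *Recorder panics.
func (r *Recorder) JobEnqueued(string) { r.enqueued++ }

// isSet mirrors the kind of `m != nil` guard an option might use.
func isSet(m Metrics) bool { return m != nil }

func main() {
	var rec *Recorder // nil pointer, as when --metrics-addr is empty

	fmt.Println(rec == nil) // true
	fmt.Println(isSet(rec)) // true: the interface holds (*Recorder, nil)
	fmt.Println(isSet(nil)) // false: a genuinely nil interface
}
```

Because the guard cannot see through the typed nil, skipping the option entirely when metrics are disabled keeps the broker on its no-op default.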

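The shutdown ordering in the last hunk (drain workers first, then stop the metrics server with a bounded context) can be exercised in isolation. This is a minimal standard-library sketch: `stopCleanly` is a hypothetical helper, the handler body is a placeholder rather than real Prometheus exposition, and the loopback listener on an ephemeral port is an assumption made so the example does not need a fixed address.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// stopCleanly serves a placeholder /metrics endpoint on an ephemeral
// loopback port, shuts the server down with a 5s deadline, and returns
// nil only if Shutdown succeeded and Serve ended with ErrServerClosed.
func stopCleanly() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return fmt.Errorf("listen: %w", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintln(w, "# placeholder exposition")
	})
	srv := &http.Server{Handler: mux}

	served := make(chan error, 1)
	go func() { served <- srv.Serve(ln) }()

	// In cmd/worker, wg.Wait() sits here so in-flight jobs drain while
	// the endpoint is still scrapeable.

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		return fmt.Errorf("shutdown: %w", err)
	}
	// After Shutdown, Serve returns http.ErrServerClosed; anything else
	// is a genuine serving failure.
	if err := <-served; !errors.Is(err, http.ErrServerClosed) {
		return fmt.Errorf("serve: %w", err)
	}
	return nil
}

func main() {
	if err := stopCleanly(); err != nil {
		fmt.Println("unclean stop:", err)
		return
	}
	fmt.Println("metrics server stopped cleanly")
}
```

The sketch compares the serve error with `errors.Is`, in line with the convention named in commit 4ff3c7f; the hunk above uses a direct `!=` comparison, which behaves the same for the unwrapped sentinel that `ListenAndServe` returns.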