64 changes: 40 additions & 24 deletions CLAUDE.md
@@ -10,30 +10,41 @@ project: the point is to *prove understanding of queue internals*, not to wrap a
library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery, etc.) — the
mechanics are the deliverable.

**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter,
retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics
are built, tested against a real Redis under `-race`, and CI is green. Phase 3 (API/dashboard) is
next. Repo: <https://github.com/StrangeNoob/relay>. What exists today:
**Status: Phase 1 complete; Phase 2 complete; Phase 3 in progress — 3a (HTTP API + server) ✅
done.** The core engine plus delayed jobs, the promoter, retry backoff, priority, idempotency
enforcement, per-queue rate limiting, Prometheus metrics, and the JSON REST API + server are
built, tested against a real Redis under `-race`, and CI is green. Dashboard (3b), producer SDK
(3c), and packaging/deploy (3d) remain. Repo: <https://github.com/StrangeNoob/relay>. What
exists today:

- `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`).
- `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`,
`Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), with
`Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat),
`Stats` (ZCARD/LLEN snapshot per queue), `ListDLQ` (paged DLQ inspection), `RequeueDLQ`
(atomic dlq→ready reset via `requeue.lua`), `Queues` (SCAN-based queue discovery), with
Lua under `internal/broker/scripts/`: `enqueue.lua`, `claim.lua`, `ack.lua`, `nack.lua`,
`reaper.lua`, `promote.lua`, `heartbeat.lua`. Broker options: `WithBackoff`, `WithDedupTTL`,
`WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash),
`WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op).
`reaper.lua`, `promote.lua`, `heartbeat.lua`, `requeue.lua`. Broker options: `WithBackoff`,
`WithDedupTTL`, `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via
Redis hash), `WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op).
- `internal/metrics` — Prometheus `Recorder` (implements `broker.Metrics`; counters
`relay_jobs_*_total`, histogram `relay_job_latency_seconds`, all labelled by queue) and
`DepthCollector` (a `prometheus.Collector` reporting `relay_queue_depth{queue,state}` gauges
by reading ZCARD/LLEN at scrape time).
- `internal/worker` — `Worker` (claim loop, dispatch, heartbeat, graceful shutdown), plus `Reaper`
and `Promoter` background loops sharing one `runDrainLoop` helper.
- `internal/api` — JSON REST API over stdlib `net/http` (Go 1.22 method+path routing). Endpoints:
`POST /api/queues/{queue}/jobs` (enqueue; 409 on idempotency dup), `GET /api/queues/{queue}/stats`,
`GET /api/queues/{queue}/dlq?limit=&offset=`, `POST /api/queues/{queue}/dlq/{id}/requeue`
(404 if not in DLQ), `GET /api/queues`. Constructed via `api.New(b, logger) http.Handler`.
- `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load
generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set,
serves `/metrics` and registers the depth collector with graceful shutdown.
- `cmd/server` — wires Redis + broker (with a `metrics.Recorder` so API enqueues are counted) +
the API handler + `/metrics` + `/healthz`; graceful shutdown on SIGINT/SIGTERM. Flags: `-addr`,
`-redis`, `-queues` (comma-separated queues for the depth collector).
- `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint`.

Phase 3 (API/dashboard/`cmd/server`, docker-compose, deploy) is **not** built yet.
Dashboard (3b), producer SDK (3c), and packaging/deploy (3d) are **not** built yet.

## Source of truth

@@ -70,7 +81,8 @@ spec disagree, the spec wins until the spec is deliberately updated.
(`broker.WithBackoff`).
- **Idempotency is enqueue-only, TTL-window.** A keyed duplicate is dropped within the dedup TTL (default 24h, `WithDedupTTL`); the key is not released on completion. Delivery remains at-least-once — consumers needing exactly-once effects still dedup on the key.
- **Rate-limit config is per-worker, not stored in Redis.** All workers on a queue must register the same `WithRateLimit` (they share one Redis bucket and pass rate/burst on every claim); mismatched configs refill inconsistently. A rate-limited claim is indistinguishable from an empty queue to the worker (it polls again).
- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. The endpoint lives on `cmd/worker` until the Phase 3 server exists.
- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. `cmd/server` also exposes `/metrics`; depth gauges there cover only the queues listed in `-queues` at startup.
- **HTTP API is demo-grade, no auth.** The API server has no authentication or authorization layer. Payloads are treated as UTF-8 strings (base64 encoding for binary payloads is a future addition). DLQ paging is offset/limit (no cursor). `Queues` discovery uses Redis SCAN (eventually-consistent) and sorts results in Go.

## Redis data model & job lifecycle (the architecture in brief)

@@ -83,7 +95,7 @@ and the whole engine follows:
| `job:{id}` | hash | — | full job. Fields: `id, queue, payload, state, attempts, max_retries, created_at, idempotency_key`. **No deadline field** — the deadline lives only as the `inflight` ZSET score. |
| `q:{name}:ready` | ZSET | priority | claimable now; claim pops the best score = priority (higher first), oldest-first within a priority |
| `q:{name}:inflight` | ZSET | visibility deadline | claimed-not-acked; **reaper scans this for expiry** |
| `q:{name}:dlq` | list | — | exhausted jobs (inspect/requeue surface is Phase 3) |
| `q:{name}:dlq` | list | — | exhausted jobs; inspect via `ListDLQ` + requeue via `RequeueDLQ` (dlq→ready, attempts reset to 0, atomic `requeue.lua`) |
| `q:{name}:delayed` | ZSET | ready-at ts | scheduled + backoff jobs; **promoter scans this** and moves due ones (`ready-at ≤ now`) to `ready` |
| `q:{name}:dedup:{key}` | string | — | per-key string with TTL; **enqueue dedup** — a keyed duplicate is dropped with ErrDuplicate |
| `q:{name}:ratelimit` | hash | — | per-queue token bucket (`tokens`, `ts`); claim consumes a token only on a successful pop |
@@ -101,26 +113,29 @@ enqueue(WithDelay) → delayed ──[promoter: ready-at≤now]──→ ready
nack → attempts<maxRetries ? delayed (now + full-jitter backoff) : dlq
reaper (bg): inflight where deadline<=now → ready # at-least-once on crash
promoter (bg): delayed where ready-at<=now → ready # releases scheduled + backed-off jobs
requeue (operator): dlq → ready (attempts reset to 0) # RequeueDLQ via the API
```

Two background loops (reaper, promoter) plus the worker claim loop are the only things that
move jobs between states. Heartbeat (`broker.Extend`, `ZADD XX`) pushes a job's `inflight`
deadline forward while a long handler runs, so the reaper does not reclaim live work.
Two background loops (reaper, promoter) plus the worker claim loop move jobs between states
automatically; the only operator-driven transition is `RequeueDLQ` (dlq→ready, exposed via the
API). Heartbeat (`broker.Extend`, `ZADD XX`) pushes a job's `inflight` deadline forward while a
long handler runs, so the reaper does not reclaim live work.

## Layout (✅ built · ◻ planned)

```
cmd/worker/main.go # ✅ worker pool + reaper + promoter daemon
cmd/demo/main.go # ✅ load generator (--delay)
cmd/server/main.go # API+dashboard server (Phase 3)
cmd/server/main.go # API + /metrics + /healthz server (Phase 3a)
internal/job/ # ✅ job model + hash encoding
internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend
internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat (go:embed)
internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend/stats/dlq/queues
internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat, requeue (go:embed)
internal/worker/ # ✅ Worker + Reaper + Promoter runtime
internal/metrics/ # ✅ Prometheus Recorder + DepthCollector
internal/{client,api}/ # ◻ producer SDK / HTTP API (Phase 3)
web/ # ◻ embedded dashboard assets (Phase 3)
deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3)
internal/api/ # ✅ JSON REST API handler (Phase 3a)
internal/client/ # ◻ producer SDK (Phase 3c)
web/ # ◻ embedded dashboard assets (Phase 3b)
deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3d)
.github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint
```

@@ -131,7 +146,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold

1. **Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first.
2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅.
3. **Phase 3 — polish:** dashboard; docker-compose demo; deployed demo; README + diagram.
3. **Phase 3 — polish (in progress):** 3a HTTP API + server ✅ done; 3b dashboard (web/); 3c producer SDK (`internal/client`); 3d docker-compose + deploy + README diagram.
4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox.

## Conventions
Expand All @@ -157,9 +172,9 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold
Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours.
- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using
package claims its own logical DB so `go test ./...` runs them in parallel without flushing each
other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**; a new one picks another), with
`FlushDB` per test, and they **skip** (not fail) when Redis is unreachable — so a green local run
with no Redis means those suites were skipped. CI provides a Redis service.
other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**, api → **DB 12**; a new one
picks another), with `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable
— so a green local run with no Redis means those suites were skipped. CI provides a Redis service.

```sh
go build ./...
@@ -169,6 +184,7 @@ golangci-lint run # CI pins v2.12.2; default linters,
# run it end to end against a local Redis:
go run ./cmd/worker -queue demo -concurrency 4 & # worker pool + reaper
go run ./cmd/demo -queue demo -count 100 # enqueue load
go run ./cmd/server -queues demo # API :8080 + /metrics + /healthz
```

Keep this section updated as the Makefile / docker-compose take shape.
99 changes: 99 additions & 0 deletions cmd/server/main.go
@@ -0,0 +1,99 @@
// Command server runs Relay's HTTP control surface: the JSON API plus a
// Prometheus /metrics endpoint and a health check. It is a thin wiring layer;
// all behaviour lives in internal/api, internal/broker, and internal/metrics.
package main

import (
	"context"
	"flag"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/redis/go-redis/v9"

	"github.com/StrangeNoob/relay/internal/api"
	"github.com/StrangeNoob/relay/internal/broker"
	"github.com/StrangeNoob/relay/internal/metrics"
)

func main() {
	addr := flag.String("addr", ":8080", "HTTP listen address")
	redisAddr := flag.String("redis", envOr("REDIS_ADDR", "localhost:6379"), "Redis address")
	queuesFlag := flag.String("queues", "", "comma-separated queues for the /metrics depth collector")
	flag.Parse()

	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	rdb := redis.NewClient(&redis.Options{Addr: *redisAddr})
	defer func() { _ = rdb.Close() }()
	if err := rdb.Ping(ctx).Err(); err != nil {
		logger.Error("cannot reach redis", "addr", *redisAddr, "err", err)
		os.Exit(1)
	}

	// A metrics recorder on the broker means API enqueues are counted; a depth
	// collector for the configured queues exposes live gauges from the server.
	rec := metrics.NewRecorder()
	if qs := splitQueues(*queuesFlag); len(qs) > 0 {
		rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, qs...))
	}
	b := broker.New(rdb, broker.WithMetrics(rec))

	mux := http.NewServeMux()
	mux.Handle("/api/", api.New(b, logger))
	mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{}))
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})

	srv := &http.Server{Addr: *addr, Handler: mux}
	go func() {
		logger.Info("relay server listening", "addr", *addr, "redis", *redisAddr)
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Error("server error", "err", err)
		}
	}()

	<-ctx.Done()
	logger.Info("shutdown signal received")
	shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutCtx); err != nil {
		logger.Error("server shutdown error", "err", err)
	}
	logger.Info("relay server stopped cleanly")
}

// envOr returns the environment value for key, or def when it is unset/empty.
func envOr(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// splitQueues parses a comma-separated queue list, trimming blanks.
func splitQueues(s string) []string {
	if strings.TrimSpace(s) == "" {
		return nil
	}
	parts := strings.Split(s, ",")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	return out
}