From 5e018b269e7e2413731c441907b768d44164dd14 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 19:49:44 +0530 Subject: [PATCH 01/13] Add Phase 3a design spec: HTTP API and server foundation --- ...026-06-08-relay-phase3a-http-api-design.md | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-08-relay-phase3a-http-api-design.md diff --git a/docs/superpowers/specs/2026-06-08-relay-phase3a-http-api-design.md b/docs/superpowers/specs/2026-06-08-relay-phase3a-http-api-design.md new file mode 100644 index 0000000..62a7d90 --- /dev/null +++ b/docs/superpowers/specs/2026-06-08-relay-phase3a-http-api-design.md @@ -0,0 +1,175 @@ +# Relay — Phase 3a: HTTP API + Server Foundation + +**Status:** Approved design · **Date:** 2026-06-08 +**Parent spec:** [`2026-06-07-relay-distributed-task-queue-design.md`](2026-06-07-relay-distributed-task-queue-design.md) +**Phase:** 3 (polish) — first sub-project. Phase 3 decomposes into 3a (this), 3b live dashboard, +3c producer SDK, 3d packaging/deploy/README. + +## Purpose + +Give Relay an HTTP control surface and a long-running server process. Until now the only ways to +drive the queue are the Go broker API and the `cmd/worker`/`cmd/demo` binaries. This sub-project +adds a JSON REST API (`internal/api`) and an always-on server (`cmd/server`) that can enqueue jobs, +report per-queue stats, inspect the dead-letter queue, and requeue dead-lettered jobs — plus the +broker methods those need. It is the foundation the dashboard (3b) and producer SDK (3c) consume. + +## Scope + +In scope: + +- New broker read/admin methods: `Stats`, `ListDLQ`, `RequeueDLQ` (with a new atomic `requeue.lua`), + and `Queues` (discovery via `SCAN`). +- `internal/api`: a stdlib `net/http` handler exposing the REST endpoints below. +- `cmd/server`: wires Redis + broker + the API handler + `/metrics` + `/healthz`, with graceful + shutdown. API enqueues are counted via a Phase 2 `metrics.Recorder`. + +Out of scope: authentication/authorization (demo-grade); binary payloads (UTF-8 string payloads +only; base64 is a documented future option); cursor pagination (simple offset/limit over the DLQ +list); a rate-limit configuration API; the dashboard UI (3b); the producer SDK (3c); docker-compose +and deploy (3d). + +## Key decisions + +| Decision | Choice | Rationale | +|---|---|---| +| HTTP stack | **stdlib `net/http` with 1.22 `ServeMux` pattern routing** | No new dependency (project ships only go-redis + prometheus); method+path patterns (`POST /api/queues/{queue}/jobs`) cover routing; fully testable with `httptest`. | +| API style | **JSON REST** | Simple, dashboard- and SDK-friendly, language-agnostic. | +| Requeue attempts | **Reset to 0** | A DLQ job exhausted its retry budget; a manual requeue is a deliberate "try again from scratch", so it gets a fresh budget. | +| Requeue atomicity | **New `requeue.lua`** (LREM + reset + ZADD ready) | The move out of the DLQ and back into ready must be one atomic step, consistent with the "every state transition is atomic Lua" invariant. | +| Ready score on requeue | **Recomputed in Lua** from the hash's priority + `now` + `priorityScale` | Matches how `promote.lua`/`reaper.lua` already rebuild ready scores; keeps priority ordering correct. | +| Queue discovery | **`Queues(ctx)` via `SCAN`** | The dashboard needs queue names without hardcoding; `SCAN` is non-blocking. | +| Metrics on server | **Mount `/metrics`; install a `Recorder` on the server's broker** | API enqueues get counted (`relay_jobs_enqueued_total`); a `DepthCollector` for the configured queues exposes gauges from the always-on server. Reuses Phase 2. | +| Payload encoding | **JSON string (UTF-8 → bytes)** | Friendly for the common case; binary via base64 is deferred. | + +## Components & changes + +### `internal/broker` + +- `type Stats struct { Ready, Inflight, Delayed, DLQ int64 }`. +- `Stats(ctx context.Context, queue string) (Stats, error)` — one pipeline issuing `ZCARD` on + ready/inflight/delayed and `LLEN` on dlq; maps results into `Stats`. +- `ListDLQ(ctx context.Context, queue string, limit, offset int64) ([]job.Job, error)` — `LRANGE` + `q:{queue}:dlq` over `[offset, offset+limit)` to get ids, then `HGETALL` each into a `job.Job` + via the existing `FromHash`. `limit <= 0` uses a default (e.g. 50); a hard max (e.g. 1000) caps + it. An id whose hash is missing is skipped (it was already cleaned up). +- `RequeueDLQ(ctx context.Context, queue, id string) (bool, error)` — runs `requeue.lua`. Returns + `(false, nil)` when the id was not present in the DLQ (so the API can answer `404`). +- `Queues(ctx context.Context) ([]string, error)` — `SCAN` (cursor loop, `MATCH q:*:*`, + reasonable `COUNT`) collecting distinct queue names by stripping the `q:` prefix and the + `:{suffix}` (ready/inflight/delayed/dlq/delayed/ratelimit/dedup:*) — parse the name as the segment + between the first `q:` and the next `:`. Deduplicated, sorted for stable output. + +### `internal/broker/scripts/requeue.lua` (new, `go:embed`) + +``` +-- KEYS[1] = dlq list q:{name}:dlq +-- KEYS[2] = ready set q:{name}:ready +-- ARGV[1] = job id +-- ARGV[2] = job hash key prefix ("job:") +-- ARGV[3] = now (unix ms) +-- ARGV[4] = priority scale (for the ready score) +-- Returns 1 if the job was requeued, 0 if it was not in the DLQ. + +local removed = redis.call('LREM', KEYS[1], 1, ARGV[1]) +if removed == 0 then + return 0 +end +local job_key = ARGV[2] .. ARGV[1] +local priority = tonumber(redis.call('HGET', job_key, 'priority')) or 0 +redis.call('HSET', job_key, 'state', 'ready', 'attempts', 0) +local score = priority * tonumber(ARGV[4]) - tonumber(ARGV[3]) +redis.call('ZADD', KEYS[2], score, ARGV[1]) +return 1 +``` + +### `internal/api` + +`New(b *broker.Broker) http.Handler` returns a configured `*http.ServeMux`. Routes: + +| Method + pattern | Handler behavior | +|---|---| +| `POST /api/queues/{queue}/jobs` | Decode `{payload string, delay_ms?: int, priority?: int, idempotency_key?: string}`. Build a `job.New(queue, []byte(payload))`; apply `WithDelay`/`WithPriority`/`WithIdempotencyKey` from the present fields; `Enqueue`. `201 {id, state}`; `broker.ErrDuplicate` → `409`. | +| `GET /api/queues/{queue}/stats` | `Stats` → `200 {ready, inflight, delayed, dlq}`. | +| `GET /api/queues/{queue}/dlq?limit=&offset=` | Parse/clamp `limit`/`offset`; `ListDLQ` → `200 [job…]`. | +| `POST /api/queues/{queue}/dlq/{id}/requeue` | `RequeueDLQ` → `200 {requeued:true}`; not found → `404`. | +| `GET /api/queues` | `Queues` → `200 [name…]`. | + +Cross-cutting: a `writeJSON(w, status, v)` helper and a `writeError(w, status, msg)` helper that +emits `{ "error": "..." }`. Bad JSON or non-integer query params → `400`. Broker errors → `500` +(logged via the handler's `*slog.Logger`, injected so tests can silence it). A wrong method on a +known path yields `405` from `ServeMux` automatically. The job JSON shape exposes +`id, queue, payload, state, attempts, max_retries, priority, created_at, idempotency_key` (payload +rendered as a string). + +### `cmd/server` + +Flags: `-addr` (default `:8080`), `-redis` (default from `REDIS_ADDR` or `localhost:6379`), +`-queues` (comma-separated list for the depth collector; empty = none). Builds +`rec := metrics.NewRecorder()`, `b := broker.New(rdb, broker.WithMetrics(rec))`, registers +`metrics.NewDepthCollector(rdb, queues...)` on `rec.Registry()`. Mounts a top-level mux: +`/api/` → `api.New(b)`, `/metrics` → `promhttp.HandlerFor(rec.Registry(), …)`, `/healthz` → `200`. +Graceful shutdown on SIGINT/SIGTERM (`http.Server.Shutdown` with a timeout). + +## Error handling + +- `400` — malformed JSON body, missing/empty required fields, non-integer query params. +- `404` — requeue of an id not present in the queue's DLQ. +- `405` — wrong method on a defined path (stdlib `ServeMux`). +- `409` — enqueue rejected as an idempotency-key duplicate (`ErrDuplicate`). +- `500` — broker/Redis errors; logged, generic `{ "error": "internal error" }` to the client. +- All errors share the `{ "error": "..." }` envelope. + +## Testing + +Real Redis where needed; skip (not fail) when unreachable. + +### `internal/broker` (DB 15) + +- `Stats`: enqueue N to ready, claim some (inflight), delay some, nack some to DLQ; assert each count. +- `ListDLQ`: drive K jobs to the DLQ; assert ids/fields, ordering, and that `limit`/`offset` page + correctly; assert a missing-hash id is skipped. +- `RequeueDLQ`: dead-letter a job; requeue it; assert it left the DLQ, is back in ready, `state=ready`, + `attempts=0`; requeue of an unknown id returns `(false, nil)`. Atomicity exercised by the single + script. +- `Queues`: seed several `q:*:*` keys; assert distinct, sorted names; empty Redis → empty slice. + +### `internal/api` (dedicated test **DB 12** — broker uses 15, worker 14, metrics 13) + +`httptest` against a real broker. End-to-end flows: + +- `POST …/jobs` → `201`, body has an id; a follow-up `GET …/stats` shows `ready == 1`. +- `POST …/jobs` twice with the same `idempotency_key` → second is `409`. +- Drive a job to the DLQ (enqueue→claim→nack with max retries 0), `GET …/dlq` lists it, then + `POST …/dlq/{id}/requeue` → `200`, and `GET …/stats` shows it back in `ready`, dlq `0`. +- `POST …/dlq/{unknown}/requeue` → `404`. +- `GET /api/queues` lists the queue used. +- Malformed JSON body → `400`. + +### `cmd/server` + +Build/vet only (consistent with prior phases). + +## Known limitations + +- **No authentication.** The API is open; intended for the demo/dashboard, not a hostile network. + A future hardening step would add an auth middleware. +- **UTF-8 string payloads.** The JSON `payload` is treated as a UTF-8 string; binary payloads would + need a base64 field (deferred). +- **Offset/limit DLQ paging over a Redis list.** Simple and fine at demo scale; very large DLQs + would want cursoring. `LRANGE` is O(offset+limit). +- **`Queues` uses `SCAN`.** Eventually-consistent and unordered at the Redis level; results are + deduped and sorted in Go. On a huge keyspace `SCAN` still iterates everything (bounded work per + call, multiple round-trips). +- **Server depth gauges use the `-queues` flag.** The `/metrics` depth collector reports only the + queues passed at startup; it does not auto-discover. (Discovery exists for the API but is not wired + into the collector in 3a.) + +## Invariants preserved + +- At-least-once delivery — the API is a control surface; `RequeueDLQ` is an explicit operator action + that moves a job dlq→ready atomically, consistent with the existing transition model. +- The atomic claim is sacred — unchanged; the new `requeue.lua` follows the same one-script-per- + transition rule. +- Crash safety via the reaper — untouched. +- Build the queue from scratch on Redis primitives — the API adds no queue library; routing is + stdlib `net/http`. From 3a6e1e30a9b68bc380db21faa2072db977826730 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 19:59:35 +0530 Subject: [PATCH 02/13] Add Phase 3a implementation plan: HTTP API and server --- .../2026-06-08-relay-phase3a-http-api.md | 1368 +++++++++++++++++ 1 file changed, 1368 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-08-relay-phase3a-http-api.md diff --git a/docs/superpowers/plans/2026-06-08-relay-phase3a-http-api.md b/docs/superpowers/plans/2026-06-08-relay-phase3a-http-api.md new file mode 100644 index 0000000..4ee68bc --- /dev/null +++ b/docs/superpowers/plans/2026-06-08-relay-phase3a-http-api.md @@ -0,0 +1,1368 @@ +# Phase 3a HTTP API + Server Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Give Relay a JSON HTTP API (`internal/api`) and an always-on server (`cmd/server`) for enqueueing jobs, reading per-queue stats, inspecting the DLQ, and requeueing dead-lettered jobs — plus the broker methods those need. + +**Architecture:** `internal/api` is a thin stdlib `net/http` handler over a `*broker.Broker` (parse → call broker → encode JSON/status). `internal/broker` gains read/admin methods (`Stats`, `ListDLQ`, `RequeueDLQ` + atomic `requeue.lua`, `Queues`). `cmd/server` wires Redis + broker (with a Phase 2 metrics `Recorder` so API enqueues are counted) + the API handler + `/metrics` + `/healthz`, with graceful shutdown. + +**Tech Stack:** Go, stdlib `net/http` (1.22 `ServeMux` pattern routing), `github.com/redis/go-redis/v9`, `github.com/prometheus/client_golang` (already a dep), real-Redis integration tests. + +**Spec:** [`docs/superpowers/specs/2026-06-08-relay-phase3a-http-api-design.md`](../specs/2026-06-08-relay-phase3a-http-api-design.md) + +--- + +## File Structure + +- **Modify `internal/broker/broker.go`** — add `Stats`, `ListDLQ`, `RequeueDLQ`, `Queues` methods + the `Stats` struct + DLQ limit constants. +- **Create `internal/broker/scripts/requeue.lua`** — atomic dlq→ready move. +- **Modify `internal/broker/scripts.go`** — embed + register `requeueScript`. +- **Modify `internal/broker/broker_test.go`** — tests for the four new methods (DB 15). +- **Create `internal/api/api.go`** — the `http.Handler`: router, JSON helpers, `jobView`, all five handlers. +- **Create `internal/api/api_test.go`** — `httptest` end-to-end tests against a real broker (DB 12). +- **Create `cmd/server/main.go`** — server wiring. +- **Modify `CLAUDE.md`** — document the API/server, the new Lua script, the new broker methods, and Phase 3a status. + +--- + +## Task 1: Broker `Stats` + +**Files:** +- Modify: `internal/broker/broker.go` +- Test: `internal/broker/broker_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/broker_test.go`: + +```go +func TestStatsCountsEachState(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + + // 3 ready + for i := 0; i < 3; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("r"))); err != nil { + t.Fatalf("Enqueue ready: %v", err) + } + } + // 1 delayed (far future so it stays in delayed) + if err := b.Enqueue(ctx, job.New("emails", []byte("d")), broker.WithDelay(time.Hour)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + // claim 2 -> inflight + for i := 0; i < 2; i++ { + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + } + // 1 dead-lettered: enqueue with no retry budget, claim, nack -> dlq + jd := job.New("emails", []byte("x")) + jd.MaxRetries = 0 + if err := b.Enqueue(ctx, jd); err != nil { + t.Fatalf("Enqueue dead: %v", err) + } + claimed, ok, err := b.Claim(ctx, "emails", time.Minute) + if err != nil || !ok { + t.Fatalf("Claim dead: ok=%v err=%v", ok, err) + } + if err := b.Nack(ctx, claimed); err != nil { + t.Fatalf("Nack: %v", err) + } + + s, err := b.Stats(ctx, "emails") + if err != nil { + t.Fatalf("Stats: %v", err) + } + // After the above: 3 enqueued ready, then 2 of the ready+dead pool were claimed. + // Careful accounting: 3 ready + 1 dead-candidate = 4 ready before claims; 2 claimed + // to inflight leaves 2 ready; the dead-candidate may or may not be among the claimed. + // To keep the assertion deterministic, assert the totals that are unambiguous: + if s.Delayed != 1 { + t.Errorf("Delayed = %d, want 1", s.Delayed) + } + if s.DLQ != 1 { + t.Errorf("DLQ = %d, want 1", s.DLQ) + } + if s.Ready+s.Inflight < 0 { // structural sanity; refined below + t.Errorf("unexpected counts: %+v", s) + } +} +``` + +NOTE on determinism: the mixed claim ordering above makes Ready/Inflight hard to assert exactly. Replace the final block with a deterministic scenario instead — rewrite the test body so each state is isolated: + +```go +func TestStatsCountsEachState(t *testing.T) { + b, rdb := newTestBroker(t) + ctx := context.Background() + + // 2 ready + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("r"))); err != nil { + t.Fatalf("Enqueue ready: %v", err) + } + } + // 1 delayed + if err := b.Enqueue(ctx, job.New("emails", []byte("d")), broker.WithDelay(time.Hour)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + // 1 inflight: enqueue then claim it (claims highest-priority ready; all equal here) + if err := b.Enqueue(ctx, job.New("emails", []byte("i"))); err != nil { + t.Fatalf("Enqueue inflight: %v", err) + } + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + // 1 dlq: push an id directly so the count is unambiguous + if err := rdb.RPush(ctx, "q:emails:dlq", "deadid").Err(); err != nil { + t.Fatalf("seed dlq: %v", err) + } + + s, err := b.Stats(ctx, "emails") + if err != nil { + t.Fatalf("Stats: %v", err) + } + if s.Ready != 2 { + t.Errorf("Ready = %d, want 2", s.Ready) + } + if s.Inflight != 1 { + t.Errorf("Inflight = %d, want 1", s.Inflight) + } + if s.Delayed != 1 { + t.Errorf("Delayed = %d, want 1", s.Delayed) + } + if s.DLQ != 1 { + t.Errorf("DLQ = %d, want 1", s.DLQ) + } +} +``` + +Use the second (deterministic) version. (3 ready enqueued, 1 claimed → 2 ready + 1 inflight.) + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestStatsCountsEachState -v` +Expected: FAIL — `b.Stats` undefined (compile error). + +- [ ] **Step 3: Implement `Stats`** + +In `internal/broker/broker.go`, add (near the other methods): + +```go +// Stats is a point-in-time count of a queue's jobs by state. Each field is the +// cardinality of the corresponding Redis structure for the queue. +type Stats struct { + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` +} + +// Stats returns the current depth of each of a queue's states in one round trip. +// ready/inflight/delayed are ZSETs (ZCARD); the dlq is a list (LLEN). +func (b *Broker) Stats(ctx context.Context, queue string) (Stats, error) { + pipe := b.rdb.Pipeline() + ready := pipe.ZCard(ctx, readyKey(queue)) + inflight := pipe.ZCard(ctx, inflightKey(queue)) + delayed := pipe.ZCard(ctx, delayedKey(queue)) + dlq := pipe.LLen(ctx, dlqKey(queue)) + if _, err := pipe.Exec(ctx); err != nil { + return Stats{}, fmt.Errorf("broker: stats for %q: %w", queue, err) + } + return Stats{ + Ready: ready.Val(), + Inflight: inflight.Val(), + Delayed: delayed.Val(), + DLQ: dlq.Val(), + }, nil +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestStatsCountsEachState -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/broker_test.go +git commit -m "Add broker Stats: per-queue depth by state" +``` + +--- + +## Task 2: Broker `ListDLQ` + +**Files:** +- Modify: `internal/broker/broker.go` +- Test: `internal/broker/broker_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/broker_test.go`: + +```go +// deadLetter enqueues a job with no retry budget, claims it, and nacks it so it +// lands in the DLQ; it returns the dead-lettered job's id. +func deadLetter(t *testing.T, b *broker.Broker, ctx context.Context, queue, payload string) string { + t.Helper() + j := job.New(queue, []byte(payload)) + j.MaxRetries = 0 + if err := b.Enqueue(ctx, j); err != nil { + t.Fatalf("Enqueue: %v", err) + } + claimed, ok, err := b.Claim(ctx, queue, time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + if err := b.Nack(ctx, claimed); err != nil { + t.Fatalf("Nack: %v", err) + } + return claimed.ID +} + +func TestListDLQReturnsDeadJobs(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + + id1 := deadLetter(t, b, ctx, "emails", "a") + id2 := deadLetter(t, b, ctx, "emails", "b") + + jobs, err := b.ListDLQ(ctx, "emails", 0, 0) + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(jobs) != 2 { + t.Fatalf("len = %d, want 2", len(jobs)) + } + // DLQ is a list pushed with RPUSH, so order is insertion order. + if jobs[0].ID != id1 || jobs[1].ID != id2 { + t.Errorf("ids = %s,%s want %s,%s", jobs[0].ID, jobs[1].ID, id1, id2) + } + if jobs[0].State != job.StateDead { + t.Errorf("state = %q, want dead", jobs[0].State) + } +} + +func TestListDLQPaginates(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + for i := 0; i < 3; i++ { + deadLetter(t, b, ctx, "emails", "x") + } + page, err := b.ListDLQ(ctx, "emails", 2, 1) // limit 2, offset 1 -> items 2 and 3 + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(page) != 2 { + t.Errorf("len = %d, want 2", len(page)) + } +} + +func TestListDLQEmpty(t *testing.T) { + b, _ := newTestBroker(t) + jobs, err := b.ListDLQ(context.Background(), "emails", 0, 0) + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(jobs) != 0 { + t.Errorf("len = %d, want 0", len(jobs)) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestListDLQ -v` +Expected: FAIL — `b.ListDLQ` undefined. + +- [ ] **Step 3: Implement `ListDLQ`** + +In `internal/broker/broker.go`, add: + +```go +// DLQ listing bounds: an unset/zero limit uses the default; the max caps a single +// page so a huge DLQ cannot be slurped in one request. +const ( + defaultDLQLimit = 50 + maxDLQLimit = 1000 +) + +// ListDLQ returns up to limit dead-lettered jobs for a queue, starting at offset +// (0-based) in insertion order. A limit <= 0 uses the default; limits above the +// max are clamped. Job ids whose hash has already been removed are skipped. +func (b *Broker) ListDLQ(ctx context.Context, queue string, limit, offset int64) ([]job.Job, error) { + if limit <= 0 { + limit = defaultDLQLimit + } + if limit > maxDLQLimit { + limit = maxDLQLimit + } + if offset < 0 { + offset = 0 + } + ids, err := b.rdb.LRange(ctx, dlqKey(queue), offset, offset+limit-1).Result() + if err != nil { + return nil, fmt.Errorf("broker: listing dlq for %q: %w", queue, err) + } + jobs := make([]job.Job, 0, len(ids)) + for _, id := range ids { + h, err := b.rdb.HGetAll(ctx, jobKey(id)).Result() + if err != nil { + return nil, fmt.Errorf("broker: loading dlq job %s: %w", id, err) + } + if len(h) == 0 { + continue // hash already cleaned up; skip + } + j, err := job.FromHash(h) + if err != nil { + return nil, fmt.Errorf("broker: decoding dlq job %s: %w", id, err) + } + jobs = append(jobs, j) + } + return jobs, nil +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestListDLQ -v` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/broker/broker.go internal/broker/broker_test.go +git commit -m "Add broker ListDLQ: paged dead-letter inspection" +``` + +--- + +## Task 3: `requeue.lua` + Broker `RequeueDLQ` + +**Files:** +- Create: `internal/broker/scripts/requeue.lua` +- Modify: `internal/broker/scripts.go` +- Modify: `internal/broker/broker.go` +- Test: `internal/broker/broker_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/broker_test.go`: + +```go +func TestRequeueDLQMovesJobBackToReady(t *testing.T) { + b, rdb := newTestBroker(t) + ctx := context.Background() + + id := deadLetter(t, b, ctx, "emails", "x") + + ok, err := b.RequeueDLQ(ctx, "emails", id) + if err != nil { + t.Fatalf("RequeueDLQ: %v", err) + } + if !ok { + t.Fatal("RequeueDLQ returned false, want true") + } + + // gone from dlq + if n, _ := rdb.LLen(ctx, "q:emails:dlq").Result(); n != 0 { + t.Errorf("dlq len = %d, want 0", n) + } + // back in ready + if n, _ := rdb.ZCard(ctx, "q:emails:ready").Result(); n != 1 { + t.Errorf("ready card = %d, want 1", n) + } + // state reset to ready, attempts reset to 0 + h, err := rdb.HGetAll(ctx, "job:"+id).Result() + if err != nil { + t.Fatalf("HGetAll: %v", err) + } + if h["state"] != "ready" { + t.Errorf("state = %q, want ready", h["state"]) + } + if h["attempts"] != "0" { + t.Errorf("attempts = %q, want 0", h["attempts"]) + } + + // and it is claimable again + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim after requeue: ok=%v err=%v", ok, err) + } +} + +func TestRequeueDLQUnknownIDReturnsFalse(t *testing.T) { + b, _ := newTestBroker(t) + ok, err := b.RequeueDLQ(context.Background(), "emails", "nope") + if err != nil { + t.Fatalf("RequeueDLQ: %v", err) + } + if ok { + t.Error("RequeueDLQ returned true for an id not in the DLQ, want false") + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestRequeueDLQ -v` +Expected: FAIL — `b.RequeueDLQ` undefined. + +- [ ] **Step 3: Create the Lua script** + +Create `internal/broker/scripts/requeue.lua`: + +```lua +-- requeue.lua — move a dead-lettered job back into ready for another run. +-- +-- An operator action: a job that exhausted its retry budget is given a fresh +-- start. The remove-from-dlq and add-to-ready must be one atomic step so the job +-- can never be in both or neither. attempts is reset to 0 so the job gets a full +-- retry budget again; the ready score is rebuilt from the job's priority exactly +-- like promote.lua/reaper.lua do, so priority ordering is preserved. +-- +-- KEYS[1] = dlq list q:{name}:dlq +-- KEYS[2] = ready set q:{name}:ready (ZSET scored by priority) +-- ARGV[1] = job id +-- ARGV[2] = job hash key prefix ("job:") +-- ARGV[3] = now in unix milliseconds +-- ARGV[4] = priority scale (composite ready-score multiplier) +-- +-- Returns 1 if the job was requeued, 0 if it was not present in the DLQ. + +local removed = redis.call('LREM', KEYS[1], 1, ARGV[1]) +if removed == 0 then + return 0 +end + +local job_key = ARGV[2] .. ARGV[1] +local priority = tonumber(redis.call('HGET', job_key, 'priority')) or 0 +redis.call('HSET', job_key, 'state', 'ready', 'attempts', 0) + +local score = priority * tonumber(ARGV[4]) - tonumber(ARGV[3]) +redis.call('ZADD', KEYS[2], score, ARGV[1]) +return 1 +``` + +- [ ] **Step 4: Register the script** + +In `internal/broker/scripts.go`, append (after the `enqueueScript` block): + +```go +//go:embed scripts/requeue.lua +var requeueSrc string + +var requeueScript = redis.NewScript(requeueSrc) +``` + +- [ ] **Step 5: Implement `RequeueDLQ`** + +In `internal/broker/broker.go`, add: + +```go +// RequeueDLQ moves a dead-lettered job back into the ready set for another run, +// resetting its attempts to 0 (a deliberate operator retry). The move is atomic +// in requeue.lua. It returns (false, nil) when the id is not in the queue's DLQ. +func (b *Broker) RequeueDLQ(ctx context.Context, queue, id string) (bool, error) { + n, err := requeueScript.Run(ctx, b.rdb, + []string{dlqKey(queue), readyKey(queue)}, + id, jobKeyPrefix, time.Now().UnixMilli(), priorityScale, + ).Int() + if err != nil { + return false, fmt.Errorf("broker: requeuing dlq job %s: %w", id, err) + } + return n == 1, nil +} +``` + +- [ ] **Step 6: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestRequeueDLQ -v` +Expected: PASS (2 tests). + +- [ ] **Step 7: Commit** + +```bash +git add internal/broker/scripts/requeue.lua internal/broker/scripts.go internal/broker/broker.go internal/broker/broker_test.go +git commit -m "Add broker RequeueDLQ with atomic requeue.lua (dlq -> ready, attempts reset)" +``` + +--- + +## Task 4: Broker `Queues` (discovery via SCAN) + +**Files:** +- Modify: `internal/broker/broker.go` +- Test: `internal/broker/broker_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/broker/broker_test.go`: + +```go +func TestQueuesDiscoversDistinctNames(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("a"))); err != nil { + t.Fatalf("Enqueue emails: %v", err) + } + if err := b.Enqueue(ctx, job.New("sms", []byte("b"))); err != nil { + t.Fatalf("Enqueue sms: %v", err) + } + // a second key family for the same queue must not double-count it + if err := b.Enqueue(ctx, job.New("emails", []byte("c")), broker.WithDelay(time.Hour)); err != nil { + t.Fatalf("Enqueue emails delayed: %v", err) + } + + names, err := b.Queues(ctx) + if err != nil { + t.Fatalf("Queues: %v", err) + } + if len(names) != 2 || names[0] != "emails" || names[1] != "sms" { + t.Errorf("names = %v, want [emails sms]", names) + } +} + +func TestQueuesEmpty(t *testing.T) { + b, _ := newTestBroker(t) + names, err := b.Queues(context.Background()) + if err != nil { + t.Fatalf("Queues: %v", err) + } + if len(names) != 0 { + t.Errorf("names = %v, want empty", names) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/broker/ -run TestQueues -v` +Expected: FAIL — `b.Queues` undefined. + +- [ ] **Step 3: Implement `Queues`** + +In `internal/broker/broker.go`, add `"sort"` and `"strings"` to the import block if not already present, then add: + +```go +// Queues discovers the distinct queue names present in Redis by scanning for the +// per-queue key prefix `q:{name}:...`. It uses a non-blocking SCAN cursor loop, +// dedupes, and returns the names sorted for stable output. On a large keyspace +// this still iterates every key (bounded work per round trip). +func (b *Broker) Queues(ctx context.Context) ([]string, error) { + seen := make(map[string]struct{}) + var cursor uint64 + for { + keys, next, err := b.rdb.Scan(ctx, cursor, "q:*", 200).Result() + if err != nil { + return nil, fmt.Errorf("broker: scanning queues: %w", err) + } + for _, k := range keys { + // k is "q:{name}:{suffix...}"; the name is the segment between the + // leading "q:" and the next ":". + rest := strings.TrimPrefix(k, "q:") + i := strings.IndexByte(rest, ':') + if i <= 0 { + continue + } + seen[rest[:i]] = struct{}{} + } + cursor = next + if cursor == 0 { + break + } + } + names := make([]string, 0, len(seen)) + for n := range seen { + names = append(names, n) + } + sort.Strings(names) + return names, nil +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/broker/ -run TestQueues -v` +Expected: PASS (2 tests). + +- [ ] **Step 5: Full broker suite under race** + +Run: `go test -race ./internal/broker/` +Expected: PASS (all existing + the four new methods). + +- [ ] **Step 6: Commit** + +```bash +git add internal/broker/broker.go internal/broker/broker_test.go +git commit -m "Add broker Queues: discover queue names via SCAN" +``` + +--- + +## Task 5: `internal/api` — router, JSON helpers, and enqueue endpoint + +**Files:** +- Create: `internal/api/api.go` +- Create: `internal/api/api_test.go` + +- [ ] **Step 1: Write the failing test** + +Create `internal/api/api_test.go`: + +```go +package api_test + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/api" + "github.com/StrangeNoob/relay/internal/broker" +) + +// apiTestRedisDB is this package's dedicated Redis DB. broker tests use 15, +// worker 14, metrics 13; api claims 12 so parallel `go test ./...` never collides. +const apiTestRedisDB = 12 + +func newTestAPI(t *testing.T) (http.Handler, *broker.Broker, *redis.Client) { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: apiTestRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + b := broker.New(rdb) + h := api.New(b, slog.New(slog.NewTextHandler(io.Discard, nil))) + return h, b, rdb +} + +// do issues a request against the handler and returns the recorder. +func do(t *testing.T, h http.Handler, method, target string, body any) *httptest.ResponseRecorder { + t.Helper() + var r io.Reader + if body != nil { + buf, err := json.Marshal(body) + if err != nil { + t.Fatalf("marshal body: %v", err) + } + r = bytes.NewReader(buf) + } + req := httptest.NewRequest(method, target, r) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + return rec +} + +func TestEnqueueEndpointCreatesJob(t *testing.T) { + h, _, _ := newTestAPI(t) + + rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", map[string]any{ + "payload": "hello", + }) + if rec.Code != http.StatusCreated { + t.Fatalf("status = %d, want 201; body=%s", rec.Code, rec.Body.String()) + } + var resp struct { + ID string `json:"id"` + State string `json:"state"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.ID == "" { + t.Error("response id is empty") + } + if resp.State != "ready" { + t.Errorf("state = %q, want ready", resp.State) + } +} + +func TestEnqueueDuplicateReturns409(t *testing.T) { + h, _, _ := newTestAPI(t) + body := map[string]any{"payload": "x", "idempotency_key": "k1"} + + if rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", body); rec.Code != http.StatusCreated { + t.Fatalf("first enqueue status = %d, want 201", rec.Code) + } + rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", body) + if rec.Code != http.StatusConflict { + t.Errorf("duplicate status = %d, want 409", rec.Code) + } +} + +func TestEnqueueBadJSONReturns400(t *testing.T) { + h, _, _ := newTestAPI(t) + req := httptest.NewRequest(http.MethodPost, "/api/queues/emails/jobs", bytes.NewReader([]byte("{not json"))) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", rec.Code) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/api/ -run TestEnqueue -v` +Expected: FAIL — package `internal/api` does not exist / `api.New` undefined. + +- [ ] **Step 3: Implement the API scaffold + enqueue** + +Create `internal/api/api.go`: + +```go +// Package api is Relay's HTTP control surface: a thin JSON layer over the broker. +// Handlers parse and validate the request, call one broker method, and encode the +// result and status code — all queue semantics stay in internal/broker. +package api + +import ( + "encoding/json" + "errors" + "log/slog" + "net/http" + "strconv" + "time" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" +) + +// API holds the dependencies shared by the handlers. +type API struct { + broker *broker.Broker + logger *slog.Logger +} + +// New returns an http.Handler serving the Relay REST API over the given broker. +// A nil logger falls back to slog.Default(); tests pass a discard logger to stay +// quiet. Routes use stdlib method+path patterns (Go 1.22+). +func New(b *broker.Broker, logger *slog.Logger) http.Handler { + if logger == nil { + logger = slog.Default() + } + a := &API{broker: b, logger: logger} + mux := http.NewServeMux() + mux.HandleFunc("POST /api/queues/{queue}/jobs", a.enqueue) + mux.HandleFunc("GET /api/queues/{queue}/stats", a.stats) + mux.HandleFunc("GET /api/queues/{queue}/dlq", a.listDLQ) + mux.HandleFunc("POST /api/queues/{queue}/dlq/{id}/requeue", a.requeueDLQ) + mux.HandleFunc("GET /api/queues", a.queues) + return mux +} + +// writeJSON encodes v as the response body with the given status code. +func (a *API) writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(v); err != nil { + a.logger.Error("api: encoding response", "err", err) + } +} + +// writeError emits a {"error": msg} body with the given status code. +func (a *API) writeError(w http.ResponseWriter, status int, msg string) { + a.writeJSON(w, status, map[string]string{"error": msg}) +} + +// jobView is the JSON shape of a job in API responses. Payload is rendered as a +// string (UTF-8); created_at as RFC3339Nano. +type jobView struct { + ID string `json:"id"` + Queue string `json:"queue"` + Payload string `json:"payload"` + State string `json:"state"` + Attempts int `json:"attempts"` + MaxRetries int `json:"max_retries"` + Priority int `json:"priority"` + CreatedAt string `json:"created_at"` + IdempotencyKey string `json:"idempotency_key,omitempty"` +} + +func toJobView(j job.Job) jobView { + return jobView{ + ID: j.ID, + Queue: j.Queue, + Payload: string(j.Payload), + State: string(j.State), + Attempts: j.Attempts, + MaxRetries: j.MaxRetries, + Priority: j.Priority, + CreatedAt: j.CreatedAt.Format(time.RFC3339Nano), + IdempotencyKey: j.IdempotencyKey, + } +} + +type enqueueRequest struct { + Payload string `json:"payload"` + DelayMs int64 `json:"delay_ms"` + Priority *int `json:"priority"` + IdempotencyKey string `json:"idempotency_key"` +} + +type enqueueResponse struct { + ID string `json:"id"` + State string `json:"state"` +} + +// enqueue handles POST /api/queues/{queue}/jobs. +func (a *API) enqueue(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + var req enqueueRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + a.writeError(w, http.StatusBadRequest, "invalid JSON body") + return + } + + j := job.New(queue, []byte(req.Payload)) + var opts []broker.EnqueueOption + if req.DelayMs > 0 { + opts = append(opts, broker.WithDelay(time.Duration(req.DelayMs)*time.Millisecond)) + } + if req.Priority != nil { + opts = append(opts, broker.WithPriority(*req.Priority)) + } + if req.IdempotencyKey != "" { + opts = append(opts, broker.WithIdempotencyKey(req.IdempotencyKey)) + } + + if err := a.broker.Enqueue(r.Context(), j, opts...); err != nil { + if errors.Is(err, broker.ErrDuplicate) { + a.writeError(w, http.StatusConflict, "duplicate idempotency key") + return + } + a.logger.Error("api: enqueue failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + + // Enqueue routes to delayed only for a future ready-at; mirror that here for + // the reported state (Enqueue takes the job by value, so j.State is unchanged). + state := job.StateReady + if req.DelayMs > 0 { + state = job.StateDelayed + } + a.writeJSON(w, http.StatusCreated, enqueueResponse{ID: j.ID, State: string(state)}) +} + +// parseInt64 parses a query value, returning def for an empty string. +func parseInt64(s string, def int64) (int64, error) { + if s == "" { + return def, nil + } + return strconv.ParseInt(s, 10, 64) +} +``` + +NOTE: this task adds `stats`, `listDLQ`, `requeueDLQ`, and `queues` to the router but those methods are implemented in Tasks 6–7. To keep the package compiling between tasks, add temporary stubs now and replace them in the next tasks: + +```go +func (a *API) stats(w http.ResponseWriter, r *http.Request) { a.writeError(w, http.StatusNotImplemented, "not implemented") } +func (a *API) listDLQ(w http.ResponseWriter, r *http.Request) { a.writeError(w, http.StatusNotImplemented, "not implemented") } +func (a *API) requeueDLQ(w http.ResponseWriter, r *http.Request) { a.writeError(w, http.StatusNotImplemented, "not implemented") } +func (a *API) queues(w http.ResponseWriter, r *http.Request) { a.writeError(w, http.StatusNotImplemented, "not implemented") } +``` + +(Remove `parseInt64`'s unused warning by leaving it; it is used in Task 6. If the compiler complains about an unused function in this task, that is fine — Go does not error on unused package-level functions, only unused imports/locals. `strconv` is used by `parseInt64`, so the import is fine.) + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/api/ -run TestEnqueue -v` +Expected: PASS (3 tests). Also `go build ./...` clean. + +- [ ] **Step 5: Commit** + +```bash +git add internal/api/api.go internal/api/api_test.go +git commit -m "Add internal/api with enqueue endpoint and JSON scaffolding" +``` + +--- + +## Task 6: API stats + DLQ-list endpoints + +**Files:** +- Modify: `internal/api/api.go` (replace `stats` and `listDLQ` stubs) +- Modify: `internal/api/api_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/api/api_test.go`: + +```go +func TestStatsEndpoint(t *testing.T) { + h, b, _ := newTestAPI(t) + ctx := context.Background() + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, mustJob("emails", "x")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + rec := do(t, h, http.MethodGet, "/api/queues/emails/stats", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var s struct { + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &s); err != nil { + t.Fatalf("decode: %v", err) + } + if s.Ready != 2 { + t.Errorf("ready = %d, want 2", s.Ready) + } +} + +func TestDLQListEndpoint(t *testing.T) { + h, b, rdb := newTestAPI(t) + ctx := context.Background() + // seed one dead-lettered job directly: push id + write its hash. + j := mustJob("emails", "dead") + j.State = "dead" + if err := rdb.HSet(ctx, "job:"+j.ID, j.ToHash()).Err(); err != nil { + t.Fatalf("HSet: %v", err) + } + if err := rdb.RPush(ctx, "q:emails:dlq", j.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + _ = b + + rec := do(t, h, http.MethodGet, "/api/queues/emails/dlq", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String()) + } + var jobs []map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &jobs); err != nil { + t.Fatalf("decode: %v", err) + } + if len(jobs) != 1 || jobs[0]["id"] != j.ID { + t.Errorf("jobs = %v, want one with id %s", jobs, j.ID) + } +} + +func TestDLQListBadLimitReturns400(t *testing.T) { + h, _, _ := newTestAPI(t) + rec := do(t, h, http.MethodGet, "/api/queues/emails/dlq?limit=abc", nil) + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", rec.Code) + } +} +``` + +Also add this helper to `api_test.go` (used above and later): + +```go +// mustJob builds a job for tests via the broker's job package. +func mustJob(queue, payload string) job.Job { + return job.New(queue, []byte(payload)) +} +``` + +And add the import `"github.com/StrangeNoob/relay/internal/job"` to `api_test.go`. + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/api/ -run 'TestStatsEndpoint|TestDLQList' -v` +Expected: FAIL — stats returns 501 (the stub), so `status = 501, want 200`. + +- [ ] **Step 3: Replace the stubs** + +In `internal/api/api.go`, replace the `stats` and `listDLQ` stub functions with: + +```go +// stats handles GET /api/queues/{queue}/stats. +func (a *API) stats(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + s, err := a.broker.Stats(r.Context(), queue) + if err != nil { + a.logger.Error("api: stats failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + a.writeJSON(w, http.StatusOK, s) +} + +// listDLQ handles GET /api/queues/{queue}/dlq?limit=&offset=. +func (a *API) listDLQ(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + limit, err := parseInt64(r.URL.Query().Get("limit"), 0) + if err != nil { + a.writeError(w, http.StatusBadRequest, "invalid limit") + return + } + offset, err := parseInt64(r.URL.Query().Get("offset"), 0) + if err != nil { + a.writeError(w, http.StatusBadRequest, "invalid offset") + return + } + jobs, err := a.broker.ListDLQ(r.Context(), queue, limit, offset) + if err != nil { + a.logger.Error("api: list dlq failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + views := make([]jobView, 0, len(jobs)) + for _, j := range jobs { + views = append(views, toJobView(j)) + } + a.writeJSON(w, http.StatusOK, views) +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/api/ -run 'TestStatsEndpoint|TestDLQList' -v` +Expected: PASS (3 tests). + +- [ ] **Step 5: Commit** + +```bash +git add internal/api/api.go internal/api/api_test.go +git commit -m "Implement API stats and DLQ-list endpoints" +``` + +--- + +## Task 7: API requeue + queues endpoints + +**Files:** +- Modify: `internal/api/api.go` (replace `requeueDLQ` and `queues` stubs) +- Modify: `internal/api/api_test.go` + +- [ ] **Step 1: Write the failing test** + +Append to `internal/api/api_test.go`: + +```go +func TestRequeueEndpointMovesJobBack(t *testing.T) { + h, _, rdb := newTestAPI(t) + ctx := context.Background() + j := mustJob("emails", "dead") + j.State = "dead" + if err := rdb.HSet(ctx, "job:"+j.ID, j.ToHash()).Err(); err != nil { + t.Fatalf("HSet: %v", err) + } + if err := rdb.RPush(ctx, "q:emails:dlq", j.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + + rec := do(t, h, http.MethodPost, "/api/queues/emails/dlq/"+j.ID+"/requeue", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String()) + } + if n, _ := rdb.ZCard(ctx, "q:emails:ready").Result(); n != 1 { + t.Errorf("ready card = %d, want 1", n) + } + if n, _ := rdb.LLen(ctx, "q:emails:dlq").Result(); n != 0 { + t.Errorf("dlq len = %d, want 0", n) + } +} + +func TestRequeueUnknownReturns404(t *testing.T) { + h, _, _ := newTestAPI(t) + rec := do(t, h, http.MethodPost, "/api/queues/emails/dlq/nope/requeue", nil) + if rec.Code != http.StatusNotFound { + t.Errorf("status = %d, want 404", rec.Code) + } +} + +func TestQueuesEndpointListsNames(t *testing.T) { + h, b, _ := newTestAPI(t) + ctx := context.Background() + if err := b.Enqueue(ctx, mustJob("emails", "a")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if err := b.Enqueue(ctx, mustJob("sms", "b")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + + rec := do(t, h, http.MethodGet, "/api/queues", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var names []string + if err := json.Unmarshal(rec.Body.Bytes(), &names); err != nil { + t.Fatalf("decode: %v", err) + } + if len(names) != 2 || names[0] != "emails" || names[1] != "sms" { + t.Errorf("names = %v, want [emails sms]", names) + } +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `go test ./internal/api/ -run 'TestRequeue|TestQueuesEndpoint' -v` +Expected: FAIL — stubs return 501, so `status = 501, want 200/404`. + +- [ ] **Step 3: Replace the stubs** + +In `internal/api/api.go`, replace the `requeueDLQ` and `queues` stub functions with: + +```go +// requeueDLQ handles POST /api/queues/{queue}/dlq/{id}/requeue. +func (a *API) requeueDLQ(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + id := r.PathValue("id") + ok, err := a.broker.RequeueDLQ(r.Context(), queue, id) + if err != nil { + a.logger.Error("api: requeue failed", "queue", queue, "id", id, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + if !ok { + a.writeError(w, http.StatusNotFound, "job not found in dlq") + return + } + a.writeJSON(w, http.StatusOK, map[string]bool{"requeued": true}) +} + +// queues handles GET /api/queues. +func (a *API) queues(w http.ResponseWriter, r *http.Request) { + names, err := a.broker.Queues(r.Context()) + if err != nil { + a.logger.Error("api: queues failed", "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + a.writeJSON(w, http.StatusOK, names) +} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `go test ./internal/api/ -run 'TestRequeue|TestQueuesEndpoint' -v` +Expected: PASS (3 tests). + +- [ ] **Step 5: Full api suite under race** + +Run: `go test -race ./internal/api/` +Expected: PASS (all api tests). + +- [ ] **Step 6: Commit** + +```bash +git add internal/api/api.go internal/api/api_test.go +git commit -m "Implement API requeue and queue-discovery endpoints" +``` + +--- + +## Task 8: `cmd/server` + +**Files:** +- Create: `cmd/server/main.go` + +- [ ] **Step 1: Implement the server** + +Create `cmd/server/main.go`: + +```go +// Command server runs Relay's HTTP control surface: the JSON API plus a +// Prometheus /metrics endpoint and a health check. It is a thin wiring layer; +// all behaviour lives in internal/api, internal/broker, and internal/metrics. +package main + +import ( + "context" + "flag" + "log/slog" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/api" + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/metrics" +) + +func main() { + addr := flag.String("addr", ":8080", "HTTP listen address") + redisAddr := flag.String("redis", envOr("REDIS_ADDR", "localhost:6379"), "Redis address") + queuesFlag := flag.String("queues", "", "comma-separated queues for the /metrics depth collector") + flag.Parse() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + slog.SetDefault(logger) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + rdb := redis.NewClient(&redis.Options{Addr: *redisAddr}) + defer func() { _ = rdb.Close() }() + if err := rdb.Ping(ctx).Err(); err != nil { + logger.Error("cannot reach redis", "addr", *redisAddr, "err", err) + os.Exit(1) + } + + // A metrics recorder on the broker means API enqueues are counted; a depth + // collector for the configured queues exposes live gauges from the server. + rec := metrics.NewRecorder() + if qs := splitQueues(*queuesFlag); len(qs) > 0 { + rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, qs...)) + } + b := broker.New(rdb, broker.WithMetrics(rec)) + + mux := http.NewServeMux() + mux.Handle("/api/", api.New(b, logger)) + mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + srv := &http.Server{Addr: *addr, Handler: mux} + go func() { + logger.Info("relay server listening", "addr", *addr, "redis", *redisAddr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("server error", "err", err) + } + }() + + <-ctx.Done() + logger.Info("shutdown signal received") + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutCtx); err != nil { + logger.Error("server shutdown error", "err", err) + } + logger.Info("relay server stopped cleanly") +} + +// envOr returns the environment value for key, or def when it is unset/empty. +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +// splitQueues parses a comma-separated queue list, trimming blanks. +func splitQueues(s string) []string { + if strings.TrimSpace(s) == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +} +``` + +- [ ] **Step 2: Build and vet** + +Run: +```bash +go build ./... +go vet ./... +gofmt -l cmd/ internal/ +``` +Expected: all clean. + +- [ ] **Step 3: Smoke check (optional, requires Redis)** + +Run (manual, then Ctrl-C): +```bash +go run ./cmd/server -addr :8080 -queues demo & +sleep 1 +curl -s localhost:8080/healthz; echo +curl -s -X POST localhost:8080/api/queues/demo/jobs -d '{"payload":"hi"}'; echo +curl -s localhost:8080/api/queues/demo/stats; echo +curl -s localhost:8080/api/queues; echo +curl -s localhost:8080/metrics | grep -c relay_ +kill %1 +``` +Expected: `ok`; a `201` job JSON; stats with `ready:1`; `["demo"]`; some `relay_` metric lines. Skip if no local Redis. + +- [ ] **Step 4: Commit** + +```bash +git add cmd/server/main.go +git commit -m "Add cmd/server: HTTP API + /metrics + /healthz with graceful shutdown" +``` + +--- + +## Task 9: Update CLAUDE.md and final verification + +**Files:** +- Modify: `CLAUDE.md` + +- [ ] **Step 1: Update CLAUDE.md** + +Make these edits (match the file's exact wording when editing): + +1. **Status line** — note Phase 3 is in progress with 3a (HTTP API + server) done. E.g. change the "Phase 3 (API/dashboard) is next" framing to "Phase 3 in progress: 3a HTTP API + server ✅; dashboard/SDK/packaging remain." +2. **"What exists today" list** — add bullets: `internal/api` (JSON REST: enqueue, stats, DLQ list, requeue, queue discovery) and `cmd/server` (API + `/metrics` + `/healthz`). Add `requeue.lua` to the Lua script enumeration(s). Add the new broker methods (`Stats`, `ListDLQ`, `RequeueDLQ`, `Queues`) to the broker description. +3. **Redis data model / lifecycle** — note the DLQ now has an inspect/requeue surface (was "inspect/requeue surface is Phase 3"); requeue moves dlq→ready resetting attempts. +4. **Layout (✅ built · ◻ planned)** — mark `internal/api/` ✅ and `cmd/server/main.go` ✅; leave `internal/client`, `web/`, `deployments/` as ◻ (3b–3d). +5. **Lua script inventory** — wherever the script list appears (e.g. `internal/broker/scripts/*.lua` go:embed line), add `requeue.lua`. +6. **Build order** — Phase 3 line: mark 3a (API/server) done; 3b dashboard, 3c SDK, 3d packaging remain. +7. **Known limitations** — add an API bullet: no auth (demo-grade); UTF-8 string payloads only (base64 future); offset/limit DLQ paging; server depth-gauge `/metrics` covers only `-queues` passed at startup; `Queues` discovery via `SCAN`. +8. **Run commands** — optionally add `go run ./cmd/server -queues demo` to the end-to-end section. + +Keep claims accurate to what was built. Do not contradict existing invariants (at-least-once, atomic claim). + +- [ ] **Step 2: Full verification** + +Run: +```bash +go build ./... +go test -race ./... +go vet ./... +gofmt -l internal/ cmd/ +``` +Expected: build clean; all tests pass (broker DB 15, worker DB 14, metrics DB 13, api DB 12 — no collisions); vet clean; `gofmt -l` prints nothing. Tests need Redis at localhost:6379; if up, the broker/worker/metrics/api suites must run and pass. + +If anything fails, STOP and report — do not paper over a real failure. + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "Document Phase 3a: HTTP API and server" +``` + +--- + +## Self-Review (completed during planning) + +- **Spec coverage:** broker `Stats` (Task 1), `ListDLQ` (Task 2), `RequeueDLQ` + `requeue.lua` (Task 3), `Queues` (Task 4); API enqueue/JSON scaffold (Task 5), stats + dlq list (Task 6), requeue + queues (Task 7); `cmd/server` with `/metrics` + `/healthz` + graceful shutdown and a metrics `Recorder` so API enqueues are counted (Task 8); CLAUDE.md + verification (Task 9). All spec sections mapped. +- **Type consistency:** broker methods — `Stats(ctx, queue) (Stats, error)`, `ListDLQ(ctx, queue, limit, offset int64) ([]job.Job, error)`, `RequeueDLQ(ctx, queue, id) (bool, error)`, `Queues(ctx) ([]string, error)` — match their call sites in `internal/api`. `Stats` struct JSON tags (`ready/inflight/delayed/dlq`) match the API test's decode struct. `api.New(b *broker.Broker, logger *slog.Logger) http.Handler` matches `cmd/server` and the api tests. `requeue.lua` ARGV order (id, prefix, now, priorityScale) matches `RequeueDLQ`'s `.Run` call. Test DBs: broker 15, worker 14, metrics 13, api 12. +- **No placeholders:** every code step has complete code. The Task 5 stubs are intentional, replaced in Tasks 6–7, and clearly marked. +- **Known soft spots:** Task 1's first test draft is explicitly discarded in favor of the deterministic version (the instruction says to use the second). Task 5 notes that `parseInt64` is used in Task 6 so it is not dead between tasks (and Go does not error on unused package-level funcs regardless). From 68803810ca64fe37ab14e667a5f59bca3fb0b5cc Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:01:19 +0530 Subject: [PATCH 03/13] Add broker Stats: per-queue depth by state --- internal/broker/broker.go | 28 ++++++++++++++++++++++ internal/broker/broker_test.go | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 2f16832..99a24bc 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -353,6 +353,34 @@ func (b *Broker) Extend(ctx context.Context, j job.Job, visibility time.Duration return n == 1, nil } +// Stats is a point-in-time count of a queue's jobs by state. Each field is the +// cardinality of the corresponding Redis structure for the queue. +type Stats struct { + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` +} + +// Stats returns the current depth of each of a queue's states in one round trip. +// ready/inflight/delayed are ZSETs (ZCARD); the dlq is a list (LLEN). +func (b *Broker) Stats(ctx context.Context, queue string) (Stats, error) { + pipe := b.rdb.Pipeline() + ready := pipe.ZCard(ctx, readyKey(queue)) + inflight := pipe.ZCard(ctx, inflightKey(queue)) + delayed := pipe.ZCard(ctx, delayedKey(queue)) + dlq := pipe.LLen(ctx, dlqKey(queue)) + if _, err := pipe.Exec(ctx); err != nil { + return Stats{}, fmt.Errorf("broker: stats for %q: %w", queue, err) + } + return Stats{ + Ready: ready.Val(), + Inflight: inflight.Val(), + Delayed: delayed.Val(), + DLQ: dlq.Val(), + }, nil +} + // hashFromLua converts the flat HGETALL array a script returns (alternating // field, value, field, value …) into a Go map. go-redis decodes a Lua table as // []interface{} of strings. diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index 01ee419..fd5b6f0 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -1215,3 +1215,47 @@ func TestRateLimitConcurrentClaimsRespectBurst(t *testing.T) { t.Errorf("inflight = %d, want 5", n) } } + +func TestStatsCountsEachState(t *testing.T) { + b, rdb := newTestBroker(t) + ctx := context.Background() + + // 2 ready + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, job.New("emails", []byte("r"))); err != nil { + t.Fatalf("Enqueue ready: %v", err) + } + } + // 1 delayed + if err := b.Enqueue(ctx, job.New("emails", []byte("d")), broker.WithDelay(time.Hour)); err != nil { + t.Fatalf("Enqueue delayed: %v", err) + } + // 1 inflight: enqueue then claim it + if err := b.Enqueue(ctx, job.New("emails", []byte("i"))); err != nil { + t.Fatalf("Enqueue inflight: %v", err) + } + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + // 1 dlq: push an id directly so the count is unambiguous + if err := rdb.RPush(ctx, "q:emails:dlq", "deadid").Err(); err != nil { + t.Fatalf("seed dlq: %v", err) + } + + s, err := b.Stats(ctx, "emails") + if err != nil { + t.Fatalf("Stats: %v", err) + } + if s.Ready != 2 { + t.Errorf("Ready = %d, want 2", s.Ready) + } + if s.Inflight != 1 { + t.Errorf("Inflight = %d, want 1", s.Inflight) + } + if s.Delayed != 1 { + t.Errorf("Delayed = %d, want 1", s.Delayed) + } + if s.DLQ != 1 { + t.Errorf("DLQ = %d, want 1", s.DLQ) + } +} From df944edea5841750129a2d2d57f28720c996c967 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:05:01 +0530 Subject: [PATCH 04/13] Add broker ListDLQ: paged dead-letter inspection --- internal/broker/broker.go | 42 +++++++++++++++++++++ internal/broker/broker_test.go | 67 ++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/internal/broker/broker.go b/internal/broker/broker.go index 99a24bc..cff8acf 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -381,6 +381,48 @@ func (b *Broker) Stats(ctx context.Context, queue string) (Stats, error) { }, nil } +// DLQ listing bounds: an unset/zero limit uses the default; the max caps a single +// page so a huge DLQ cannot be slurped in one request. +const ( + defaultDLQLimit = 50 + maxDLQLimit = 1000 +) + +// ListDLQ returns up to limit dead-lettered jobs for a queue, starting at offset +// (0-based) in insertion order. A limit <= 0 uses the default; limits above the +// max are clamped. Job ids whose hash has already been removed are skipped. +func (b *Broker) ListDLQ(ctx context.Context, queue string, limit, offset int64) ([]job.Job, error) { + if limit <= 0 { + limit = defaultDLQLimit + } + if limit > maxDLQLimit { + limit = maxDLQLimit + } + if offset < 0 { + offset = 0 + } + ids, err := b.rdb.LRange(ctx, dlqKey(queue), offset, offset+limit-1).Result() + if err != nil { + return nil, fmt.Errorf("broker: listing dlq for %q: %w", queue, err) + } + jobs := make([]job.Job, 0, len(ids)) + for _, id := range ids { + h, err := b.rdb.HGetAll(ctx, jobKey(id)).Result() + if err != nil { + return nil, fmt.Errorf("broker: loading dlq job %s: %w", id, err) + } + if len(h) == 0 { + continue // hash already cleaned up; skip + } + j, err := job.FromHash(h) + if err != nil { + return nil, fmt.Errorf("broker: decoding dlq job %s: %w", id, err) + } + jobs = append(jobs, j) + } + return jobs, nil +} + // hashFromLua converts the flat HGETALL array a script returns (alternating // field, value, field, value …) into a Go map. go-redis decodes a Lua table as // []interface{} of strings. diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index fd5b6f0..2d189ec 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -1216,6 +1216,73 @@ func TestRateLimitConcurrentClaimsRespectBurst(t *testing.T) { } } +// deadLetter enqueues a job with no retry budget, claims it, and nacks it so it +// lands in the DLQ; it returns the dead-lettered job's id. +func deadLetter(t *testing.T, b *broker.Broker, ctx context.Context, queue, payload string) string { + t.Helper() + j := job.New(queue, []byte(payload)) + j.MaxRetries = 0 + if err := b.Enqueue(ctx, j); err != nil { + t.Fatalf("Enqueue: %v", err) + } + claimed, ok, err := b.Claim(ctx, queue, time.Minute) + if err != nil || !ok { + t.Fatalf("Claim: ok=%v err=%v", ok, err) + } + if err := b.Nack(ctx, claimed); err != nil { + t.Fatalf("Nack: %v", err) + } + return claimed.ID +} + +func TestListDLQReturnsDeadJobs(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + + id1 := deadLetter(t, b, ctx, "emails", "a") + id2 := deadLetter(t, b, ctx, "emails", "b") + + jobs, err := b.ListDLQ(ctx, "emails", 0, 0) + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(jobs) != 2 { + t.Fatalf("len = %d, want 2", len(jobs)) + } + if jobs[0].ID != id1 || jobs[1].ID != id2 { + t.Errorf("ids = %s,%s want %s,%s", jobs[0].ID, jobs[1].ID, id1, id2) + } + if jobs[0].State != job.StateDead { + t.Errorf("state = %q, want dead", jobs[0].State) + } +} + +func TestListDLQPaginates(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + for i := 0; i < 3; i++ { + deadLetter(t, b, ctx, "emails", "x") + } + page, err := b.ListDLQ(ctx, "emails", 2, 1) // limit 2, offset 1 -> items 2 and 3 + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(page) != 2 { + t.Errorf("len = %d, want 2", len(page)) + } +} + +func TestListDLQEmpty(t *testing.T) { + b, _ := newTestBroker(t) + jobs, err := b.ListDLQ(context.Background(), "emails", 0, 0) + if err != nil { + t.Fatalf("ListDLQ: %v", err) + } + if len(jobs) != 0 { + t.Errorf("len = %d, want 0", len(jobs)) + } +} + func TestStatsCountsEachState(t *testing.T) { b, rdb := newTestBroker(t) ctx := context.Background() From c4601af9e4bc4609968e1e61bd27bf91eefa4c49 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:09:03 +0530 Subject: [PATCH 05/13] Add broker RequeueDLQ with atomic requeue.lua (dlq -> ready, attempts reset) --- internal/broker/broker.go | 14 +++++++++ internal/broker/broker_test.go | 47 +++++++++++++++++++++++++++++ internal/broker/scripts.go | 5 +++ internal/broker/scripts/requeue.lua | 29 ++++++++++++++++++ 4 files changed, 95 insertions(+) create mode 100644 internal/broker/scripts/requeue.lua diff --git a/internal/broker/broker.go b/internal/broker/broker.go index cff8acf..f6f6201 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -423,6 +423,20 @@ func (b *Broker) ListDLQ(ctx context.Context, queue string, limit, offset int64) return jobs, nil } +// RequeueDLQ moves a dead-lettered job back into the ready set for another run, +// resetting its attempts to 0 (a deliberate operator retry). The move is atomic +// in requeue.lua. It returns (false, nil) when the id is not in the queue's DLQ. +func (b *Broker) RequeueDLQ(ctx context.Context, queue, id string) (bool, error) { + n, err := requeueScript.Run(ctx, b.rdb, + []string{dlqKey(queue), readyKey(queue)}, + id, jobKeyPrefix, time.Now().UnixMilli(), priorityScale, + ).Int() + if err != nil { + return false, fmt.Errorf("broker: requeuing dlq job %s: %w", id, err) + } + return n == 1, nil +} + // hashFromLua converts the flat HGETALL array a script returns (alternating // field, value, field, value …) into a Go map. go-redis decodes a Lua table as // []interface{} of strings. diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index 2d189ec..01f1328 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -1326,3 +1326,50 @@ func TestStatsCountsEachState(t *testing.T) { t.Errorf("DLQ = %d, want 1", s.DLQ) } } + +func TestRequeueDLQMovesJobBackToReady(t *testing.T) { + b, rdb := newTestBroker(t) + ctx := context.Background() + + id := deadLetter(t, b, ctx, "emails", "x") + + ok, err := b.RequeueDLQ(ctx, "emails", id) + if err != nil { + t.Fatalf("RequeueDLQ: %v", err) + } + if !ok { + t.Fatal("RequeueDLQ returned false, want true") + } + + if n, _ := rdb.LLen(ctx, "q:emails:dlq").Result(); n != 0 { + t.Errorf("dlq len = %d, want 0", n) + } + if n, _ := rdb.ZCard(ctx, "q:emails:ready").Result(); n != 1 { + t.Errorf("ready card = %d, want 1", n) + } + h, err := rdb.HGetAll(ctx, "job:"+id).Result() + if err != nil { + t.Fatalf("HGetAll: %v", err) + } + if h["state"] != "ready" { + t.Errorf("state = %q, want ready", h["state"]) + } + if h["attempts"] != "0" { + t.Errorf("attempts = %q, want 0", h["attempts"]) + } + + if _, ok, err := b.Claim(ctx, "emails", time.Minute); err != nil || !ok { + t.Fatalf("Claim after requeue: ok=%v err=%v", ok, err) + } +} + +func TestRequeueDLQUnknownIDReturnsFalse(t *testing.T) { + b, _ := newTestBroker(t) + ok, err := b.RequeueDLQ(context.Background(), "emails", "nope") + if err != nil { + t.Fatalf("RequeueDLQ: %v", err) + } + if ok { + t.Error("RequeueDLQ returned true for an id not in the DLQ, want false") + } +} diff --git a/internal/broker/scripts.go b/internal/broker/scripts.go index 62067ee..10d992a 100644 --- a/internal/broker/scripts.go +++ b/internal/broker/scripts.go @@ -45,3 +45,8 @@ var promoteScript = redis.NewScript(promoteSrc) var enqueueSrc string var enqueueScript = redis.NewScript(enqueueSrc) + +//go:embed scripts/requeue.lua +var requeueSrc string + +var requeueScript = redis.NewScript(requeueSrc) diff --git a/internal/broker/scripts/requeue.lua b/internal/broker/scripts/requeue.lua new file mode 100644 index 0000000..c66fd86 --- /dev/null +++ b/internal/broker/scripts/requeue.lua @@ -0,0 +1,29 @@ +-- requeue.lua — move a dead-lettered job back into ready for another run. +-- +-- An operator action: a job that exhausted its retry budget is given a fresh +-- start. The remove-from-dlq and add-to-ready must be one atomic step so the job +-- can never be in both or neither. attempts is reset to 0 so the job gets a full +-- retry budget again; the ready score is rebuilt from the job's priority exactly +-- like promote.lua/reaper.lua do, so priority ordering is preserved. +-- +-- KEYS[1] = dlq list q:{name}:dlq +-- KEYS[2] = ready set q:{name}:ready (ZSET scored by priority) +-- ARGV[1] = job id +-- ARGV[2] = job hash key prefix ("job:") +-- ARGV[3] = now in unix milliseconds +-- ARGV[4] = priority scale (composite ready-score multiplier) +-- +-- Returns 1 if the job was requeued, 0 if it was not present in the DLQ. + +local removed = redis.call('LREM', KEYS[1], 1, ARGV[1]) +if removed == 0 then + return 0 +end + +local job_key = ARGV[2] .. ARGV[1] +local priority = tonumber(redis.call('HGET', job_key, 'priority')) or 0 +redis.call('HSET', job_key, 'state', 'ready', 'attempts', 0) + +local score = priority * tonumber(ARGV[4]) - tonumber(ARGV[3]) +redis.call('ZADD', KEYS[2], score, ARGV[1]) +return 1 From d12e791ffb37abce41a6557aa3edbf9bdfcb13d0 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:11:32 +0530 Subject: [PATCH 06/13] Comment requeue.lua hash-exists assumption --- internal/broker/scripts/requeue.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/broker/scripts/requeue.lua b/internal/broker/scripts/requeue.lua index c66fd86..2ab401e 100644 --- a/internal/broker/scripts/requeue.lua +++ b/internal/broker/scripts/requeue.lua @@ -20,6 +20,9 @@ if removed == 0 then return 0 end +-- Assumes the job hash still exists: under normal Relay operation a hash is only +-- deleted by ack.lua (which removes from inflight, never the DLQ), so an id in +-- the DLQ always has its hash. local job_key = ARGV[2] .. ARGV[1] local priority = tonumber(redis.call('HGET', job_key, 'priority')) or 0 redis.call('HSET', job_key, 'state', 'ready', 'attempts', 0) From 18f20045a5487cda3401b91db55e96ac87846367 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:13:01 +0530 Subject: [PATCH 07/13] Add broker Queues: discover queue names via SCAN --- internal/broker/broker.go | 37 ++++++++++++++++++++++++++++++++++ internal/broker/broker_test.go | 35 ++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/internal/broker/broker.go b/internal/broker/broker.go index f6f6201..6cdf44c 100644 --- a/internal/broker/broker.go +++ b/internal/broker/broker.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" "math/rand" + "sort" + "strings" "sync" "time" @@ -437,6 +439,41 @@ func (b *Broker) RequeueDLQ(ctx context.Context, queue, id string) (bool, error) return n == 1, nil } +// Queues discovers the distinct queue names present in Redis by scanning for the +// per-queue key prefix `q:{name}:...`. It uses a non-blocking SCAN cursor loop, +// dedupes, and returns the names sorted for stable output. On a large keyspace +// this still iterates every key (bounded work per round trip). +func (b *Broker) Queues(ctx context.Context) ([]string, error) { + seen := make(map[string]struct{}) + var cursor uint64 + for { + keys, next, err := b.rdb.Scan(ctx, cursor, "q:*", 200).Result() + if err != nil { + return nil, fmt.Errorf("broker: scanning queues: %w", err) + } + for _, k := range keys { + // k is "q:{name}:{suffix...}"; the name is the segment between the + // leading "q:" and the next ":". + rest := strings.TrimPrefix(k, "q:") + i := strings.IndexByte(rest, ':') + if i <= 0 { + continue + } + seen[rest[:i]] = struct{}{} + } + cursor = next + if cursor == 0 { + break + } + } + names := make([]string, 0, len(seen)) + for n := range seen { + names = append(names, n) + } + sort.Strings(names) + return names, nil +} + // hashFromLua converts the flat HGETALL array a script returns (alternating // field, value, field, value …) into a Go map. go-redis decodes a Lua table as // []interface{} of strings. diff --git a/internal/broker/broker_test.go b/internal/broker/broker_test.go index 01f1328..a992471 100644 --- a/internal/broker/broker_test.go +++ b/internal/broker/broker_test.go @@ -1373,3 +1373,38 @@ func TestRequeueDLQUnknownIDReturnsFalse(t *testing.T) { t.Error("RequeueDLQ returned true for an id not in the DLQ, want false") } } + +func TestQueuesDiscoversDistinctNames(t *testing.T) { + b, _ := newTestBroker(t) + ctx := context.Background() + + if err := b.Enqueue(ctx, job.New("emails", []byte("a"))); err != nil { + t.Fatalf("Enqueue emails: %v", err) + } + if err := b.Enqueue(ctx, job.New("sms", []byte("b"))); err != nil { + t.Fatalf("Enqueue sms: %v", err) + } + // a second key family for the same queue must not double-count it + if err := b.Enqueue(ctx, job.New("emails", []byte("c")), broker.WithDelay(time.Hour)); err != nil { + t.Fatalf("Enqueue emails delayed: %v", err) + } + + names, err := b.Queues(ctx) + if err != nil { + t.Fatalf("Queues: %v", err) + } + if len(names) != 2 || names[0] != "emails" || names[1] != "sms" { + t.Errorf("names = %v, want [emails sms]", names) + } +} + +func TestQueuesEmpty(t *testing.T) { + b, _ := newTestBroker(t) + names, err := b.Queues(context.Background()) + if err != nil { + t.Fatalf("Queues: %v", err) + } + if len(names) != 0 { + t.Errorf("names = %v, want empty", names) + } +} From 63f32c1695a60fb10b7c865d63c4ebb9075974c0 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:17:22 +0530 Subject: [PATCH 08/13] Add internal/api with enqueue endpoint and JSON scaffolding Implements the HTTP control surface package: api.New wires a Go 1.22 method+path ServeMux; enqueue handler (POST /api/queues/{queue}/jobs) returns 201, 400, 409, or 500; jobView, toJobView, parseInt64 helpers defined for Tasks 6-7; stats/listDLQ/requeueDLQ/queues are 501 stubs. Tests use Redis DB 12 and skip when Redis is unreachable. --- internal/api/api.go | 159 +++++++++++++++++++++++++++++++++++++++ internal/api/api_test.go | 106 ++++++++++++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 internal/api/api.go create mode 100644 internal/api/api_test.go diff --git a/internal/api/api.go b/internal/api/api.go new file mode 100644 index 0000000..ec132f3 --- /dev/null +++ b/internal/api/api.go @@ -0,0 +1,159 @@ +// Package api is Relay's HTTP control surface: a thin JSON layer over the broker. +// Handlers parse and validate the request, call one broker method, and encode the +// result and status code — all queue semantics stay in internal/broker. +package api + +import ( + "encoding/json" + "errors" + "log/slog" + "net/http" + "strconv" + "time" + + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" +) + +// API holds the dependencies shared by the handlers. +type API struct { + broker *broker.Broker + logger *slog.Logger +} + +// New returns an http.Handler serving the Relay REST API over the given broker. +// A nil logger falls back to slog.Default(); tests pass a discard logger to stay +// quiet. Routes use stdlib method+path patterns (Go 1.22+). +func New(b *broker.Broker, logger *slog.Logger) http.Handler { + if logger == nil { + logger = slog.Default() + } + a := &API{broker: b, logger: logger} + mux := http.NewServeMux() + mux.HandleFunc("POST /api/queues/{queue}/jobs", a.enqueue) + mux.HandleFunc("GET /api/queues/{queue}/stats", a.stats) + mux.HandleFunc("GET /api/queues/{queue}/dlq", a.listDLQ) + mux.HandleFunc("POST /api/queues/{queue}/dlq/{id}/requeue", a.requeueDLQ) + mux.HandleFunc("GET /api/queues", a.queues) + return mux +} + +// writeJSON encodes v as the response body with the given status code. +func (a *API) writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(v); err != nil { + a.logger.Error("api: encoding response", "err", err) + } +} + +// writeError emits a {"error": msg} body with the given status code. +func (a *API) writeError(w http.ResponseWriter, status int, msg string) { + a.writeJSON(w, status, map[string]string{"error": msg}) +} + +// jobView is the JSON shape of a job in API responses. Payload is rendered as a +// string (UTF-8); created_at as RFC3339Nano. +type jobView struct { + ID string `json:"id"` + Queue string `json:"queue"` + Payload string `json:"payload"` + State string `json:"state"` + Attempts int `json:"attempts"` + MaxRetries int `json:"max_retries"` + Priority int `json:"priority"` + CreatedAt string `json:"created_at"` + IdempotencyKey string `json:"idempotency_key,omitempty"` +} + +func toJobView(j job.Job) jobView { + return jobView{ + ID: j.ID, + Queue: j.Queue, + Payload: string(j.Payload), + State: string(j.State), + Attempts: j.Attempts, + MaxRetries: j.MaxRetries, + Priority: j.Priority, + CreatedAt: j.CreatedAt.Format(time.RFC3339Nano), + IdempotencyKey: j.IdempotencyKey, + } +} + +type enqueueRequest struct { + Payload string `json:"payload"` + DelayMs int64 `json:"delay_ms"` + Priority *int `json:"priority"` + IdempotencyKey string `json:"idempotency_key"` +} + +type enqueueResponse struct { + ID string `json:"id"` + State string `json:"state"` +} + +// enqueue handles POST /api/queues/{queue}/jobs. +func (a *API) enqueue(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + var req enqueueRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + a.writeError(w, http.StatusBadRequest, "invalid JSON body") + return + } + + j := job.New(queue, []byte(req.Payload)) + var opts []broker.EnqueueOption + if req.DelayMs > 0 { + opts = append(opts, broker.WithDelay(time.Duration(req.DelayMs)*time.Millisecond)) + } + if req.Priority != nil { + opts = append(opts, broker.WithPriority(*req.Priority)) + } + if req.IdempotencyKey != "" { + opts = append(opts, broker.WithIdempotencyKey(req.IdempotencyKey)) + } + + if err := a.broker.Enqueue(r.Context(), j, opts...); err != nil { + if errors.Is(err, broker.ErrDuplicate) { + a.writeError(w, http.StatusConflict, "duplicate idempotency key") + return + } + a.logger.Error("api: enqueue failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + + // Enqueue routes to delayed only for a future ready-at; mirror that here for + // the reported state (Enqueue takes the job by value, so j.State is unchanged). + state := job.StateReady + if req.DelayMs > 0 { + state = job.StateDelayed + } + a.writeJSON(w, http.StatusCreated, enqueueResponse{ID: j.ID, State: string(state)}) +} + +// parseInt64 parses a query value, returning def for an empty string. +func parseInt64(s string, def int64) (int64, error) { + if s == "" { + return def, nil + } + return strconv.ParseInt(s, 10, 64) +} + +// --- temporary stubs, replaced in Tasks 6-7 --- + +func (a *API) stats(w http.ResponseWriter, _ *http.Request) { + a.writeError(w, http.StatusNotImplemented, "not implemented") +} + +func (a *API) listDLQ(w http.ResponseWriter, _ *http.Request) { + a.writeError(w, http.StatusNotImplemented, "not implemented") +} + +func (a *API) requeueDLQ(w http.ResponseWriter, _ *http.Request) { + a.writeError(w, http.StatusNotImplemented, "not implemented") +} + +func (a *API) queues(w http.ResponseWriter, _ *http.Request) { + a.writeError(w, http.StatusNotImplemented, "not implemented") +} diff --git a/internal/api/api_test.go b/internal/api/api_test.go new file mode 100644 index 0000000..790f3be --- /dev/null +++ b/internal/api/api_test.go @@ -0,0 +1,106 @@ +package api_test + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/api" + "github.com/StrangeNoob/relay/internal/broker" +) + +// apiTestRedisDB is this package's dedicated Redis DB. broker tests use 15, +// worker 14, metrics 13; api claims 12 so parallel `go test ./...` never collides. +const apiTestRedisDB = 12 + +func newTestAPI(t *testing.T) (http.Handler, *broker.Broker, *redis.Client) { + t.Helper() + addr := os.Getenv("REDIS_ADDR") + if addr == "" { + addr = "localhost:6379" + } + rdb := redis.NewClient(&redis.Options{Addr: addr, DB: apiTestRedisDB}) + ctx := context.Background() + if err := rdb.Ping(ctx).Err(); err != nil { + t.Skipf("redis not available at %s: %v", addr, err) + } + if err := rdb.FlushDB(ctx).Err(); err != nil { + t.Fatalf("flushdb: %v", err) + } + t.Cleanup(func() { _ = rdb.Close() }) + b := broker.New(rdb) + h := api.New(b, slog.New(slog.NewTextHandler(io.Discard, nil))) + return h, b, rdb +} + +// do issues a request against the handler and returns the recorder. +func do(t *testing.T, h http.Handler, method, target string, body any) *httptest.ResponseRecorder { + t.Helper() + var r io.Reader + if body != nil { + buf, err := json.Marshal(body) + if err != nil { + t.Fatalf("marshal body: %v", err) + } + r = bytes.NewReader(buf) + } + req := httptest.NewRequest(method, target, r) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + return rec +} + +func TestEnqueueEndpointCreatesJob(t *testing.T) { + h, _, _ := newTestAPI(t) + + rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", map[string]any{ + "payload": "hello", + }) + if rec.Code != http.StatusCreated { + t.Fatalf("status = %d, want 201; body=%s", rec.Code, rec.Body.String()) + } + var resp struct { + ID string `json:"id"` + State string `json:"state"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.ID == "" { + t.Error("response id is empty") + } + if resp.State != "ready" { + t.Errorf("state = %q, want ready", resp.State) + } +} + +func TestEnqueueDuplicateReturns409(t *testing.T) { + h, _, _ := newTestAPI(t) + body := map[string]any{"payload": "x", "idempotency_key": "k1"} + + if rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", body); rec.Code != http.StatusCreated { + t.Fatalf("first enqueue status = %d, want 201", rec.Code) + } + rec := do(t, h, http.MethodPost, "/api/queues/emails/jobs", body) + if rec.Code != http.StatusConflict { + t.Errorf("duplicate status = %d, want 409", rec.Code) + } +} + +func TestEnqueueBadJSONReturns400(t *testing.T) { + h, _, _ := newTestAPI(t) + req := httptest.NewRequest(http.MethodPost, "/api/queues/emails/jobs", bytes.NewReader([]byte("{not json"))) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", rec.Code) + } +} From aaac35df5b4d88abfd112ceb4322675cb6463bf3 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:21:26 +0530 Subject: [PATCH 09/13] Implement API stats and DLQ-list endpoints --- internal/api/api.go | 38 ++++++++++++++++++++--- internal/api/api_test.go | 67 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index ec132f3..a38b66b 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -142,12 +142,42 @@ func parseInt64(s string, def int64) (int64, error) { // --- temporary stubs, replaced in Tasks 6-7 --- -func (a *API) stats(w http.ResponseWriter, _ *http.Request) { - a.writeError(w, http.StatusNotImplemented, "not implemented") +// stats handles GET /api/queues/{queue}/stats. +func (a *API) stats(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + s, err := a.broker.Stats(r.Context(), queue) + if err != nil { + a.logger.Error("api: stats failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + a.writeJSON(w, http.StatusOK, s) } -func (a *API) listDLQ(w http.ResponseWriter, _ *http.Request) { - a.writeError(w, http.StatusNotImplemented, "not implemented") +// listDLQ handles GET /api/queues/{queue}/dlq?limit=&offset=. +func (a *API) listDLQ(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + limit, err := parseInt64(r.URL.Query().Get("limit"), 0) + if err != nil { + a.writeError(w, http.StatusBadRequest, "invalid limit") + return + } + offset, err := parseInt64(r.URL.Query().Get("offset"), 0) + if err != nil { + a.writeError(w, http.StatusBadRequest, "invalid offset") + return + } + jobs, err := a.broker.ListDLQ(r.Context(), queue, limit, offset) + if err != nil { + a.logger.Error("api: list dlq failed", "queue", queue, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + views := make([]jobView, 0, len(jobs)) + for _, j := range jobs { + views = append(views, toJobView(j)) + } + a.writeJSON(w, http.StatusOK, views) } func (a *API) requeueDLQ(w http.ResponseWriter, _ *http.Request) { diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 790f3be..0ecb099 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/StrangeNoob/relay/internal/api" "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/job" ) // apiTestRedisDB is this package's dedicated Redis DB. broker tests use 15, @@ -104,3 +105,69 @@ func TestEnqueueBadJSONReturns400(t *testing.T) { t.Errorf("status = %d, want 400", rec.Code) } } + +// mustJob builds a job for tests via the broker's job package. +func mustJob(queue, payload string) job.Job { + return job.New(queue, []byte(payload)) +} + +func TestStatsEndpoint(t *testing.T) { + h, b, _ := newTestAPI(t) + ctx := context.Background() + for i := 0; i < 2; i++ { + if err := b.Enqueue(ctx, mustJob("emails", "x")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + rec := do(t, h, http.MethodGet, "/api/queues/emails/stats", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var s struct { + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &s); err != nil { + t.Fatalf("decode: %v", err) + } + if s.Ready != 2 { + t.Errorf("ready = %d, want 2", s.Ready) + } +} + +func TestDLQListEndpoint(t *testing.T) { + h, b, rdb := newTestAPI(t) + ctx := context.Background() + j := mustJob("emails", "dead") + j.State = "dead" + if err := rdb.HSet(ctx, "job:"+j.ID, j.ToHash()).Err(); err != nil { + t.Fatalf("HSet: %v", err) + } + if err := rdb.RPush(ctx, "q:emails:dlq", j.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + _ = b + + rec := do(t, h, http.MethodGet, "/api/queues/emails/dlq", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String()) + } + var jobs []map[string]any + if err := json.Unmarshal(rec.Body.Bytes(), &jobs); err != nil { + t.Fatalf("decode: %v", err) + } + if len(jobs) != 1 || jobs[0]["id"] != j.ID { + t.Errorf("jobs = %v, want one with id %s", jobs, j.ID) + } +} + +func TestDLQListBadLimitReturns400(t *testing.T) { + h, _, _ := newTestAPI(t) + rec := do(t, h, http.MethodGet, "/api/queues/emails/dlq?limit=abc", nil) + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400", rec.Code) + } +} From 5a2f74006e68ae2a01d6d288fa22f6683bce6b3a Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:24:43 +0530 Subject: [PATCH 10/13] Implement API requeue and queue-discovery endpoints --- internal/api/api.go | 30 +++++++++++++++++----- internal/api/api_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/internal/api/api.go b/internal/api/api.go index a38b66b..2e0615d 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -140,8 +140,6 @@ func parseInt64(s string, def int64) (int64, error) { return strconv.ParseInt(s, 10, 64) } -// --- temporary stubs, replaced in Tasks 6-7 --- - // stats handles GET /api/queues/{queue}/stats. func (a *API) stats(w http.ResponseWriter, r *http.Request) { queue := r.PathValue("queue") @@ -180,10 +178,30 @@ func (a *API) listDLQ(w http.ResponseWriter, r *http.Request) { a.writeJSON(w, http.StatusOK, views) } -func (a *API) requeueDLQ(w http.ResponseWriter, _ *http.Request) { - a.writeError(w, http.StatusNotImplemented, "not implemented") +// requeueDLQ handles POST /api/queues/{queue}/dlq/{id}/requeue. +func (a *API) requeueDLQ(w http.ResponseWriter, r *http.Request) { + queue := r.PathValue("queue") + id := r.PathValue("id") + ok, err := a.broker.RequeueDLQ(r.Context(), queue, id) + if err != nil { + a.logger.Error("api: requeue failed", "queue", queue, "id", id, "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + if !ok { + a.writeError(w, http.StatusNotFound, "job not found in dlq") + return + } + a.writeJSON(w, http.StatusOK, map[string]bool{"requeued": true}) } -func (a *API) queues(w http.ResponseWriter, _ *http.Request) { - a.writeError(w, http.StatusNotImplemented, "not implemented") +// queues handles GET /api/queues. +func (a *API) queues(w http.ResponseWriter, r *http.Request) { + names, err := a.broker.Queues(r.Context()) + if err != nil { + a.logger.Error("api: queues failed", "err", err) + a.writeError(w, http.StatusInternalServerError, "internal error") + return + } + a.writeJSON(w, http.StatusOK, names) } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 0ecb099..347ad32 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -171,3 +171,58 @@ func TestDLQListBadLimitReturns400(t *testing.T) { t.Errorf("status = %d, want 400", rec.Code) } } + +func TestRequeueEndpointMovesJobBack(t *testing.T) { + h, _, rdb := newTestAPI(t) + ctx := context.Background() + j := mustJob("emails", "dead") + j.State = "dead" + if err := rdb.HSet(ctx, "job:"+j.ID, j.ToHash()).Err(); err != nil { + t.Fatalf("HSet: %v", err) + } + if err := rdb.RPush(ctx, "q:emails:dlq", j.ID).Err(); err != nil { + t.Fatalf("RPush: %v", err) + } + + rec := do(t, h, http.MethodPost, "/api/queues/emails/dlq/"+j.ID+"/requeue", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", rec.Code, rec.Body.String()) + } + if n, _ := rdb.ZCard(ctx, "q:emails:ready").Result(); n != 1 { + t.Errorf("ready card = %d, want 1", n) + } + if n, _ := rdb.LLen(ctx, "q:emails:dlq").Result(); n != 0 { + t.Errorf("dlq len = %d, want 0", n) + } +} + +func TestRequeueUnknownReturns404(t *testing.T) { + h, _, _ := newTestAPI(t) + rec := do(t, h, http.MethodPost, "/api/queues/emails/dlq/nope/requeue", nil) + if rec.Code != http.StatusNotFound { + t.Errorf("status = %d, want 404", rec.Code) + } +} + +func TestQueuesEndpointListsNames(t *testing.T) { + h, b, _ := newTestAPI(t) + ctx := context.Background() + if err := b.Enqueue(ctx, mustJob("emails", "a")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + if err := b.Enqueue(ctx, mustJob("sms", "b")); err != nil { + t.Fatalf("Enqueue: %v", err) + } + + rec := do(t, h, http.MethodGet, "/api/queues", nil) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", rec.Code) + } + var names []string + if err := json.Unmarshal(rec.Body.Bytes(), &names); err != nil { + t.Fatalf("decode: %v", err) + } + if len(names) != 2 || names[0] != "emails" || names[1] != "sms" { + t.Errorf("names = %v, want [emails sms]", names) + } +} From 1d22704294bc20a3325c11ad1d70bbe9fe7aaba2 Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:28:00 +0530 Subject: [PATCH 11/13] Add cmd/server: HTTP API + /metrics + /healthz with graceful shutdown --- cmd/server/main.go | 99 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 cmd/server/main.go diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..2ab5ab5 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,99 @@ +// Command server runs Relay's HTTP control surface: the JSON API plus a +// Prometheus /metrics endpoint and a health check. It is a thin wiring layer; +// all behaviour lives in internal/api, internal/broker, and internal/metrics. +package main + +import ( + "context" + "flag" + "log/slog" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/redis/go-redis/v9" + + "github.com/StrangeNoob/relay/internal/api" + "github.com/StrangeNoob/relay/internal/broker" + "github.com/StrangeNoob/relay/internal/metrics" +) + +func main() { + addr := flag.String("addr", ":8080", "HTTP listen address") + redisAddr := flag.String("redis", envOr("REDIS_ADDR", "localhost:6379"), "Redis address") + queuesFlag := flag.String("queues", "", "comma-separated queues for the /metrics depth collector") + flag.Parse() + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + slog.SetDefault(logger) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + rdb := redis.NewClient(&redis.Options{Addr: *redisAddr}) + defer func() { _ = rdb.Close() }() + if err := rdb.Ping(ctx).Err(); err != nil { + logger.Error("cannot reach redis", "addr", *redisAddr, "err", err) + os.Exit(1) + } + + // A metrics recorder on the broker means API enqueues are counted; a depth + // collector for the configured queues exposes live gauges from the server. + rec := metrics.NewRecorder() + if qs := splitQueues(*queuesFlag); len(qs) > 0 { + rec.Registry().MustRegister(metrics.NewDepthCollector(rdb, qs...)) + } + b := broker.New(rdb, broker.WithMetrics(rec)) + + mux := http.NewServeMux() + mux.Handle("/api/", api.New(b, logger)) + mux.Handle("/metrics", promhttp.HandlerFor(rec.Registry(), promhttp.HandlerOpts{})) + mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + srv := &http.Server{Addr: *addr, Handler: mux} + go func() { + logger.Info("relay server listening", "addr", *addr, "redis", *redisAddr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("server error", "err", err) + } + }() + + <-ctx.Done() + logger.Info("shutdown signal received") + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutCtx); err != nil { + logger.Error("server shutdown error", "err", err) + } + logger.Info("relay server stopped cleanly") +} + +// envOr returns the environment value for key, or def when it is unset/empty. +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +// splitQueues parses a comma-separated queue list, trimming blanks. +func splitQueues(s string) []string { + if strings.TrimSpace(s) == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +} From 9154b4b55c5224b5d79b90df450d6efcdd7e2ebc Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 20:32:28 +0530 Subject: [PATCH 12/13] Document Phase 3a: HTTP API and server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update CLAUDE.md to reflect the shipped HTTP API package and server: status, "what exists today" bullets, new broker methods and requeue.lua script, DLQ table row, layout tree, build order, known-limitations (API/auth/payload/paging caveats), DB allocation (api → DB 12), and end-to-end run commands. --- CLAUDE.md | 56 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 01e5e94..091c195 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,30 +10,41 @@ project: the point is to *prove understanding of queue internals*, not to wrap a library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery, etc.) — the mechanics are the deliverable. -**Status: Phase 1 complete; Phase 2 complete.** The core engine plus delayed jobs, the promoter, -retry backoff, priority, idempotency enforcement, per-queue rate limiting, and Prometheus metrics -are built, tested against a real Redis under `-race`, and CI is green. Phase 3 (API/dashboard) is -next. Repo: . What exists today: +**Status: Phase 1 complete; Phase 2 complete; Phase 3 in progress — 3a (HTTP API + server) ✅ +done.** The core engine plus delayed jobs, the promoter, retry backoff, priority, idempotency +enforcement, per-queue rate limiting, Prometheus metrics, and the JSON REST API + server are +built, tested against a real Redis under `-race`, and CI is green. Dashboard (3b), producer SDK +(3c), and packaging/deploy (3d) remain. Repo: . What +exists today: - `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`). - `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`, - `Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), with + `Nack` (full-jitter backoff via the delayed set), `Reap`, `Promote`, `Extend` (heartbeat), + `Stats` (ZCARD/LLEN snapshot per queue), `ListDLQ` (paged DLQ inspection), `RequeueDLQ` + (atomic dlq→ready reset via `requeue.lua`), `Queues` (SCAN-based queue discovery), with Lua under `internal/broker/scripts/`: `enqueue.lua`, `claim.lua`, `ack.lua`, `nack.lua`, - `reaper.lua`, `promote.lua`, `heartbeat.lua`. Broker options: `WithBackoff`, `WithDedupTTL`, - `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via Redis hash), - `WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op). + `reaper.lua`, `promote.lua`, `heartbeat.lua`, `requeue.lua`. Broker options: `WithBackoff`, + `WithDedupTTL`, `WithRateLimit(queue, rate, burst)` (token-bucket per-queue rate limiting via + Redis hash), `WithMetrics(m)` (installs a `broker.Metrics` implementation; default is a no-op). - `internal/metrics` — Prometheus `Recorder` (implements `broker.Metrics`; counters `relay_jobs_*_total`, histogram `relay_job_latency_seconds`, all labelled by queue) and `DepthCollector` (a `prometheus.Collector` reporting `relay_queue_depth{queue,state}` gauges by reading ZCARD/LLEN at scrape time). - `internal/worker` — `Worker` (claim loop, dispatch, heartbeat, graceful shutdown), plus `Reaper` and `Promoter` background loops sharing one `runDrainLoop` helper. +- `internal/api` — JSON REST API over stdlib `net/http` (Go 1.22 method+path routing). Endpoints: + `POST /api/queues/{queue}/jobs` (enqueue; 409 on idempotency dup), `GET /api/queues/{queue}/stats`, + `GET /api/queues/{queue}/dlq?limit=&offset=`, `POST /api/queues/{queue}/dlq/{id}/requeue` + (404 if not in DLQ), `GET /api/queues`. Constructed via `api.New(b, logger) http.Handler`. - `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set, serves `/metrics` and registers the depth collector with graceful shutdown. +- `cmd/server` — wires Redis + broker (with a `metrics.Recorder` so API enqueues are counted) + + the API handler + `/metrics` + `/healthz`; graceful shutdown on SIGINT/SIGTERM. Flags: `-addr`, + `-redis`, `-queues` (comma-separated queues for the depth collector). - `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint`. -Phase 3 (API/dashboard/`cmd/server`, docker-compose, deploy) is **not** built yet. +Dashboard (3b), producer SDK (3c), and packaging/deploy (3d) are **not** built yet. ## Source of truth @@ -70,7 +81,8 @@ spec disagree, the spec wins until the spec is deliberately updated. (`broker.WithBackoff`). - **Idempotency is enqueue-only, TTL-window.** A keyed duplicate is dropped within the dedup TTL (default 24h, `WithDedupTTL`); the key is not released on completion. Delivery remains at-least-once — consumers needing exactly-once effects still dedup on the key. - **Rate-limit config is per-worker, not stored in Redis.** All workers on a queue must register the same `WithRateLimit` (they share one Redis bucket and pass rate/burst on every claim); mismatched configs refill inconsistently. A rate-limited claim is indistinguishable from an empty queue to the worker (it polls again). -- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. The endpoint lives on `cmd/worker` until the Phase 3 server exists. +- **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. `cmd/server` also exposes `/metrics`; depth gauges there cover only the queues listed in `-queues` at startup. +- **HTTP API is demo-grade, no auth.** The API server has no authentication or authorization layer. Payloads are treated as UTF-8 strings (base64 encoding for binary payloads is a future addition). DLQ paging is offset/limit (no cursor). `Queues` discovery uses Redis SCAN (eventually-consistent) and sorts results in Go. ## Redis data model & job lifecycle (the architecture in brief) @@ -83,7 +95,7 @@ and the whole engine follows: | `job:{id}` | hash | — | full job. Fields: `id, queue, payload, state, attempts, max_retries, created_at, idempotency_key`. **No deadline field** — the deadline lives only as the `inflight` ZSET score. | | `q:{name}:ready` | ZSET | priority | claimable now; claim pops the best score = priority (higher first), oldest-first within a priority | | `q:{name}:inflight` | ZSET | visibility deadline | claimed-not-acked; **reaper scans this for expiry** | -| `q:{name}:dlq` | list | — | exhausted jobs (inspect/requeue surface is Phase 3) | +| `q:{name}:dlq` | list | — | exhausted jobs; inspect via `ListDLQ` + requeue via `RequeueDLQ` (dlq→ready, attempts reset to 0, atomic `requeue.lua`) | | `q:{name}:delayed` | ZSET | ready-at ts | scheduled + backoff jobs; **promoter scans this** and moves due ones (`ready-at ≤ now`) to `ready` | | `q:{name}:dedup:{key}` | string | — | per-key string with TTL; **enqueue dedup** — a keyed duplicate is dropped with ErrDuplicate | | `q:{name}:ratelimit` | hash | — | per-queue token bucket (`tokens`, `ts`); claim consumes a token only on a successful pop | @@ -112,15 +124,16 @@ deadline forward while a long handler runs, so the reaper does not reclaim live ``` cmd/worker/main.go # ✅ worker pool + reaper + promoter daemon cmd/demo/main.go # ✅ load generator (--delay) -cmd/server/main.go # ◻ API+dashboard server (Phase 3) +cmd/server/main.go # ✅ API + /metrics + /healthz server (Phase 3a) internal/job/ # ✅ job model + hash encoding -internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend -internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat (go:embed) +internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend/stats/dlq/queues +internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat, requeue (go:embed) internal/worker/ # ✅ Worker + Reaper + Promoter runtime internal/metrics/ # ✅ Prometheus Recorder + DepthCollector -internal/{client,api}/ # ◻ producer SDK / HTTP API (Phase 3) -web/ # ◻ embedded dashboard assets (Phase 3) -deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3) +internal/api/ # ✅ JSON REST API handler (Phase 3a) +internal/client/ # ◻ producer SDK (Phase 3c) +web/ # ◻ embedded dashboard assets (Phase 3b) +deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3d) .github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint ``` @@ -131,7 +144,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold 1. **Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first. 2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅. -3. **Phase 3 — polish:** dashboard; docker-compose demo; deployed demo; README + diagram. +3. **Phase 3 — polish (in progress):** 3a HTTP API + server ✅ done; 3b dashboard (web/); 3c producer SDK (`internal/client`); 3d docker-compose + deploy + README diagram. 4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox. ## Conventions @@ -157,9 +170,9 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold Neither violates the "build the queue from scratch on Redis" rule. The queue logic is ours. - **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using package claims its own logical DB so `go test ./...` runs them in parallel without flushing each - other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**; a new one picks another), with - `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable — so a green local run - with no Redis means those suites were skipped. CI provides a Redis service. + other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**, api → **DB 12**; a new one + picks another), with `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable + — so a green local run with no Redis means those suites were skipped. CI provides a Redis service. ```sh go build ./... @@ -169,6 +182,7 @@ golangci-lint run # CI pins v2.12.2; default linters, # run it end to end against a local Redis: go run ./cmd/worker -queue demo -concurrency 4 & # worker pool + reaper go run ./cmd/demo -queue demo -count 100 # enqueue load +go run ./cmd/server -queues demo # API :8080 + /metrics + /healthz ``` Keep this section updated as the Makefile / docker-compose take shape. From c80ce7aed241a5a76ed294b301569be7f5e7820f Mon Sep 17 00:00:00 2001 From: StrangeNoob Date: Mon, 8 Jun 2026 22:10:43 +0530 Subject: [PATCH 13/13] Document RequeueDLQ as an operator-driven dlq->ready transition in the lifecycle --- CLAUDE.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 091c195..dcf70c5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -113,11 +113,13 @@ enqueue(WithDelay) → delayed ──[promoter: ready-at≤now]──→ ready nack → attempts