Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ library. Do not introduce a queue dependency (BullMQ, asynq, Machinery, Celery,
mechanics are the deliverable.

**Status: Phase 1 complete; Phase 2 complete; Phase 3 in progress — 3a (HTTP API + server) ✅,
3b (dashboard) ✅ done.** The core engine plus delayed jobs, the promoter, retry backoff, priority,
idempotency enforcement, per-queue rate limiting, Prometheus metrics, the JSON REST API + server,
and the embedded React dashboard are built, tested against a real Redis under `-race`, and CI is
green. Producer SDK (3c) and packaging/deploy (3d) remain. Repo: <https://github.com/StrangeNoob/relay>.
What exists today:
3b (dashboard) ✅, 3c (producer SDK) ✅ done.** The core engine plus delayed jobs, the promoter,
retry backoff, priority, idempotency enforcement, per-queue rate limiting, Prometheus metrics, the
JSON REST API + server, the embedded React dashboard, and the stdlib-only HTTP producer SDK are
built, tested against a real Redis under `-race`, and CI is green. Only packaging/deploy (3d)
remains. Repo: <https://github.com/StrangeNoob/relay>. What exists today:

- `internal/job` — the `Job` model + Redis-hash encoding (`ToHash`/`FromHash`).
- `internal/broker` — `Enqueue` (with `WithDelay`/`WithReadyAt`/`WithPriority`/`WithIdempotencyKey` options), atomic `Claim`, `Ack`,
Expand All @@ -40,9 +40,16 @@ What exists today:
(404 if not in DLQ), `GET /api/queues`, `GET /api/stream` (SSE; pushes per-queue depth +
`processed`/`dead` counters to every connected dashboard every ~1 s; implemented in
`internal/api/stream.go`). Constructed via `api.New(b, logger) http.Handler`.
- `cmd/worker`, `cmd/demo` — thin runnable entrypoints (worker pool + reaper + promoter; load
generator with `--delay`). `cmd/worker` accepts `--metrics-addr` (default "" = off); when set,
serves `/metrics` and registers the depth collector with graceful shutdown.
- `internal/client` — stdlib-only HTTP producer SDK (no broker/job/redis import; no new Go
dependency). `New(baseURL, ...Option)` (options: `WithHTTPClient`, `WithTimeout`). Methods:
`Enqueue` (with `WithDelay`/`WithPriority`/`WithIdempotencyKey`; maps 409 → `ErrDuplicate`),
`Stats`, `ListDLQ`, `Requeue` (maps 404 → `ErrNotFound`), `Queues`. Typed errors:
`ErrDuplicate`, `ErrNotFound`, `APIError`.
- `cmd/worker`, `cmd/demo` — thin runnable entrypoints. `cmd/worker`: worker pool + reaper +
promoter daemon; accepts `--metrics-addr` (default "" = off); when set, serves `/metrics` and
registers the depth collector with graceful shutdown. `cmd/demo`: load generator that produces
jobs through the HTTP SDK (`-server` flag, e.g. `-server http://localhost:8080`); requires
`cmd/server` running; no longer imports broker/job/redis directly.
- `cmd/server` — wires Redis + broker (with a `metrics.Recorder` so API enqueues are counted) +
the API handler + `/metrics` + `/healthz` + embedded dashboard at `/`; graceful shutdown on
SIGINT/SIGTERM. Flags: `-addr`, `-redis`, `-queues` (comma-separated queues for the depth
Expand All @@ -55,7 +62,7 @@ What exists today:
- `.github/workflows/ci.yml` — Redis service + `go test -race` + `golangci-lint` + dashboard
build/typecheck/test/dist-sync check.

Producer SDK (3c) and packaging/deploy (3d) are **not** built yet.
Packaging/deploy (3d) is **not** built yet.

## Source of truth

Expand Down Expand Up @@ -97,6 +104,8 @@ spec disagree, the spec wins until the spec is deliberately updated.
- **Dashboard charts are in-memory rolling windows.** The client-side time-series buffer resets on page reload; there is no server-side history. `processed`/`dead` counters are monotonic Redis INCRs (no reset); the dashboard derives a rate by differencing successive SSE snapshots.
- **SSE is per-connection.** Each open dashboard tab runs its own server-side ticker goroutine reading Redis every ~1 s. This is fine for a demo; a production deployment would fan-out from a single poller.
- **Committed `web/dist` must be rebuilt on UI change.** The Go binary embeds the committed dist; CI has a `git diff --exit-code -- dist` step to catch stale builds. Run `cd web && npm run build` and commit the updated dist whenever source changes.
- **Producer SDK does no client-side retries.** `internal/client` makes one HTTP request per call; transient failures are surfaced as errors. The caller is responsible for retry logic (with backoff) if needed.
- **`cmd/demo` requires a running `cmd/server`.** The demo load generator now produces jobs through the HTTP SDK (`-server` flag) and no longer talks to Redis directly. Running `cmd/demo` without `cmd/server` will produce connection errors immediately.

## Redis data model & job lifecycle (the architecture in brief)

Expand Down Expand Up @@ -143,15 +152,15 @@ they do not introduce any new job-state transition.

```
cmd/worker/main.go # ✅ worker pool + reaper + promoter daemon
cmd/demo/main.go # ✅ load generator (--delay)
cmd/demo/main.go # ✅ load generator; produces via HTTP SDK (-server flag; needs cmd/server)
cmd/server/main.go # ✅ API + /metrics + /healthz server (Phase 3a)
internal/job/ # ✅ job model + hash encoding
internal/broker/ # ✅ enqueue/claim/ack/nack/reap/promote/extend/stats/dlq/queues
internal/broker/scripts/*.lua # ✅ enqueue, claim, ack, nack, reaper, promote, heartbeat, requeue (go:embed)
internal/worker/ # ✅ Worker + Reaper + Promoter runtime
internal/metrics/ # ✅ Prometheus Recorder + DepthCollector
internal/api/ # ✅ JSON REST API handler (Phase 3a)
internal/client/ # producer SDK (Phase 3c)
internal/client/ # ✅ stdlib-only HTTP producer SDK (Phase 3c)
web/ # ✅ Vite+React dashboard + web/embed.go (Phase 3b)
deployments/docker-compose.yml # ◻ redis + server + N workers + demo (Phase 3d)
.github/workflows/ci.yml # ✅ Redis service + go test -race + golangci-lint + dashboard CI
Expand All @@ -164,7 +173,7 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold

1. **Phase 1 — core: ✅ done.** job model; enqueue/claim/ack/nack Lua; reaper; worker runtime; basic DLQ; integration tests; CI. A working, testable queue ships first.
2. **Phase 2 — depth: ✅ done.** delayed jobs + promoter ✅; backoff + jitter ✅; priority ✅; idempotency ✅; rate limiting ✅; Prometheus metrics ✅.
3. **Phase 3 — polish (in progress):** 3a HTTP API + server ✅; 3b dashboard ✅; 3c producer SDK (`internal/client`); 3d docker-compose + deploy + README diagram.
3. **Phase 3 — polish (in progress):** 3a HTTP API + server ✅; 3b dashboard ✅; 3c producer SDK (`internal/client`); 3d docker-compose + deployed demo + README diagram.
4. **Future work (NOT now):** Postgres-backed (`SKIP LOCKED`) mode; exactly-once via consumer outbox.

## Conventions
Expand Down Expand Up @@ -192,22 +201,25 @@ Use `internal/` for everything not meant as a public import surface. `cmd/` hold
`npm run test` (vitest), and `npm run build` are all independent of the Go module. The Go binary
embeds the committed `web/dist` at compile time via `go:embed` (`web/embed.go`), so `go build
./...` needs no Node toolchain — only a stale dist would be an issue (CI catches it).
- **`internal/client` is stdlib-only.** The producer SDK imports no broker, job, or redis package
and adds no new Go dependency. The Go module still depends only on go-redis and
prometheus/client_golang.
- **Tests need a real Redis** at `localhost:6379` (override with `REDIS_ADDR`). Each Redis-using
package claims its own logical DB so `go test ./...` runs them in parallel without flushing each
other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**, api → **DB 12**; a new one
picks another), with `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable
so a green local run with no Redis means those suites were skipped. CI provides a Redis
service. Frontend tests (`npm run test`) need no Redis.
other (broker → **DB 15**, worker → **DB 14**, metrics → **DB 13**, api → **DB 12**, client →
**DB 11**), with `FlushDB` per test, and they **skip** (not fail) when Redis is unreachable
so a green local run with no Redis means those suites were skipped. CI provides a Redis service.
Frontend tests (`npm run test`) need no Redis.

```sh
go build ./...
go test -race ./... # needs Redis on :6379 (or REDIS_ADDR)
golangci-lint run # CI pins v2.12.2; default linters, currently clean

# run it end to end against a local Redis:
go run ./cmd/worker -queue demo -concurrency 4 & # worker pool + reaper
go run ./cmd/demo -queue demo -count 100 # enqueue load
go run ./cmd/server -queues demo # API + dashboard at http://localhost:8080
go run ./cmd/server -queues demo # API + dashboard at http://localhost:8080
go run ./cmd/worker -queue demo -concurrency 4 & # worker pool + reaper
go run ./cmd/demo -server http://localhost:8080 -queue demo -count 100 # enqueue via SDK (needs cmd/server)

# frontend dev/test (requires Node 20+):
cd web && npm ci && npm run typecheck && npm run test && npm run build
Expand Down
38 changes: 13 additions & 25 deletions cmd/demo/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Command demo is a load generator: it enqueues a batch of jobs onto a queue so
// a running worker pool has something to chew on. Thin wiring over the client
// path of internal/broker.
// Command demo is a load generator: it enqueues a batch of jobs onto a queue
// through the Relay HTTP API (via internal/client) so a running worker pool has
// something to chew on. It is a pure SDK consumer — it needs cmd/server running.
package main

import (
Expand All @@ -11,14 +11,11 @@ import (
"log/slog"
"os"

"github.com/redis/go-redis/v9"

"github.com/StrangeNoob/relay/internal/broker"
"github.com/StrangeNoob/relay/internal/job"
"github.com/StrangeNoob/relay/internal/client"
)

func main() {
addr := flag.String("redis", "localhost:6379", "Redis address")
server := flag.String("server", "http://localhost:8080", "Relay server base URL")
queue := flag.String("queue", "default", "queue to enqueue into")
count := flag.Int("count", 100, "number of jobs to enqueue")
delay := flag.Duration("delay", 0, "schedule jobs this far in the future (0 = immediate)")
Expand All @@ -27,38 +24,29 @@ func main() {
flag.Parse()

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

ctx := context.Background()
rdb := redis.NewClient(&redis.Options{Addr: *addr})
defer func() { _ = rdb.Close() }()

if err := rdb.Ping(ctx).Err(); err != nil {
logger.Error("cannot reach redis", "addr", *addr, "err", err)
os.Exit(1)
}
c := client.New(*server)

b := broker.New(rdb)
for i := range *count {
payload := fmt.Sprintf(`{"n":%d}`, i)
j := job.New(*queue, []byte(payload))
var opts []broker.EnqueueOption
var opts []client.EnqueueOption
if *delay > 0 {
opts = append(opts, broker.WithDelay(*delay))
opts = append(opts, client.WithDelay(*delay))
}
if *priority != 0 {
opts = append(opts, broker.WithPriority(*priority))
opts = append(opts, client.WithPriority(*priority))
}
if *idempotencyKey != "" {
opts = append(opts, broker.WithIdempotencyKey(*idempotencyKey))
opts = append(opts, client.WithIdempotencyKey(*idempotencyKey))
}
switch err := b.Enqueue(ctx, j, opts...); {
case errors.Is(err, broker.ErrDuplicate):
switch _, err := c.Enqueue(ctx, *queue, []byte(payload), opts...); {
case errors.Is(err, client.ErrDuplicate):
logger.Info("duplicate dropped", "i", i, "key", *idempotencyKey)
case err != nil:
logger.Error("enqueue failed", "i", i, "err", err)
os.Exit(1)
}
}

logger.Info("enqueued jobs", "count", *count, "queue", *queue, "redis", *addr)
logger.Info("enqueued jobs", "count", *count, "queue", *queue, "server", *server)
}
Loading
Loading