diff --git a/CLAUDE.md b/CLAUDE.md index 44957b9..d5f8106 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -116,7 +116,7 @@ spec disagree, the spec wins until the spec is deliberately updated. - **Metrics are per-process and opt-in.** `broker.WithMetrics` installs a Prometheus recorder (default is a no-op); `cmd/worker --metrics-addr` serves `/metrics`. Counters/latency are per worker process — aggregate across workers in Prometheus. Queue-depth gauges read shared Redis at scrape time (one round-trip per queue/state), so every worker reports the same depths (aggregate with max/avg, not sum). Label cardinality is per queue. `cmd/server` also exposes `/metrics`; depth gauges there cover only the queues listed in `-queues` at startup. - **HTTP API is demo-grade, no auth.** The API server has no authentication or authorization layer. Payloads are treated as UTF-8 strings (base64 encoding for binary payloads is a future addition). DLQ paging is offset/limit (no cursor). `Queues` discovery uses Redis SCAN (eventually-consistent) and sorts results in Go. - **Dashboard charts are in-memory rolling windows.** The client-side time-series buffer resets on page reload; there is no server-side history. `processed`/`dead` counters are monotonic Redis INCRs (no reset); the dashboard derives a rate by differencing successive SSE snapshots. -- **SSE is per-connection.** Each open dashboard tab runs its own server-side ticker goroutine reading Redis every ~1 s. This is fine for a demo; a production deployment would fan-out from a single poller. +- **SSE fan-out is per-process, single-poller.** While ≥1 dashboard is connected, one background goroutine per server process polls Redis every ~1 s, builds the snapshot, and broadcasts it to every connected dashboard (latest-wins per client, so a slow tab never blocks the poller; a late joiner is seeded from the last cached snapshot). Redis load is O(queues)/sec, independent of connection count; an idle server (no dashboards) does no polling. The poller is owned by the `hub` in `internal/api/hub.go` and is lazily started/stopped by subscriber count. Each server replica runs its own poller — there is no cross-replica fan-out (Redis Pub/Sub remains future work). - **Committed `web/dist` must be rebuilt on UI change.** The Go binary embeds the committed dist; CI has a `git diff --exit-code -- dist` step to catch stale builds. Run `cd web && npm run build` and commit the updated dist whenever source changes. - **Producer SDK does no client-side retries.** `internal/client` makes one HTTP request per call; transient failures are surfaced as errors. The caller is responsible for retry logic (with backoff) if needed. - **Bulk enqueue returns a count, not per-job IDs.** `POST .../jobs/bulk` (and `client.EnqueueBulk`) returns only `{enqueued, state}` — individual job IDs are not surfaced. Bulk has no idempotency support (`WithIdempotencyKey` is silently ignored). The cap is 10 000 jobs per request. All jobs in one bulk call must belong to the same queue. diff --git a/docs/superpowers/plans/2026-06-09-relay-sse-single-poller-fanout.md b/docs/superpowers/plans/2026-06-09-relay-sse-single-poller-fanout.md new file mode 100644 index 0000000..0e37077 --- /dev/null +++ b/docs/superpowers/plans/2026-06-09-relay-sse-single-poller-fanout.md @@ -0,0 +1,604 @@ +# SSE Single-Poller Fan-Out Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace per-connection SSE polling with one in-process hub so Redis load is O(queues)/sec regardless of how many dashboards are connected. + +**Architecture:** A new `hub` type owns a single background poller that, while ≥1 dashboard is connected, reads every queue's depths+counters once per interval, marshals one snapshot, and fans it out to all subscribers latest-wins. The `stream` handler shrinks to subscribe → relay → unsubscribe. The hub depends on a `snapshotSource` interface (satisfied by `*broker.Broker`) so its fan-out and lifecycle are unit-tested without Redis. Wire format is unchanged, so the committed `web/dist` client is untouched. + +**Tech Stack:** Go 1.24 (stdlib `net/http` SSE, `sync.Mutex`, `time.Ticker`, `context`), `go-redis` (only behind the broker), `log/slog`. Tests: stdlib `testing`, run under `-race`. + +**Spec:** `docs/superpowers/specs/2026-06-09-relay-sse-single-poller-fanout-design.md` + +--- + +## File Structure + +- **Create `internal/api/hub.go`** — the `snapshotSource` interface, `subscriber`, `hub`, and all hub methods (`newHub`, `subscribe`, `unsubscribe`, `run`, `pollAndBroadcast`, `send`). Owns all Redis polling + broadcast. This is where the snapshot-building logic (currently in `stream.go`'s `writeSnapshot`) moves to. +- **Create `internal/api/hub_test.go`** — white-box (`package api`) unit tests with a fake `snapshotSource`; no Redis. Covers lazy start/stop, fan-out, latest-wins, late-joiner cache, and poll-error survival. +- **Modify `internal/api/api.go`** — add a `hub *hub` field to `API`; construct it in `New`. +- **Modify `internal/api/stream.go`** — keep the `streamInterval` const and `queueSnapshot` struct; replace the handler with subscribe/relay/unsubscribe; delete `writeSnapshot`. +- **Modify `CLAUDE.md`** — rewrite the "SSE is per-connection" limitation entry to describe the single-poller fan-out. + +The existing real-Redis integration test `TestStreamEmitsSnapshot` (`internal/api/api_test.go:275`) must remain green unchanged — the hub's immediate first poll preserves instant populate. + +--- + +## Task 1: Build the fan-out hub (unit-tested, no Redis) + +**Files:** +- Create: `internal/api/hub.go` +- Test: `internal/api/hub_test.go` + +This task produces a fully-tested hub that does not yet touch the HTTP handler. It compiles against the existing `queueSnapshot` struct in `stream.go` (same package) and `broker.Stats`/`broker.Counters`. + +### Reference: existing types the hub reuses + +`queueSnapshot` (already defined in `internal/api/stream.go`, do NOT redefine): + +```go +type queueSnapshot struct { + Queue string `json:"queue"` + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` + ProcessedTotal int64 `json:"processed_total"` + DeadTotal int64 `json:"dead_total"` +} +``` + +Broker types (`internal/broker/broker.go:428,456`): `broker.Stats{Ready, Inflight, Delayed, DLQ int64}` and `broker.Counters{Processed, Dead int64}`. + +- [ ] **Step 1: Write `internal/api/hub.go`** + +Create the file with the complete implementation: + +```go +package api + +import ( + "context" + "encoding/json" + "log/slog" + "sync" + "time" + + "github.com/StrangeNoob/relay/internal/broker" +) + +// snapshotSource is the slice of the broker the hub needs to build an SSE +// snapshot. *broker.Broker satisfies it; tests inject a fake so the hub's +// fan-out and lifecycle logic run without a real Redis. +type snapshotSource interface { + Queues(ctx context.Context) ([]string, error) + Stats(ctx context.Context, queue string) (broker.Stats, error) + Counters(ctx context.Context, queue string) (broker.Counters, error) +} + +// subscriber is one connected SSE client. ch is buffered with capacity 1 and +// written latest-wins (see send): a slow client only ever holds the newest +// snapshot and never blocks the poller. +type subscriber struct { + ch chan []byte +} + +// hub fans out one Redis poll to every connected SSE subscriber. A single +// background goroutine polls once per interval while at least one subscriber is +// connected (lazy: starts on the first subscribe, stops on the last +// unsubscribe), so Redis load is O(queues)/sec regardless of connection count. +type hub struct { + src snapshotSource + logger *slog.Logger + interval time.Duration + + mu sync.Mutex + subs map[*subscriber]struct{} + last []byte // most-recent marshalled snapshot, for instant populate + cancel context.CancelFunc // non-nil iff the poller goroutine is running +} + +// newHub builds an idle hub. It does not poll until the first subscribe. +func newHub(src snapshotSource, logger *slog.Logger, interval time.Duration) *hub { + return &hub{ + src: src, + logger: logger, + interval: interval, + subs: make(map[*subscriber]struct{}), + } +} + +// subscribe registers a new SSE client, lazily starting the poller when the hub +// was idle. It seeds the new subscriber with the last snapshot (if any) so a +// late joiner's UI populates without waiting for the next tick. +func (h *hub) subscribe() *subscriber { + s := &subscriber{ch: make(chan []byte, 1)} + h.mu.Lock() + if len(h.subs) == 0 { + ctx, cancel := context.WithCancel(context.Background()) + h.cancel = cancel + go h.run(ctx) + } + h.subs[s] = struct{}{} + last := h.last + h.mu.Unlock() + + // Seed from cache without blocking. If the poller already delivered a fresher + // snapshot between the unlock and here, the channel is full and we skip — + // never replace fresh with stale, never block. + if last != nil { + select { + case s.ch <- last: + default: + } + } + return s +} + +// unsubscribe removes a client. When the last subscriber leaves, the poller is +// cancelled so an idle server does no Redis work. +func (h *hub) unsubscribe(s *subscriber) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.subs, s) + if len(h.subs) == 0 && h.cancel != nil { + h.cancel() + h.cancel = nil + } +} + +// run is the single poller goroutine: an immediate snapshot, then one per +// interval until ctx is cancelled (by the last unsubscribe or server shutdown). +func (h *hub) run(ctx context.Context) { + h.pollAndBroadcast(ctx) + ticker := time.NewTicker(h.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.pollAndBroadcast(ctx) + } + } +} + +// pollAndBroadcast reads every queue's depths and counters once, marshals the +// snapshot, caches it, and fans it out. A Redis or marshal error skips this +// tick (the poller and the cached snapshot survive). +func (h *hub) pollAndBroadcast(ctx context.Context) { + queues, err := h.src.Queues(ctx) + if err != nil { + h.logger.Error("api: stream listing queues", "err", err) + return + } + snaps := make([]queueSnapshot, 0, len(queues)) + for _, q := range queues { + st, err := h.src.Stats(ctx, q) + if err != nil { + h.logger.Error("api: stream stats", "queue", q, "err", err) + continue + } + ct, err := h.src.Counters(ctx, q) + if err != nil { + h.logger.Error("api: stream counters", "queue", q, "err", err) + continue + } + snaps = append(snaps, queueSnapshot{ + Queue: q, Ready: st.Ready, Inflight: st.Inflight, Delayed: st.Delayed, + DLQ: st.DLQ, ProcessedTotal: ct.Processed, DeadTotal: ct.Dead, + }) + } + buf, err := json.Marshal(snaps) + if err != nil { + h.logger.Error("api: stream marshal", "err", err) + return + } + h.mu.Lock() + h.last = buf + for s := range h.subs { + send(s, buf) + } + h.mu.Unlock() +} + +// send pushes buf to s latest-wins: if the buffer already holds a stale +// snapshot, drop it and enqueue the newest. Never blocks the caller. +func send(s *subscriber, buf []byte) { + select { + case s.ch <- buf: + default: + select { + case <-s.ch: + default: + } + select { + case s.ch <- buf: + default: + } + } +} +``` + +- [ ] **Step 2: Verify it compiles** + +Run: `go build ./internal/api/` +Expected: success (no output). If `queueSnapshot` is reported undefined, confirm it still exists in `stream.go` — do not redefine it here. + +- [ ] **Step 3: Write `internal/api/hub_test.go` (failing tests)** + +Create the file. It is white-box (`package api`) to reach the unexported hub. It defines a fake source that signals each poll over a buffered channel, enabling deterministic, sleep-light, `-race`-clean synchronization. + +```go +package api + +import ( + "bytes" + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/StrangeNoob/relay/internal/broker" +) + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// fakeSource is an in-memory snapshotSource. Every Queues call bumps a counter +// and emits a signal on polled (buffered, non-blocking) so tests can observe +// poll cadence without the source ever blocking the poller. +type fakeSource struct { + mu sync.Mutex + queues []string + failErr error + polled chan struct{} +} + +func newFakeSource(queues ...string) *fakeSource { + return &fakeSource{queues: queues, polled: make(chan struct{}, 1024)} +} + +func (f *fakeSource) setErr(err error) { + f.mu.Lock() + f.failErr = err + f.mu.Unlock() +} + +func (f *fakeSource) Queues(ctx context.Context) ([]string, error) { + f.mu.Lock() + err := f.failErr + qs := f.queues + f.mu.Unlock() + select { + case f.polled <- struct{}{}: + default: + } + if err != nil { + return nil, err + } + return qs, nil +} + +func (f *fakeSource) Stats(ctx context.Context, queue string) (broker.Stats, error) { + return broker.Stats{Ready: 1}, nil +} + +func (f *fakeSource) Counters(ctx context.Context, queue string) (broker.Counters, error) { + return broker.Counters{Processed: 7}, nil +} + +// waitForPoll blocks until the next poll signal or fails the test on timeout. +func waitForPoll(t *testing.T, polled <-chan struct{}) { + t.Helper() + select { + case <-polled: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for a poll") + } +} + +// assertNoPoll fails if any poll happens within d. +func assertNoPoll(t *testing.T, polled <-chan struct{}, d time.Duration) { + t.Helper() + select { + case <-polled: + t.Fatal("unexpected poll") + case <-time.After(d): + } +} + +// drain removes any buffered poll signals. +func drain(polled <-chan struct{}) { + for { + select { + case <-polled: + default: + return + } + } +} + +func TestHubLazyStartsOnFirstSubscribe(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + assertNoPoll(t, f.polled, 80*time.Millisecond) // idle hub does not poll + sub := h.subscribe() + defer h.unsubscribe(sub) + waitForPoll(t, f.polled) // first subscribe starts the poller +} + +func TestHubStopsPollingAfterLastUnsubscribe(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + sub := h.subscribe() + waitForPoll(t, f.polled) + h.unsubscribe(sub) + time.Sleep(60 * time.Millisecond) // let the poller observe cancellation and exit + drain(f.polled) // clear the straggler + any buffered signals + assertNoPoll(t, f.polled, 100*time.Millisecond) +} + +func TestHubFansOutToAllSubscribers(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + a := h.subscribe() + defer h.unsubscribe(a) + b := h.subscribe() + defer h.unsubscribe(b) + c := h.subscribe() + defer h.unsubscribe(c) + for i, s := range []*subscriber{a, b, c} { + select { + case buf := <-s.ch: + if !bytes.Contains(buf, []byte(`"queue":"emails"`)) { + t.Fatalf("sub %d: snapshot %q missing queue", i, buf) + } + case <-time.After(2 * time.Second): + t.Fatalf("sub %d: no snapshot received", i) + } + } +} + +func TestHubSlowConsumerDoesNotBlockPoller(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 10*time.Millisecond) + slow := h.subscribe() // never reads slow.ch + defer h.unsubscribe(slow) + // The poller keeps polling across several ticks even though slow never drains. + waitForPoll(t, f.polled) + drain(f.polled) + waitForPoll(t, f.polled) + waitForPoll(t, f.polled) + // Latest-wins: cap-1 channel holds exactly one (the newest) snapshot. + if got := len(slow.ch); got != 1 { + t.Fatalf("slow.ch len = %d, want 1 (latest-wins, cap 1)", got) + } +} + +func TestHubLateJoinerGetsCachedSnapshot(t *testing.T) { + f := newFakeSource("emails") + // Huge interval: if a late joiner had to wait for a tick it would time out; + // getting a snapshot promptly proves it came from the cache. + h := newHub(f, discardLogger(), 10*time.Second) + first := h.subscribe() + defer h.unsubscribe(first) + <-first.ch // the immediate first poll has now produced and cached a snapshot + late := h.subscribe() + defer h.unsubscribe(late) + select { + case buf := <-late.ch: + if !bytes.Contains(buf, []byte(`"queue":"emails"`)) { + t.Fatalf("late joiner got %q", buf) + } + case <-time.After(2 * time.Second): + t.Fatal("late joiner did not receive the cached snapshot") + } +} + +func TestHubSurvivesPollError(t *testing.T) { + f := newFakeSource("emails") + f.setErr(errors.New("redis down")) + h := newHub(f, discardLogger(), 20*time.Millisecond) + sub := h.subscribe() + defer h.unsubscribe(sub) + waitForPoll(t, f.polled) // errored poll still ran + drain(f.polled) + waitForPoll(t, f.polled) // poller survived the error and polled again + f.setErr(nil) // recover + select { + case <-sub.ch: + case <-time.After(2 * time.Second): + t.Fatal("no snapshot received after recovery") + } +} +``` + +- [ ] **Step 4: Run the tests, expect PASS** + +Run: `go test -race ./internal/api/ -run TestHub -v` +Expected: all six `TestHub*` tests PASS. (These are pure in-memory tests; they run even without Redis.) + +- [ ] **Step 5: Commit** + +```bash +git add internal/api/hub.go internal/api/hub_test.go +git commit -m "Add SSE fan-out hub with lazy single poller" +``` + +--- + +## Task 2: Wire the hub into the API and simplify the stream handler + +**Files:** +- Modify: `internal/api/api.go:19-41` (API struct + New) +- Modify: `internal/api/stream.go` (handler body; delete writeSnapshot) +- Test: `internal/api/api_test.go:275` (existing `TestStreamEmitsSnapshot`, real Redis — must stay green) + +- [ ] **Step 1: Add the hub field and construct it in `New`** + +In `internal/api/api.go`, change the `API` struct (currently lines 19-22) to add the field: + +```go +// API holds the dependencies shared by the handlers. +type API struct { + broker *broker.Broker + logger *slog.Logger + hub *hub +} +``` + +And in `New` (currently line 31), construct the hub after building `a`. The concrete `*broker.Broker` satisfies `snapshotSource`; `streamInterval` is defined in `stream.go` (same package): + +```go + a := &API{broker: b, logger: logger} + a.hub = newHub(b, logger, streamInterval) +``` + +Leave the rest of `New` (the mux and route registrations) unchanged. + +- [ ] **Step 2: Replace the stream handler and delete writeSnapshot** + +In `internal/api/stream.go`, keep the `streamInterval` const (line 12) and the `queueSnapshot` struct (lines 16-24). Replace the `stream` method and DELETE the entire `writeSnapshot` method. The file becomes: + +```go +package api + +import ( + "fmt" + "net/http" + "time" +) + +// streamInterval is how often the SSE hub polls Redis and pushes a fresh snapshot. +const streamInterval = time.Second + +// queueSnapshot is one queue's line in an SSE snapshot: point-in-time depths plus +// the cumulative counters the client diffs into throughput. +type queueSnapshot struct { + Queue string `json:"queue"` + Ready int64 `json:"ready"` + Inflight int64 `json:"inflight"` + Delayed int64 `json:"delayed"` + DLQ int64 `json:"dlq"` + ProcessedTotal int64 `json:"processed_total"` + DeadTotal int64 `json:"dead_total"` +} + +// stream handles GET /api/stream: a text/event-stream that subscribes to the +// shared hub and relays each broadcast snapshot to this client until it +// disconnects. All Redis polling happens once in the hub, not per connection. +func (a *API) stream(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + a.writeError(w, http.StatusInternalServerError, "streaming unsupported") + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + sub := a.hub.subscribe() + defer a.hub.unsubscribe(sub) + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case buf := <-sub.ch: + if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { + return // client disconnected + } + flusher.Flush() + } + } +} +``` + +Note the import block dropped `context` and `encoding/json` (now used only in `hub.go`) and kept `fmt`, `net/http`, `time`. + +- [ ] **Step 3: Verify the package builds and vets clean** + +Run: `go build ./... && go vet ./internal/api/` +Expected: success, no output. (Confirms no leftover references to `writeSnapshot` and no unused imports.) + +- [ ] **Step 4: Run the full api test suite (needs Redis on :6379)** + +Run: `go test -race ./internal/api/ -v` +Expected: all tests PASS, including `TestHub*` (Task 1) and the real-Redis `TestStreamEmitsSnapshot` — the latter still receives its first snapshot because the hub polls immediately on the first subscribe. +Note: if Redis is not running, the real-Redis tests SKIP (not fail) and the `TestHub*` tests still PASS. To exercise everything, ensure Redis is reachable (e.g., `docker run -p 6379:6379 redis:7`). + +- [ ] **Step 5: Commit** + +```bash +git add internal/api/api.go internal/api/stream.go +git commit -m "Route /api/stream through the fan-out hub" +``` + +--- + +## Task 3: Update CLAUDE.md limitation note + +**Files:** +- Modify: `CLAUDE.md` (the "SSE is per-connection" bullet under "Known limitations", currently line 119) + +- [ ] **Step 1: Rewrite the SSE limitation entry** + +In `CLAUDE.md`, replace this line: + +``` +- **SSE is per-connection.** Each open dashboard tab runs its own server-side ticker goroutine reading Redis every ~1 s. This is fine for a demo; a production deployment would fan-out from a single poller. +``` + +with: + +``` +- **SSE fan-out is per-process, single-poller.** While ≥1 dashboard is connected, one background goroutine per server process polls Redis every ~1 s, builds the snapshot, and broadcasts it to every connected dashboard (latest-wins per client, so a slow tab never blocks the poller; a late joiner is seeded from the last cached snapshot). Redis load is O(queues)/sec, independent of connection count; an idle server (no dashboards) does no polling. The poller is owned by the `hub` in `internal/api/hub.go` and is lazily started/stopped by subscriber count. Each server replica runs its own poller — there is no cross-replica fan-out (Redis Pub/Sub remains future work). +``` + +- [ ] **Step 2: Verify the edit** + +Run: `grep -n "SSE fan-out is per-process" CLAUDE.md` +Expected: one matching line. + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "Document single-poller SSE fan-out in CLAUDE.md" +``` + +--- + +## Final verification (after all tasks) + +- [ ] `go build ./...` — succeeds. +- [ ] `go test -race ./...` — all packages pass (Redis-backed suites need Redis on :6379, else they skip). +- [ ] `golangci-lint run` — clean (note the project's errcheck convention: any `defer x.Close()` must be `defer func() { _ = x.Close() }()`; the hub adds no such defers, but keep it in mind for any incidental change). +- [ ] No `web/` change and no `web/dist` rebuild — confirm `git status` shows nothing under `web/` (wire format is unchanged by design). +- [ ] Grep confirms the old path is gone: `grep -rn "writeSnapshot" internal/` returns nothing. + +--- + +## Self-Review (against the spec) + +**Spec coverage:** +- In-process hub + single poller → Task 1 (`hub.go`). +- Lazy lifecycle (start on first subscribe, stop on last unsubscribe, zero idle polling) → Task 1 Step 1 (`subscribe`/`unsubscribe`) + tests `TestHubLazyStartsOnFirstSubscribe`, `TestHubStopsPollingAfterLastUnsubscribe`. +- `snapshotSource` interface for Redis-free unit tests → Task 1. +- Immediate first snapshot / late-joiner cache → `run`'s immediate poll + `subscribe` seeding; tests `TestHubLateJoinerGetsCachedSnapshot`, and integration `TestStreamEmitsSnapshot`. +- Latest-wins, poller never blocked → `send`; test `TestHubSlowConsumerDoesNotBlockPoller`. +- Error handling (skip tick, survive) → `pollAndBroadcast`; test `TestHubSurvivesPollError`. +- Simplified handler + delete `writeSnapshot` → Task 2. +- Wire-format unchanged, no web rebuild → Task 2 (same `data: %s\n\n` + `queueSnapshot`) + Final verification. +- Docs updated → Task 3. +- Benign restart overlap → documented in spec; implementation tolerates it (old goroutine exits on cancelled ctx without touching `h.cancel`). No dedicated test (timing-dependent, structurally safe). + +**Placeholder scan:** none — every code step shows full code; every run step shows the command and expected result. + +**Type consistency:** `snapshotSource` methods match `*broker.Broker`'s real signatures (`Queues(ctx) ([]string, error)`, `Stats(ctx, string) (broker.Stats, error)`, `Counters(ctx, string) (broker.Counters, error)`). `subscriber.ch` is `chan []byte` everywhere. `hub.last` is `[]byte`. `newHub(src, logger, interval)` call in Task 2 matches its Task 1 definition. `streamInterval`/`queueSnapshot` are defined once (in `stream.go`) and reused by `hub.go`. diff --git a/docs/superpowers/specs/2026-06-09-relay-sse-single-poller-fanout-design.md b/docs/superpowers/specs/2026-06-09-relay-sse-single-poller-fanout-design.md new file mode 100644 index 0000000..543931f --- /dev/null +++ b/docs/superpowers/specs/2026-06-09-relay-sse-single-poller-fanout-design.md @@ -0,0 +1,236 @@ +# Relay SSE Single-Poller Fan-Out — Design + +**Date:** 2026-06-09 +**Status:** Approved (brainstorming) +**Component:** `internal/api` (SSE stream) + +## Problem + +The dashboard stream at `GET /api/stream` (`internal/api/stream.go`) is **per-connection**. +Every open dashboard runs its own server-side goroutine with a 1 s ticker, and on each tick +independently calls `broker.Queues` (a Redis `SCAN`), then `broker.Stats` (≈4 round-trips) and +`broker.Counters` (2 `GET`s) **per queue**. Redis load therefore scales as +**O(connections × queues) per second**, and every connection recomputes the *identical* global +snapshot. With many dashboards open, Redis — single-threaded for command execution — saturates +long before Go's HTTP layer feels any strain. The `SCAN`-per-connection-per-second is the worst +offender. + +CLAUDE.md already documents this as an intentional demo-grade limitation: + +> "SSE is per-connection. Each open dashboard tab runs its own server-side ticker goroutine +> reading Redis every ~1 s. This is fine for a demo; a production deployment would fan-out from a +> single poller." + +This change implements that fan-out. + +## Goal + +Replace per-connection polling with a single in-process **hub**: one background goroutine polls +Redis once per interval, builds the snapshot, and broadcasts it to every connected subscriber. +Redis load becomes **O(queues) per second, independent of connection count**. + +## Non-Goals + +- **No cross-replica fan-out (Redis Pub/Sub).** An in-process hub per server replica already polls + only O(queues)/sec, which is cheap. Pub/Sub is deferred future work, not needed at this scale. +- **No wire-format change.** The emitted SSE event stays byte-identical, so the committed + `web/dist` client needs no change and no rebuild. +- **No new HTTP endpoints, flags, or dependencies.** + +## Architecture + +A new `hub` type in `internal/api/hub.go` owns all Redis polling and broadcast. It depends on a +small interface (not the concrete broker) so it can be unit-tested without Redis. + +```go +// snapshotSource is the slice of the broker the hub needs. +// Satisfied by *broker.Broker. +type snapshotSource interface { + Queues(ctx context.Context) ([]string, error) + Stats(ctx context.Context, queue string) (broker.Stats, error) + Counters(ctx context.Context, queue string) (broker.Counters, error) +} + +type subscriber struct { + ch chan []byte // buffered, cap 1; latest-wins +} + +type hub struct { + src snapshotSource + logger *slog.Logger + interval time.Duration // = streamInterval; injectable for tests + + mu sync.Mutex + subs map[*subscriber]struct{} + last []byte // most-recent marshalled snapshot, for instant populate + cancel context.CancelFunc // non-nil iff the poller goroutine is running +} +``` + +`API` (in `internal/api/api.go`) gains a `hub *hub` field, constructed in `New`. **`api.New`'s +signature is unchanged** (`New(b *broker.Broker, logger *slog.Logger) http.Handler`); it builds the +hub from `b` and the logger. + +### Lazy lifecycle (poll only while ≥1 subscriber) + +- **`newHub(src snapshotSource, logger *slog.Logger, interval time.Duration) *hub`** — constructs + an idle hub with an empty `subs` map. Does **not** start polling. +- **`subscribe() *subscriber`** (under `mu`): if `subs` is empty, start the poller — + `ctx, cancel := context.WithCancel(context.Background())`, store `cancel`, `go h.run(ctx)`. + Create the subscriber (buffered channel cap 1), register it. Capture `last` while still holding + the lock. After unlocking, if the captured `last != nil`, do a non-blocking send of it to the + subscriber's channel so a late joiner's UI populates instantly. Return the subscriber. +- **`unsubscribe(s *subscriber)`** (under `mu`): delete `s` from `subs`; if `subs` is now empty and + `cancel != nil`, call `cancel()` and set it to `nil`. The poller goroutine then returns on its + next `select`. Idle Redis load is zero. +- **`run(ctx context.Context)`**: poll **once immediately** (preserving today's "immediate first + snapshot"), then loop on a `time.Ticker(h.interval)`: + ``` + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.pollAndBroadcast(ctx) + } + } + ``` +- **`pollAndBroadcast(ctx)`**: build the snapshot (see Data flow); on success store it in `last` + (under `mu`) and `broadcast` it; on error, log and skip (keep `last`, keep polling). +- **`broadcast(buf []byte)`** (under `mu`): for each subscriber, non-blocking latest-wins send — + ``` + select { + case s.ch <- buf: + default: // channel full: drop the stale snapshot, enqueue the newest + select { case <-s.ch: default: } + select { case s.ch <- buf: default: } + } + ``` + The poller never blocks on a slow client. + +### Data flow (snapshot building — moved from `stream.go`) + +`pollAndBroadcast` reuses today's logic verbatim, just relocated into the hub: + +1. `queues, err := src.Queues(ctx)` — on error, log + skip the tick. +2. For each queue `q`: `src.Stats(ctx, q)` and `src.Counters(ctx, q)`; on a per-queue error, log + and `continue` (omit that queue from this snapshot). +3. Assemble `[]queueSnapshot` (`queue, ready, inflight, delayed, dlq, processed_total, dead_total`) + — the existing struct in `stream.go`, unchanged. +4. `json.Marshal`; on error, log + skip. +5. Store as `last`, then `broadcast`. + +### The simplified `stream` handler + +`stream.go` keeps the `streamInterval` constant and the `queueSnapshot` type, and reduces the +handler to: + +```go +func (a *API) stream(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + a.writeError(w, http.StatusInternalServerError, "streaming unsupported") + return + } + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + sub := a.hub.subscribe() + defer a.hub.unsubscribe(sub) + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case buf := <-sub.ch: + if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { + return // client disconnected + } + flusher.Flush() + } + } +} +``` + +## Concurrency & safety + +- `mu` guards `subs`, `last`, and `cancel`. `subscribe`, `unsubscribe`, `broadcast`, and the + `last` update in `pollAndBroadcast` all take it. +- The immediate-populate send in `subscribe` happens **after** unlocking (it targets the new + subscriber's own channel, which no one else holds yet), avoiding holding `mu` across a channel op + that interacts with broadcast. +- **Benign restart overlap:** a `subscribe` arriving in the instant after the last `unsubscribe` + cancels the poller starts a fresh poller while the old goroutine may not have returned yet. The + old goroutine's `ctx` is already cancelled, so its next `select` returns; it touches no shared + state on the way out (it does not modify `cancel`). At worst it performs one final harmless + broadcast of fresh data. This is documented and acceptable; no generation counter is needed. +- All tests run under `-race`. + +## Wire-format compatibility + +The event remains `data: \n\n`, all queues per snapshot, same field +names and types. The deployed `web/dist` dashboard is unaffected — **no frontend change, no dist +rebuild**. This is a pure server-side internal refactor. + +## Error handling + +| Condition | Behavior | +|---|---| +| `Queues` Redis error | Log, skip the tick, keep poller alive and `last` cached (matches current "skip this tick"). | +| Per-queue `Stats`/`Counters` error | Log, omit that queue from this snapshot, continue others. | +| `json.Marshal` error | Log, skip the tick. | +| Slow/non-reading subscriber | Latest-wins drop (silent — per-drop logging would be noisy); poller never blocks. | +| Client disconnect | `r.Context().Done()` or a write error returns from the handler; `defer` unsubscribes. | +| Server shutdown | `srv.Shutdown` closes connections → handlers return → unsubscribe → last unsubscribe stops the poller. No explicit hub `Close` needed. | + +## Testing + +### Unit — `internal/api/hub_test.go` (no Redis) + +A fake `snapshotSource` records call counts and signals on a channel each time `Queues` is called, +so tests synchronize deterministically (no `time.Sleep`, `-race`-clean). The fake returns a fixed +queue list and canned `Stats`/`Counters`, and can be switched to return an error. + +1. **Lazy start** — a freshly constructed hub performs no polls until the first `subscribe`; after + `subscribe`, at least one poll occurs. +2. **Lazy stop** — after the last `unsubscribe`, polling ceases (no further `Queues` calls within a + few intervals). +3. **Fan-out** — with N subscribers, a single poll cycle delivers the same snapshot to all N; the + source is called **once per cycle**, not N times. +4. **Latest-wins / slow consumer** — a subscriber that never reads does not block the poller; other + subscribers keep receiving; the slow subscriber's channel only ever holds the newest snapshot. +5. **Late joiner** — subscribing after `last` is populated delivers the cached snapshot + immediately (before the next tick). +6. **Redis-error tick** — when the fake returns an error, the poller logs, skips, and stays alive + (subsequent successful ticks resume broadcasting). + +Tests pass a small `interval` (e.g., a few ms) and a discard logger. + +### Integration — `internal/api` against real Redis (DB 12) + +Keep/adapt the existing `/api/stream` test: start the API over a real broker, connect, enqueue a +job, read one SSE snapshot, and assert the queue's fields (now served through the hub). Confirms +end-to-end behavior is unchanged from the client's perspective. + +## Files + +- **Create** `internal/api/hub.go` — `hub`, `subscriber`, `snapshotSource`, `newHub`, `subscribe`, + `unsubscribe`, `run`, `pollAndBroadcast`, `broadcast`. +- **Create** `internal/api/hub_test.go` — the six unit tests above with a fake source. +- **Modify** `internal/api/api.go` — add `hub *hub` field to `API`; construct it in `New`. +- **Modify** `internal/api/stream.go` — keep `streamInterval` + `queueSnapshot`; replace the + handler body with subscribe/relay/unsubscribe; remove `writeSnapshot` (logic moves to the hub). +- **Modify** existing `internal/api` stream integration test if needed to remain green. +- **Modify** `CLAUDE.md` — update the "SSE is per-connection" limitation entry to describe the + single-poller fan-out: one background poller while ≥1 dashboard is connected, broadcasting to all + subscribers; Redis load is now O(queues)/sec rather than O(connections × queues)/sec; still + per-process (each server replica runs its own poller). + +## Success criteria + +- Redis is polled at most once per `interval` per server process while ≥1 dashboard is connected, + regardless of how many are connected; zero polling when none are connected. +- The SSE wire format is unchanged; the existing dashboard works without modification. +- All new unit tests and the adapted integration test pass under `-race`; `golangci-lint` clean. diff --git a/internal/api/api.go b/internal/api/api.go index c2025d7..affa221 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -19,6 +19,7 @@ import ( type API struct { broker *broker.Broker logger *slog.Logger + hub *hub } // New returns an http.Handler serving the Relay REST API over the given broker. @@ -29,6 +30,7 @@ func New(b *broker.Broker, logger *slog.Logger) http.Handler { logger = slog.Default() } a := &API{broker: b, logger: logger} + a.hub = newHub(b, logger, streamInterval) mux := http.NewServeMux() mux.HandleFunc("POST /api/queues/{queue}/jobs", a.enqueue) mux.HandleFunc("POST /api/queues/{queue}/jobs/bulk", a.enqueueBulk) diff --git a/internal/api/hub.go b/internal/api/hub.go new file mode 100644 index 0000000..78f9bb2 --- /dev/null +++ b/internal/api/hub.go @@ -0,0 +1,190 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + "sync" + "time" + + "github.com/StrangeNoob/relay/internal/broker" +) + +// snapshotSource is the slice of the broker the hub needs to build an SSE +// snapshot. *broker.Broker satisfies it; tests inject a fake so the hub's +// fan-out and lifecycle logic run without a real Redis. +type snapshotSource interface { + Queues(ctx context.Context) ([]string, error) + Stats(ctx context.Context, queue string) (broker.Stats, error) + Counters(ctx context.Context, queue string) (broker.Counters, error) +} + +// subscriber is one connected SSE client. ch is buffered with capacity 1 and +// written latest-wins (see send): a slow client only ever holds the newest +// snapshot and never blocks the poller. +type subscriber struct { + ch chan []byte +} + +// hub fans out one Redis poll to every connected SSE subscriber. A single +// background goroutine polls once per interval while at least one subscriber is +// connected (lazy: starts on the first subscribe, stops on the last +// unsubscribe), so Redis load is O(queues)/sec regardless of connection count. +type hub struct { + src snapshotSource + logger *slog.Logger + interval time.Duration + + mu sync.Mutex + subs map[*subscriber]struct{} + last []byte // most-recent marshalled snapshot, for instant populate + cancel context.CancelFunc // non-nil iff a poller should be running (see run for the restart-overlap window) +} + +// newHub builds an idle hub. It does not poll until the first subscribe. +func newHub(src snapshotSource, logger *slog.Logger, interval time.Duration) *hub { + return &hub{ + src: src, + logger: logger, + interval: interval, + subs: make(map[*subscriber]struct{}), + } +} + +// subscribe registers a new SSE client, lazily starting the poller when the hub +// was idle. It seeds the new subscriber with the last snapshot (if any) so a +// late joiner's UI populates without waiting for the next tick. +func (h *hub) subscribe() *subscriber { + s := &subscriber{ch: make(chan []byte, 1)} + h.mu.Lock() + if len(h.subs) == 0 { + ctx, cancel := context.WithCancel(context.Background()) + h.cancel = cancel + go h.run(ctx) + } + h.subs[s] = struct{}{} + last := h.last + h.mu.Unlock() + + // Seed from cache without blocking. If the poller already delivered a fresher + // snapshot between the unlock and here, the channel is full and we skip — + // never replace fresh with stale, never block. + if last != nil { + select { + case s.ch <- last: + default: + } + } + return s +} + +// unsubscribe removes a client. When the last subscriber leaves, the poller is +// cancelled so an idle server does no Redis work. +func (h *hub) unsubscribe(s *subscriber) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.subs, s) + if len(h.subs) == 0 && h.cancel != nil { + h.cancel() + h.cancel = nil + } +} + +// run is the single poller goroutine: an immediate snapshot, then one per +// interval until ctx is cancelled (by the last unsubscribe or server shutdown). +// +// Restart-overlap safety: if a subscribe arrives just after the last +// unsubscribe cancelled this goroutine, subscribe starts a NEW run with its own +// context while this one may not have returned yet. That is safe — this +// goroutine's only post-cancel access to shared state is the broadcast block in +// pollAndBroadcast, which takes h.mu; subscribe holds h.mu while it installs the +// new cancel and starts the new run, so an old in-flight broadcast either ran +// before subscribe took the lock (harmless: it writes a fresh snapshot to the +// current subs) or blocks until subscribe unlocks, then completes one final +// broadcast to the current subs (including the new subscriber) before its next +// select returns on its own ctx.Done(). The old context is independent of the +// new one, so a stale poller can never cancel the new one — at worst it does one +// extra poll and broadcast, which the new poller's latest-wins delivery absorbs. +func (h *hub) run(ctx context.Context) { + h.pollAndBroadcast(ctx) + ticker := time.NewTicker(h.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.pollAndBroadcast(ctx) + } + } +} + +// pollAndBroadcast reads every queue's depths and counters once, marshals the +// snapshot, caches it, and fans it out. A Redis or marshal error skips this +// tick (the poller and the cached snapshot survive). +func (h *hub) pollAndBroadcast(ctx context.Context) { + queues, err := h.src.Queues(ctx) + if err != nil { + // A cancelled context means the last subscriber just left and the poller + // is shutting down; that is expected, not an error worth logging. + if !errors.Is(err, context.Canceled) { + h.logger.Error("api: stream listing queues", "err", err) + } + return + } + snaps := make([]queueSnapshot, 0, len(queues)) + for _, q := range queues { + st, err := h.src.Stats(ctx, q) + if err != nil { + // A cancelled context means the poller is shutting down (last + // subscriber left): abandon the whole tick silently, like the Queues + // path above. A genuine per-queue error just omits that queue. + if errors.Is(err, context.Canceled) { + return + } + h.logger.Error("api: stream stats", "queue", q, "err", err) + continue + } + ct, err := h.src.Counters(ctx, q) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + h.logger.Error("api: stream counters", "queue", q, "err", err) + continue + } + snaps = append(snaps, queueSnapshot{ + Queue: q, Ready: st.Ready, Inflight: st.Inflight, Delayed: st.Delayed, + DLQ: st.DLQ, ProcessedTotal: ct.Processed, DeadTotal: ct.Dead, + }) + } + buf, err := json.Marshal(snaps) + if err != nil { + h.logger.Error("api: stream marshal", "err", err) + return + } + h.mu.Lock() + h.last = buf + for s := range h.subs { + send(s, buf) + } + h.mu.Unlock() +} + +// send pushes buf to s latest-wins: if the buffer already holds a stale +// snapshot, drop it and enqueue the newest. Never blocks the caller. +func send(s *subscriber, buf []byte) { + select { + case s.ch <- buf: + default: + select { + case <-s.ch: + default: + } + select { + case s.ch <- buf: + default: + } + } +} diff --git a/internal/api/hub_test.go b/internal/api/hub_test.go new file mode 100644 index 0000000..bbfa4b2 --- /dev/null +++ b/internal/api/hub_test.go @@ -0,0 +1,186 @@ +package api + +import ( + "bytes" + "context" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/StrangeNoob/relay/internal/broker" +) + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// fakeSource is an in-memory snapshotSource. Every Queues call bumps a counter +// and emits a signal on polled (buffered, non-blocking) so tests can observe +// poll cadence without the source ever blocking the poller. +type fakeSource struct { + mu sync.Mutex + queues []string + failErr error + polled chan struct{} +} + +func newFakeSource(queues ...string) *fakeSource { + return &fakeSource{queues: queues, polled: make(chan struct{}, 1024)} +} + +func (f *fakeSource) setErr(err error) { + f.mu.Lock() + f.failErr = err + f.mu.Unlock() +} + +func (f *fakeSource) Queues(ctx context.Context) ([]string, error) { + f.mu.Lock() + err := f.failErr + qs := f.queues + f.mu.Unlock() + select { + case f.polled <- struct{}{}: + default: + } + if err != nil { + return nil, err + } + return qs, nil +} + +func (f *fakeSource) Stats(ctx context.Context, queue string) (broker.Stats, error) { + return broker.Stats{Ready: 1}, nil +} + +func (f *fakeSource) Counters(ctx context.Context, queue string) (broker.Counters, error) { + return broker.Counters{Processed: 7}, nil +} + +// waitForPoll blocks until the next poll signal or fails the test on timeout. +func waitForPoll(t *testing.T, polled <-chan struct{}) { + t.Helper() + select { + case <-polled: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for a poll") + } +} + +// assertNoPoll fails if any poll happens within d. +func assertNoPoll(t *testing.T, polled <-chan struct{}, d time.Duration) { + t.Helper() + select { + case <-polled: + t.Fatal("unexpected poll") + case <-time.After(d): + } +} + +// drain removes any buffered poll signals. +func drain(polled <-chan struct{}) { + for { + select { + case <-polled: + default: + return + } + } +} + +func TestHubLazyStartsOnFirstSubscribe(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + assertNoPoll(t, f.polled, 80*time.Millisecond) // idle hub does not poll + sub := h.subscribe() + defer h.unsubscribe(sub) + waitForPoll(t, f.polled) // first subscribe starts the poller +} + +func TestHubStopsPollingAfterLastUnsubscribe(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + sub := h.subscribe() + waitForPoll(t, f.polled) + h.unsubscribe(sub) + time.Sleep(60 * time.Millisecond) // let the poller observe cancellation and exit + drain(f.polled) // clear the straggler + any buffered signals + assertNoPoll(t, f.polled, 100*time.Millisecond) +} + +func TestHubFansOutToAllSubscribers(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 20*time.Millisecond) + a := h.subscribe() + defer h.unsubscribe(a) + b := h.subscribe() + defer h.unsubscribe(b) + c := h.subscribe() + defer h.unsubscribe(c) + for i, s := range []*subscriber{a, b, c} { + select { + case buf := <-s.ch: + if !bytes.Contains(buf, []byte(`"queue":"emails"`)) { + t.Fatalf("sub %d: snapshot %q missing queue", i, buf) + } + case <-time.After(2 * time.Second): + t.Fatalf("sub %d: no snapshot received", i) + } + } +} + +func TestHubSlowConsumerDoesNotBlockPoller(t *testing.T) { + f := newFakeSource("emails") + h := newHub(f, discardLogger(), 10*time.Millisecond) + slow := h.subscribe() // never reads slow.ch + defer h.unsubscribe(slow) + // The poller keeps polling across several ticks even though slow never drains. + waitForPoll(t, f.polled) + drain(f.polled) + waitForPoll(t, f.polled) + waitForPoll(t, f.polled) + // Latest-wins: cap-1 channel holds exactly one (the newest) snapshot. + if got := len(slow.ch); got != 1 { + t.Fatalf("slow.ch len = %d, want 1 (latest-wins, cap 1)", got) + } +} + +func TestHubLateJoinerGetsCachedSnapshot(t *testing.T) { + f := newFakeSource("emails") + // Huge interval: if a late joiner had to wait for a tick it would time out; + // getting a snapshot promptly proves it came from the cache. + h := newHub(f, discardLogger(), 10*time.Second) + first := h.subscribe() + defer h.unsubscribe(first) + <-first.ch // the immediate first poll has now produced and cached a snapshot + late := h.subscribe() + defer h.unsubscribe(late) + select { + case buf := <-late.ch: + if !bytes.Contains(buf, []byte(`"queue":"emails"`)) { + t.Fatalf("late joiner got %q", buf) + } + case <-time.After(2 * time.Second): + t.Fatal("late joiner did not receive the cached snapshot") + } +} + +func TestHubSurvivesPollError(t *testing.T) { + f := newFakeSource("emails") + f.setErr(errors.New("redis down")) + h := newHub(f, discardLogger(), 20*time.Millisecond) + sub := h.subscribe() + defer h.unsubscribe(sub) + waitForPoll(t, f.polled) // errored poll still ran + drain(f.polled) + waitForPoll(t, f.polled) // poller survived the error and polled again + f.setErr(nil) // recover + select { + case <-sub.ch: + case <-time.After(2 * time.Second): + t.Fatal("no snapshot received after recovery") + } +} diff --git a/internal/api/stream.go b/internal/api/stream.go index d6e19d3..6d9865f 100644 --- a/internal/api/stream.go +++ b/internal/api/stream.go @@ -1,18 +1,16 @@ package api import ( - "context" - "encoding/json" "fmt" "net/http" "time" ) -// streamInterval is how often the SSE stream pushes a fresh snapshot. +// streamInterval is how often the SSE hub polls Redis and pushes a fresh snapshot. const streamInterval = time.Second // queueSnapshot is one queue's line in an SSE snapshot: point-in-time depths plus -// the cumulative counters the client rate-computes into throughput. +// the cumulative counters the client diffs into throughput. type queueSnapshot struct { Queue string `json:"queue"` Ready int64 `json:"ready"` @@ -23,9 +21,9 @@ type queueSnapshot struct { DeadTotal int64 `json:"dead_total"` } -// stream handles GET /api/stream: a text/event-stream that pushes a snapshot of -// every queue immediately and then once per streamInterval until the client -// disconnects. A Redis hiccup skips a tick rather than tearing down the stream. +// stream handles GET /api/stream: a text/event-stream that subscribes to the +// shared hub and relays each broadcast snapshot to this client until it +// disconnects. All Redis polling happens once in the hub, not per connection. func (a *API) stream(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { @@ -36,58 +34,19 @@ func (a *API) stream(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") + sub := a.hub.subscribe() + defer a.hub.unsubscribe(sub) + ctx := r.Context() - // Immediate first snapshot so the UI populates without waiting a tick. - if !a.writeSnapshot(ctx, w, flusher) { - return - } - ticker := time.NewTicker(streamInterval) - defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: - if !a.writeSnapshot(ctx, w, flusher) { - return + case buf := <-sub.ch: + if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { + return // client disconnected } + flusher.Flush() } } } - -// writeSnapshot composes and writes one SSE event. It returns false when the -// client connection is gone (write failed), signalling the caller to stop. -func (a *API) writeSnapshot(ctx context.Context, w http.ResponseWriter, flusher http.Flusher) bool { - queues, err := a.broker.Queues(ctx) - if err != nil { - a.logger.Error("api: stream listing queues", "err", err) - return true // skip this tick, keep the stream open - } - snaps := make([]queueSnapshot, 0, len(queues)) - for _, q := range queues { - st, err := a.broker.Stats(ctx, q) - if err != nil { - a.logger.Error("api: stream stats", "queue", q, "err", err) - continue - } - ct, err := a.broker.Counters(ctx, q) - if err != nil { - a.logger.Error("api: stream counters", "queue", q, "err", err) - continue - } - snaps = append(snaps, queueSnapshot{ - Queue: q, Ready: st.Ready, Inflight: st.Inflight, Delayed: st.Delayed, - DLQ: st.DLQ, ProcessedTotal: ct.Processed, DeadTotal: ct.Dead, - }) - } - buf, err := json.Marshal(snaps) - if err != nil { - a.logger.Error("api: stream marshal", "err", err) - return true - } - if _, err := fmt.Fprintf(w, "data: %s\n\n", buf); err != nil { - return false // client disconnected - } - flusher.Flush() - return true -}