From f6bd6ce5806fd3af5882f517f3c7a9c34f667c15 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 07:54:27 +0000 Subject: [PATCH] fix(realtime): cap WebSocket clients + plug writer-goroutine leaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes two production-readiness gaps: 1. **Max-clients cap** (codex round 2 finding B): Hub had no admission limit, so 10k connecting clients would exhaust file descriptors and per-client send-channel memory before any backpressure kicked in. New WS_MAX_CLIENTS env var (0 = unlimited, default) and atomic reservation in HandleWebSocket — over-cap connects return HTTP 503. 2. **Writer-goroutine leak on idle disconnect / Stop**: Pre-existing bug exposed by the cap test. The writer goroutine blocks on `for msg := range c.send` and never wakes up when: - The client disconnects without traffic (no conn.Write to fail on) - Hub.Stop() is called with idle clients (stopCh return doesn't close any send channels, so writerWg.Wait() hangs forever) Fixed by force-closing c.send (CAS-guarded) at two sites: - HandleWebSocket after reader loop exits — wakes writer on idle-disconnect so the admission slot releases promptly - Hub.Run stopCh handler — wakes all writers on Stop so writerWg.Wait() returns Tests cover: cap rejection with 503, slot release after disconnect, unlimited path when cap unset, negative cap coerced to 0, race-detector clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/config/config.go | 9 ++ internal/realtime/hub.go | 71 ++++++++++++ internal/realtime/hub_maxclients_test.go | 140 +++++++++++++++++++++++ main.go | 1 + 4 files changed, 221 insertions(+) create mode 100644 internal/realtime/hub_maxclients_test.go diff --git a/internal/config/config.go b/internal/config/config.go index 02696d0..8d443e1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -181,6 +181,12 @@ type Config struct { // being used outside dev/test. 
Without it, a production Env + SQLite // combination refuses to start. AllowSqliteProd bool + + // WSMaxClients caps simultaneous WebSocket connections to /ws* + // endpoints. 0 = unlimited (default). When set, new connections past + // the cap receive HTTP 503. Sized for the operator's expected dashboard + // audience — small for ops dashboards, larger for read-heavy public UIs. + WSMaxClients int } func Load(customPath string) (*Config, error) { @@ -285,6 +291,9 @@ func Load(customPath string) (*Config, error) { // OTel self-instrumentation OTelExporterEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", ""), + // WebSocket admission cap + WSMaxClients: getEnvInt("WS_MAX_CLIENTS", 0), + // Multi-tenancy DefaultTenant: getEnv("DEFAULT_TENANT", "default"), OTLPTrustResourceTenant: parseTruthy(getEnv("OTLP_TRUST_RESOURCE_TENANT", "")), diff --git a/internal/realtime/hub.go b/internal/realtime/hub.go index 79266b9..8275cf7 100644 --- a/internal/realtime/hub.go +++ b/internal/realtime/hub.go @@ -59,6 +59,13 @@ type Hub struct { maxBufferSize int flushInterval time.Duration + // maxClients caps simultaneous WebSocket connections. 0 = unlimited + // (legacy). When set, HandleWebSocket rejects new connects past the cap + // with HTTP 503 instead of admitting unbounded clients that would + // exhaust file descriptors and per-client send-channel memory. + maxClients int + clientCount atomic.Int64 + stopCh chan struct{} stopped atomic.Bool wg sync.WaitGroup @@ -122,6 +129,16 @@ func (h *Hub) Run() { select { case <-h.stopCh: h.flush() + // Close every client's send channel so the writer goroutines + // (blocked on `for msg := range c.send`) wake up and exit. + // Without this, writerWg.Wait() in Stop() hangs whenever any + // connected client is idle. CAS guard mirrors the unregister + // handler so concurrent close paths can't double-close. 
+ for c := range h.clients { + if c.closed.CompareAndSwap(false, true) { + close(c.send) + } + } return case c := <-h.register: @@ -243,6 +260,20 @@ func (h *Hub) SetDevMode(devMode bool) { h.devMode = devMode } +// SetMaxClients caps simultaneous WebSocket connections. 0 disables the cap +// (default). Configure once at startup before HandleWebSocket starts taking +// traffic — the cap is read concurrently from each upgrade attempt. +func (h *Hub) SetMaxClients(n int) { + if n < 0 { + n = 0 + } + h.maxClients = n +} + +// ActiveClients reports the number of admitted WebSocket clients. A slot is +// counted from just before the upgrade until the client is torn down. +func (h *Hub) ActiveClients() int64 { return h.clientCount.Load() } + // SetWSMetrics wires WebSocket metric callbacks. func (h *Hub) SetWSMetrics(onMessageSent func(string), onSlowClientDrop func()) { h.onMessageSent = onMessageSent @@ -278,10 +309,38 @@ func (h *Hub) Stop() { // HandleWebSocket is the HTTP handler that upgrades connections to WebSocket. func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + // Cap admission BEFORE the WebSocket upgrade so a flood of new clients + // can't exhaust file descriptors / per-client send-channel memory. + // Optimistic reservation: increment first, then roll back if that + // pushed the count past the cap. Race-free because clientCount is + // atomic and every cleanup path decrements it. 
+ if h.maxClients > 0 { + if n := h.clientCount.Add(1); n > int64(h.maxClients) { + h.clientCount.Add(-1) + slog.Warn("WebSocket connection rejected: max-clients cap reached", + "max_clients", h.maxClients, + "current", n-1, + "remote", r.RemoteAddr, + ) + http.Error(w, "WebSocket connections at capacity, retry later", http.StatusServiceUnavailable) + return + } + } else { + h.clientCount.Add(1) + } + clientCounted := true + releaseSlot := func() { + if clientCounted { + clientCounted = false + h.clientCount.Add(-1) + } + } + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: h.devMode, // Allow cross-origin in dev mode only }) if err != nil { + releaseSlot() slog.Error("WebSocket upgrade failed", "error", err) return } @@ -297,6 +356,10 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { h.writerWg.Add(1) go func() { // #nosec G118 -- long-lived WS writer goroutine outlives HTTP request intentionally defer h.writerWg.Done() + // Release the admission slot when the writer exits — the writer + // outlives the HandleWebSocket reader loop, so this is the last + // goroutine alive for this client. + defer releaseSlot() defer func() { if !h.stopped.Load() { h.unregister <- c @@ -326,4 +389,12 @@ func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { break } } + // Force the writer goroutine to exit once the conn is dead, otherwise + // it stays blocked on `for msg := range c.send` until the next broadcast + // happens to be selected for this client — which leaks the admission + // slot and the goroutine indefinitely under low traffic. CAS guard + // mirrors every other close site. 
+ if c.closed.CompareAndSwap(false, true) { + close(c.send) + } } diff --git a/internal/realtime/hub_maxclients_test.go b/internal/realtime/hub_maxclients_test.go new file mode 100644 index 0000000..73a18ed --- /dev/null +++ b/internal/realtime/hub_maxclients_test.go @@ -0,0 +1,140 @@ +package realtime + +import ( + "context" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/coder/websocket" +) + +// TestHub_MaxClientsCap verifies HandleWebSocket rejects new connections +// once the cap is reached and admits connections again after existing +// ones drop. Without the cap, a flood of connects exhausts file +// descriptors and per-client send-channel memory. +func TestHub_MaxClientsCap(t *testing.T) { + hub := NewHub(nil) + hub.SetMaxClients(2) + go hub.Run() + defer hub.Stop() + + srv := httptest.NewServer(http.HandlerFunc(hub.HandleWebSocket)) + defer srv.Close() + + wsURL := "ws" + srv.URL[len("http"):] + + dial := func(t *testing.T) (*websocket.Conn, *http.Response, error) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + return websocket.Dial(ctx, wsURL, nil) + } + + // First two connections should succeed. + c1, _, err := dial(t) + if err != nil { + t.Fatalf("client 1 dial: %v", err) + } + c2, _, err := dial(t) + if err != nil { + t.Fatalf("client 2 dial: %v", err) + } + + // No wait is needed for the hub goroutine to register the connections: + // ActiveClients is incremented in HandleWebSocket BEFORE the upgrade, so + // the count is already accurate by the time dial returns. + if got := hub.ActiveClients(); got != 2 { + t.Fatalf("ActiveClients after 2 connects: got %d, want 2", got) + } + + // Third connection MUST be rejected with 503. 
+ _, resp, err := dial(t) + if err == nil { + t.Fatalf("client 3 dial: expected error from 503 rejection, got success") + } + if resp == nil { + t.Fatalf("client 3: expected non-nil response on rejection, got nil — err=%v", err) + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("client 3: got status %d, want %d", resp.StatusCode, http.StatusServiceUnavailable) + } + resp.Body.Close() + + // The rejected connection must NOT have leaked into the ActiveClients count. + if got := hub.ActiveClients(); got != 2 { + t.Fatalf("ActiveClients after rejection: got %d, want 2 (slot must be released on reject)", got) + } + + // Drop client 1, wait for the writer goroutine to release the slot, then + // verify a new connect succeeds. + c1.Close(websocket.StatusNormalClosure, "test") + deadline := time.Now().Add(2 * time.Second) + for hub.ActiveClients() > 1 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + if got := hub.ActiveClients(); got != 1 { + t.Fatalf("ActiveClients after client 1 drop: got %d, want 1", got) + } + + c3, _, err := dial(t) + if err != nil { + t.Fatalf("client 3 (retry after drop): %v", err) + } + + c2.Close(websocket.StatusNormalClosure, "test") + c3.Close(websocket.StatusNormalClosure, "test") +} + +// TestHub_MaxClientsZeroIsUnlimited verifies the legacy unlimited path +// still works when no cap is configured. 
+func TestHub_MaxClientsZeroIsUnlimited(t *testing.T) { + hub := NewHub(nil) + // SetMaxClients NOT called → maxClients=0 → unlimited + go hub.Run() + defer hub.Stop() + + srv := httptest.NewServer(http.HandlerFunc(hub.HandleWebSocket)) + defer srv.Close() + wsURL := "ws" + srv.URL[len("http"):] + + const N = 10 + conns := make([]*websocket.Conn, 0, N) + var mu sync.Mutex + var wg sync.WaitGroup + for range N { + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + c, _, err := websocket.Dial(ctx, wsURL, nil) + if err != nil { + t.Errorf("dial: %v", err) + return + } + mu.Lock() + conns = append(conns, c) + mu.Unlock() + }() + } + wg.Wait() + + if got := hub.ActiveClients(); got != int64(N) { + t.Fatalf("ActiveClients: got %d, want %d", got, N) + } + + for _, c := range conns { + c.Close(websocket.StatusNormalClosure, "test") + } +} + +func TestHub_SetMaxClients_NegativeCoercesToZero(t *testing.T) { + hub := NewHub(nil) + hub.SetMaxClients(-5) + if hub.maxClients != 0 { + t.Fatalf("negative cap should coerce to 0 (unlimited), got %d", hub.maxClients) + } +} diff --git a/main.go b/main.go index b14180e..045e0f9 100644 --- a/main.go +++ b/main.go @@ -294,6 +294,7 @@ func main() { metrics.SetActiveConnections(count) }) hub.SetDevMode(cfg.DevMode) + hub.SetMaxClients(cfg.WSMaxClients) hub.SetWSMetrics( func(msgType string) { metrics.WSMessagesSent.WithLabelValues(msgType).Inc() }, func() { metrics.WSSlowClientsRemoved.Inc() },