Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ type Config struct {
// being used outside dev/test. Without it, a production Env + SQLite
// combination refuses to start.
AllowSqliteProd bool

// WSMaxClients caps simultaneous WebSocket connections to /ws*
// endpoints. 0 = unlimited (default). When set, new connections past
// the cap receive HTTP 503. Sized for the operator's expected dashboard
// audience — small for ops dashboards, larger for read-heavy public UIs.
WSMaxClients int
}

func Load(customPath string) (*Config, error) {
Expand Down Expand Up @@ -285,6 +291,9 @@ func Load(customPath string) (*Config, error) {
// OTel self-instrumentation
OTelExporterEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", ""),

// WebSocket admission cap
WSMaxClients: getEnvInt("WS_MAX_CLIENTS", 0),

// Multi-tenancy
DefaultTenant: getEnv("DEFAULT_TENANT", "default"),
OTLPTrustResourceTenant: parseTruthy(getEnv("OTLP_TRUST_RESOURCE_TENANT", "")),
Expand Down
71 changes: 71 additions & 0 deletions internal/realtime/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@
maxBufferSize int
flushInterval time.Duration

// maxClients caps simultaneous WebSocket connections. 0 = unlimited
// (legacy). When set, HandleWebSocket rejects new connects past the cap
// with HTTP 503 instead of admitting unbounded clients that would
// exhaust file descriptors and per-client send-channel memory.
maxClients int
clientCount atomic.Int64

stopCh chan struct{}
stopped atomic.Bool
wg sync.WaitGroup
Expand Down Expand Up @@ -111,7 +118,7 @@
}

// Run starts the hub's main event loop. Should be called in a goroutine.
func (h *Hub) Run() {

Check failure on line 121 in internal/realtime/hub.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3TFgLyCtVUJ16NrP_J&open=AZ3TFgLyCtVUJ16NrP_J&pullRequest=67
h.wg.Add(1)
defer h.wg.Done()

Expand All @@ -122,6 +129,16 @@
select {
case <-h.stopCh:
h.flush()
// Close every client's send channel so the writer goroutines
// (blocked on `for msg := range c.send`) wake up and exit.
// Without this, writerWg.Wait() in Stop() hangs whenever any
// connected client is idle. CAS guard mirrors the unregister
// handler so concurrent close paths can't double-close.
for c := range h.clients {
if c.closed.CompareAndSwap(false, true) {
close(c.send)
}
}
return

case c := <-h.register:
Expand Down Expand Up @@ -243,6 +260,20 @@
h.devMode = devMode
}

// SetMaxClients caps simultaneous WebSocket connections. 0 disables the cap
// (default). Configure once at startup before HandleWebSocket starts taking
// traffic — the cap is read concurrently from each upgrade attempt.
func (h *Hub) SetMaxClients(n int) {
if n < 0 {
n = 0
}
h.maxClients = n
}

// ActiveClients reports the count of currently-connected WebSocket clients.
// Updated atomically as connections are accepted and torn down.
func (h *Hub) ActiveClients() int64 { return h.clientCount.Load() }

// SetWSMetrics wires WebSocket metric callbacks.
func (h *Hub) SetWSMetrics(onMessageSent func(string), onSlowClientDrop func()) {
h.onMessageSent = onMessageSent
Expand Down Expand Up @@ -277,11 +308,39 @@
}

// HandleWebSocket is the HTTP handler that upgrades connections to WebSocket.
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request) {

Check failure on line 311 in internal/realtime/hub.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3TFgLyCtVUJ16NrP_K&open=AZ3TFgLyCtVUJ16NrP_K&pullRequest=67
// Cap admission BEFORE the WebSocket upgrade so a flood of new clients
// can't exhaust file descriptors / per-client send-channel memory.
// CompareAndSwap-ish reservation: increment optimistically, roll back
// if we exceeded the cap. Race-free because clientCount is atomic and
// every cleanup path decrements it.
if h.maxClients > 0 {
if n := h.clientCount.Add(1); n > int64(h.maxClients) {
h.clientCount.Add(-1)
slog.Warn("WebSocket connection rejected: max-clients cap reached",
"max_clients", h.maxClients,
"current", n-1,
"remote", r.RemoteAddr,
)
http.Error(w, "WebSocket connections at capacity, retry later", http.StatusServiceUnavailable)
return
}
} else {
h.clientCount.Add(1)
}
clientCounted := true
releaseSlot := func() {
if clientCounted {
clientCounted = false
h.clientCount.Add(-1)
}
}

conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: h.devMode, // Allow cross-origin in dev mode only
})
if err != nil {
releaseSlot()
slog.Error("WebSocket upgrade failed", "error", err)
return
}
Expand All @@ -297,6 +356,10 @@
h.writerWg.Add(1)
go func() { // #nosec G118 -- long-lived WS writer goroutine outlives HTTP request intentionally
defer h.writerWg.Done()
// Release the admission slot when the writer exits — the writer
// outlives the HandleWebSocket reader loop, so this is the last
// goroutine alive for this client.
defer releaseSlot()
defer func() {
if !h.stopped.Load() {
h.unregister <- c
Expand Down Expand Up @@ -326,4 +389,12 @@
break
}
}
// Force the writer goroutine to exit once the conn is dead, otherwise
// it stays blocked on `for msg := range c.send` until the next broadcast
// happens to be selected for this client — which leaks the admission
// slot and the goroutine indefinitely under low traffic. CAS guard
// mirrors every other close site.
if c.closed.CompareAndSwap(false, true) {
close(c.send)
}
}
140 changes: 140 additions & 0 deletions internal/realtime/hub_maxclients_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package realtime

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/coder/websocket"
)

// TestHub_MaxClientsCap verifies HandleWebSocket rejects new connections
// once the cap is reached and the cap returns to enforcing again as old
// connections drop. Without the cap, a flood of connects exhausts file
// descriptors and per-client send-channel memory.
func TestHub_MaxClientsCap(t *testing.T) {
hub := NewHub(nil)
hub.SetMaxClients(2)
go hub.Run()
defer hub.Stop()

srv := httptest.NewServer(http.HandlerFunc(hub.HandleWebSocket))
defer srv.Close()

wsURL := "ws" + srv.URL[len("http"):]

dial := func(t *testing.T) (*websocket.Conn, *http.Response, error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return websocket.Dial(ctx, wsURL, nil)
}

// First two connections should succeed.
c1, _, err := dial(t)
if err != nil {
t.Fatalf("client 1 dial: %v", err)
}
c2, _, err := dial(t)
if err != nil {
t.Fatalf("client 2 dial: %v", err)
}

// Wait for the hub goroutine to register the connections — ActiveClients
// is incremented in HandleWebSocket BEFORE the upgrade, so the count is
// already accurate by the time dial returns.
if got := hub.ActiveClients(); got != 2 {
t.Fatalf("ActiveClients after 2 connects: got %d, want 2", got)
}

// Third connection MUST be rejected with 503.
_, resp, err := dial(t)
if err == nil {
t.Fatalf("client 3 dial: expected error from 503 rejection, got success")
}
if resp == nil {
t.Fatalf("client 3: expected non-nil response on rejection, got nil — err=%v", err)
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("client 3: got status %d, want %d", resp.StatusCode, http.StatusServiceUnavailable)
}
resp.Body.Close()

// ActiveClients must NOT have leaked into the count for the rejected one.
if got := hub.ActiveClients(); got != 2 {
t.Fatalf("ActiveClients after rejection: got %d, want 2 (slot must be released on reject)", got)
}

// Drop client 1, wait for the writer goroutine to release the slot, then
// verify a new connect succeeds.
c1.Close(websocket.StatusNormalClosure, "test")
deadline := time.Now().Add(2 * time.Second)
for hub.ActiveClients() > 1 && time.Now().Before(deadline) {
time.Sleep(10 * time.Millisecond)
}
if got := hub.ActiveClients(); got != 1 {
t.Fatalf("ActiveClients after client 1 drop: got %d, want 1", got)
}

c3, _, err := dial(t)
if err != nil {
t.Fatalf("client 3 (retry after drop): %v", err)
}

c2.Close(websocket.StatusNormalClosure, "test")
c3.Close(websocket.StatusNormalClosure, "test")
}

// TestHub_MaxClientsZeroIsUnlimited verifies the legacy unlimited path
// still works when no cap is configured.
func TestHub_MaxClientsZeroIsUnlimited(t *testing.T) {
hub := NewHub(nil)
// SetMaxClients NOT called → maxClients=0 → unlimited
go hub.Run()
defer hub.Stop()

srv := httptest.NewServer(http.HandlerFunc(hub.HandleWebSocket))
defer srv.Close()
wsURL := "ws" + srv.URL[len("http"):]

const N = 10
conns := make([]*websocket.Conn, 0, N)
var mu sync.Mutex
var wg sync.WaitGroup
for range N {
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
c, _, err := websocket.Dial(ctx, wsURL, nil)
if err != nil {
t.Errorf("dial: %v", err)
return
}
mu.Lock()
conns = append(conns, c)
mu.Unlock()
}()
}
wg.Wait()

if got := hub.ActiveClients(); got != int64(N) {
t.Fatalf("ActiveClients: got %d, want %d", got, N)
}

for _, c := range conns {
c.Close(websocket.StatusNormalClosure, "test")
}
}

func TestHub_SetMaxClients_NegativeCoercesToZero(t *testing.T) {
hub := NewHub(nil)
hub.SetMaxClients(-5)
if hub.maxClients != 0 {
t.Fatalf("negative cap should coerce to 0 (unlimited), got %d", hub.maxClients)
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func main() {
metrics.SetActiveConnections(count)
})
hub.SetDevMode(cfg.DevMode)
hub.SetMaxClients(cfg.WSMaxClients)
hub.SetWSMetrics(
func(msgType string) { metrics.WSMessagesSent.WithLabelValues(msgType).Inc() },
func() { metrics.WSSlowClientsRemoved.Inc() },
Expand Down
Loading