From 2ccd6bd45b147d4aa7ab18e982b30f3cb3cfa6d2 Mon Sep 17 00:00:00 2001 From: tab Date: Sun, 12 Apr 2026 19:58:24 +0300 Subject: [PATCH 1/4] feat(config): Add streaming configuration for SSE - Add streaming configuration and validation - Use default values when absent --- fuku.override.yaml | 3 + internal/app/errors/errors.go | 3 + internal/config/config.go | 25 ++++++++ internal/config/config_test.go | 58 +++++++++++++++++ internal/config/constants.go | 7 +++ internal/config/validate.go | 25 ++++++++ internal/config/validate_test.go | 104 ++++++++++++++++++++++++++++++- 7 files changed, 223 insertions(+), 2 deletions(-) diff --git a/fuku.override.yaml b/fuku.override.yaml index ee62623..1260f06 100644 --- a/fuku.override.yaml +++ b/fuku.override.yaml @@ -26,6 +26,9 @@ server: listen: "localhost:1234" auth: token: "secret-token" + streaming: + connections: 10 + buffer: 1000 services: auth: diff --git a/internal/app/errors/errors.go b/internal/app/errors/errors.go index 90c9e21..b027a98 100644 --- a/internal/app/errors/errors.go +++ b/internal/app/errors/errors.go @@ -41,6 +41,9 @@ var ( ErrAPINotRestartable = errors.New("service cannot be restarted") ErrAPINotAccepting = errors.New("instance is not accepting actions") + ErrStreamingInvalidConnections = errors.New("streaming connections must be greater than 0") + ErrStreamingInvalidBuffer = errors.New("streaming buffer must be greater than 0") + ErrInvalidCommand = errors.New("command must not be whitespace-only when provided") ErrWatchIncludeRequired = errors.New("watch configuration requires include field") ErrInvalidLogsOutput = errors.New("invalid service logs output value (must be 'stdout' or 'stderr')") diff --git a/internal/config/config.go b/internal/config/config.go index 6b2cbad..250f2ab 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,6 +55,24 @@ func (c *Config) ServerToken() string { return c.Server.Auth.Token } +// ServerStreamingConnections returns the max concurrent streaming connections +func (c *Config) ServerStreamingConnections() int { + if c.Server.Streaming.Connections == nil { + return StreamingConnections + } + + return *c.Server.Streaming.Connections +} + +// ServerStreamingBuffer returns the event stream ring buffer size +func (c *Config) ServerStreamingBuffer() int { + if c.Server.Streaming.Buffer == nil { + return StreamingBuffer + } + + return *c.Server.Streaming.Buffer +} + // ApplyDefaults applies default configuration to services func (c *Config) ApplyDefaults() { for name, service := range c.Services { @@ -172,4 +190,11 @@ type Server struct { Auth struct { Token string `yaml:"token"` } `yaml:"auth"` + Streaming Streaming `yaml:"streaming"` +} + +// Streaming represents streaming endpoint configuration (SSE + WebSocket) +type Streaming struct { + Connections *int `yaml:"connections"` + Buffer *int `yaml:"buffer"` } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index ab3db6e..9167707 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -210,6 +210,64 @@ func Test_ServerToken(t *testing.T) { } } +func intPtr(v int) *int { return &v } + +func Test_ServerStreamingConnections(t *testing.T) { + tests := []struct { + name string + max *int + want int + }{ + { + name: "configured value", + max: intPtr(20), + want: 20, + }, + { + name: "nil returns default", + max: nil, + want: StreamingConnections, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := DefaultConfig() + cfg.Server.Streaming.Connections = tt.max + + assert.Equal(t, tt.want, cfg.ServerStreamingConnections()) + }) + } +} + +func Test_ServerStreamingBuffer(t *testing.T) { + tests := []struct { + name string + buffer *int + want int + }{ + { + name: "configured value", + buffer: intPtr(2000), + want: 2000, + }, + { + name: "nil returns default", + buffer: nil, + want: StreamingBuffer, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := DefaultConfig() + cfg.Server.Streaming.Buffer = tt.buffer + + assert.Equal(t, tt.want, cfg.ServerStreamingBuffer()) + }) + } +} + func Test_NormalizeTier(t *testing.T) { tests := []struct { name string diff --git a/internal/config/constants.go b/internal/config/constants.go index 2f6ca21..ad3ad50 100644 --- a/internal/config/constants.go +++ b/internal/config/constants.go @@ -83,6 +83,13 @@ const ( StoreSampleTimeout = 200 * time.Millisecond ) +// Streaming settings (SSE + WebSocket) +const ( + StreamingConnections = 10 + StreamingBuffer = 1000 + StreamingKeepalive = 15 * time.Second +) + // Loopback hostnames (not available as stdlib constants) const ( LoopbackHostname = "localhost" diff --git a/internal/config/validate.go b/internal/config/validate.go index 2a4f15c..6452be8 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -107,6 +107,31 @@ func (c *Config) validateServer() error { return errors.ErrAPINotLoopback } + return c.validateStreaming() +} + +// validateStreaming validates and applies defaults for streaming configuration +func (c *Config) validateStreaming() error { + s := &c.Server.Streaming + + if s.Connections == nil { + v := StreamingConnections + s.Connections = &v + } + + if s.Buffer == nil { + v := StreamingBuffer + s.Buffer = &v + } + + if *s.Connections <= 0 { + return errors.ErrStreamingInvalidConnections + } + + if *s.Buffer <= 0 { + return errors.ErrStreamingInvalidBuffer + } + return nil } diff --git a/internal/config/validate_test.go b/internal/config/validate_test.go index 76543a6..cf9e2ee 100644 --- a/internal/config/validate_test.go +++ b/internal/config/validate_test.go @@ -10,6 +10,7 @@ import ( ) const testToken = "test-token" +const testListen = "127.0.0.1:9876" func Test_Validate(t *testing.T) { tests := []struct { @@ -245,7 +246,7 @@ func Test_Validate(t *testing.T) { name: "valid server configuration", config: func() *Config { cfg := DefaultConfig() - cfg.Server.Listen = "127.0.0.1:9876" + cfg.Server.Listen = testListen cfg.Server.Auth.Token = testToken return cfg @@ -283,7 +284,7 @@ func Test_Validate(t *testing.T) { name: "server with listen but no token", config: func() *Config { cfg := DefaultConfig() - cfg.Server.Listen = "127.0.0.1:9876" + cfg.Server.Listen = testListen return cfg }(), @@ -350,6 +351,92 @@ func Test_Validate(t *testing.T) { expectError: true, errorMsg: "api listen must be a valid host:port address", }, + { + name: "streaming defaults applied when section absent", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + + return cfg + }(), + expectError: false, + }, + { + name: "streaming with valid custom values", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Connections = intPtr(20) + cfg.Server.Streaming.Buffer = intPtr(5000) + + return cfg + }(), + expectError: false, + }, + { + name: "streaming with explicit zero connections", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Connections = intPtr(0) + + return cfg + }(), + expectError: true, + errorMsg: "streaming connections must be greater than 0", + }, + { + name: "streaming with explicit zero buffer", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Buffer = intPtr(0) + + return cfg + }(), + expectError: true, + errorMsg: "streaming buffer must be greater than 0", + }, + { + name: "streaming with negative connections", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Connections = intPtr(-1) + + return cfg + }(), + expectError: true, + errorMsg: "streaming connections must be greater than 0", + }, + { + name: "streaming with negative buffer", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Buffer = intPtr(-1) + + return cfg + }(), + expectError: true, + errorMsg: "streaming buffer must be greater than 0", + }, + { + name: "streaming ignored when server disabled", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Streaming.Connections = intPtr(-1) + + return cfg + }(), + expectError: false, + }, } for _, tt := range tests { @@ -366,6 +453,19 @@ func Test_Validate(t *testing.T) { } } +func Test_ValidateStreaming_DefaultsApplied(t *testing.T) { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + + require.NoError(t, cfg.Validate()) + + require.NotNil(t, cfg.Server.Streaming.Connections) + require.NotNil(t, cfg.Server.Streaming.Buffer) + assert.Equal(t, StreamingConnections, *cfg.Server.Streaming.Connections) + assert.Equal(t, StreamingBuffer, *cfg.Server.Streaming.Buffer) +} + func Test_ValidateCommand(t *testing.T) { tests := []struct { name string From 3030bef0bf1250311733f4732f6fa23a8955702f Mon Sep 17 00:00:00 2001 From: tab Date: Sun, 12 Apr 2026 20:10:41 +0300 Subject: [PATCH 2/4] fix(e2e): Update shutdown output assertions --- e2e/default_tier_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/e2e/default_tier_test.go b/e2e/default_tier_test.go index 5975a0f..677da91 100644 --- a/e2e/default_tier_test.go +++ b/e2e/default_tier_test.go @@ -59,8 +59,9 @@ func Test_DefaultTier_GracefulShutdown(t *testing.T) { output := runner.Output() // Shutdown - assert.Contains(t, output, "signal signal=terminated") - assert.Contains(t, output, "Received signal terminated, shutting down services") + assert.Contains(t, output, "phase=stopping") + assert.Contains(t, output, "All services stopped") + assert.Contains(t, output, "phase=stopped") } func Test_DefaultTier_LogsCommand(t *testing.T) { From c4dbce43e17acad44fea67d99c2be77e4a4d1a7a Mon Sep 17 00:00:00 2001 From: tab Date: Sun, 12 Apr 2026 20:46:57 +0300 Subject: [PATCH 3/4] feat(bus): Add per-service metrics event with batched publishing --- internal/app/app_test.go | 13 +++++++++- internal/app/bus/bus.go | 13 ++++++++++ internal/app/bus/formatter.go | 2 ++ internal/app/bus/formatter_test.go | 9 +++++++ internal/app/registry/store.go | 31 ++++++++++++++++++++---- internal/app/relay/bridge.go | 4 +++ internal/app/relay/bridge_test.go | 39 ++++++++++++++++++++++++++++++ internal/config/constants.go | 2 +- 8 files changed, 106 insertions(+), 7 deletions(-) diff --git a/internal/app/app_test.go b/internal/app/app_test.go index d19bfbd..1496be8 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -140,7 +140,12 @@ func Test_Register_OnStop_CancelsContextAndUnblocksApp(t *testing.T) { }) mockSentry := sentry.NewMockSentry(ctrl) - mockSentry.EXPECT().Flush() + + flushed := make(chan struct{}) + + mockSentry.EXPECT().Flush().Do(func() { + close(flushed) + }) root := NewRoot() app := NewApp(mockTUI, mockSentry, &noopShutdowner{}) @@ -170,6 +175,12 @@ func Test_Register_OnStop_CancelsContextAndUnblocksApp(t *testing.T) { case <-time.After(time.Second): t.Fatal("OnStop did not return after cancelling root context") } + + select { + case <-flushed: + case <-time.After(time.Second): + t.Fatal("Flush was not called") + } } func Test_Register_OnStop_RespectsTimeout(t *testing.T) { diff --git a/internal/app/bus/bus.go b/internal/app/bus/bus.go index 76a58d4..4a04996 100644 --- a/internal/app/bus/bus.go +++ b/internal/app/bus/bus.go @@ -34,6 +34,7 @@ const ( EventWatchStarted MessageType = "watch_started" EventWatchStopped MessageType = "watch_stopped" EventResourceSample MessageType = "resource_sample" + EventServiceMetrics MessageType = "service_metrics" EventAPIStarted MessageType = "api_started" EventAPIStopped MessageType = "api_stopped" EventAPIRequest MessageType = "api_request" @@ -204,6 +205,18 @@ type ResourceSample struct { MEM float64 } +// ServiceMetricsBatch contains per-service CPU and memory readings from a sample cycle +type ServiceMetricsBatch struct { + Samples []ServiceMetrics +} + +// ServiceMetrics contains a single service's CPU and memory readings +type ServiceMetrics struct { + Service Service + CPU float64 + Memory uint64 +} + // APIStarted indicates the API server has started listening type APIStarted struct { Listen string diff --git a/internal/app/bus/formatter.go b/internal/app/bus/formatter.go index 34fdf68..756c547 100644 --- a/internal/app/bus/formatter.go +++ b/internal/app/bus/formatter.go @@ -69,6 +69,8 @@ func (f *Formatter) Format(msgType MessageType, data any) string { e.Str("id", d.Service.ID).Str("service", d.Service.Name).Strs("files", d.ChangedFiles) case ResourceSample: e.Str("cpu", fmt.Sprintf("%.1f%%", d.CPU)).Str("mem", fmt.Sprintf("%.1fMB", d.MEM)) + case ServiceMetricsBatch: + e.Int("services", len(d.Samples)) case APIStarted: e.Str("listen", d.Listen) case APIStopped: diff --git a/internal/app/bus/formatter_test.go b/internal/app/bus/formatter_test.go index 1cd3674..0c917cd 100644 --- a/internal/app/bus/formatter_test.go +++ b/internal/app/bus/formatter_test.go @@ -131,6 +131,15 @@ func Test_FormatEvent(t *testing.T) { data: ResourceSample{CPU: 2.5, MEM: 64.0}, contains: []string{"resource_sample", "cpu=2.5%", "mem=64.0MB"}, }, + { + name: "ServiceMetricsBatch", + msgType: EventServiceMetrics, + data: ServiceMetricsBatch{Samples: []ServiceMetrics{ + {Service: Service{ID: "test-id-api", Name: "api"}, CPU: 3.2, Memory: 67108864}, + {Service: Service{ID: "test-id-web", Name: "web"}, CPU: 1.5, Memory: 33554432}, + }}, + contains: []string{"service_metrics", "services=2"}, + }, { name: "APIRequest", msgType: EventAPIRequest, diff --git a/internal/app/registry/store.go b/internal/app/registry/store.go index 90385e0..3fcd7f9 100644 --- a/internal/app/registry/store.go +++ b/internal/app/registry/store.go @@ -534,12 +534,33 @@ func (s *store) sampleStats(ctx context.Context) { } s.mu.Lock() - defer s.mu.Unlock() - for id, st := range stats { - if svc, exists := s.services[id]; exists && svc.pid == pids[id] { - svc.cpu = st.CPU - svc.memory = st.RawMEM + samples := make([]bus.ServiceMetrics, 0, len(stats)) + + for id, stat := range stats { + service, exists := s.services[id] + if !exists || service.pid != pids[id] { + continue + } + + service.cpu = stat.CPU + service.memory = stat.RawMEM + + if service.status.IsRunning() { + samples = append(samples, bus.ServiceMetrics{ + Service: bus.Service{ID: service.id, Name: service.name}, + CPU: stat.CPU, + Memory: stat.RawMEM, + }) } } + + s.mu.Unlock() + + if len(samples) > 0 { + s.bus.Publish(bus.Message{ + Type: bus.EventServiceMetrics, + Data: bus.ServiceMetricsBatch{Samples: samples}, + }) + } } diff --git a/internal/app/relay/bridge.go b/internal/app/relay/bridge.go index d961d16..880b598 100644 --- a/internal/app/relay/bridge.go +++ b/internal/app/relay/bridge.go @@ -41,6 +41,10 @@ func (br *Bridge) forward(ctx context.Context, ch <-chan bus.Message) { return } + if msg.Type == bus.EventServiceMetrics { + continue + } + text := br.formatter.Format(msg.Type, msg.Data) br.broadcaster.Broadcast(config.AppName, text) } diff --git a/internal/app/relay/bridge_test.go b/internal/app/relay/bridge_test.go index 40be6db..6ed8c72 100644 --- a/internal/app/relay/bridge_test.go +++ b/internal/app/relay/bridge_test.go @@ -155,6 +155,45 @@ func Test_Bridge_ForwardsMultipleEvents(t *testing.T) { assert.Contains(t, messages[1].message, "service_ready") } +func Test_Bridge_FiltersServiceMetrics(t *testing.T) { + cfg := config.DefaultConfig() + cfg.Logs.Buffer = 10 + + log := logger.NewLoggerWithOutput(cfg, io.Discard) + formatter := bus.NewFormatter(logger.NewEventLogger()) + broadcaster := newCaptureBroadcaster() + + b := bus.NewBus(cfg, formatter, log) + defer b.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bridge := NewBridge(b, broadcaster, formatter) + bridge.Start(ctx) + + b.Publish(bus.Message{ + Type: bus.EventServiceMetrics, + Data: bus.ServiceMetricsBatch{Samples: []bus.ServiceMetrics{ + {Service: bus.Service{ID: "test-id-api", Name: "api"}, CPU: 2.5, Memory: 67108864}, + }}, + }) + + b.Publish(bus.Message{ + Type: bus.EventServiceReady, + Data: bus.ServiceReady{ + ServiceEvent: bus.ServiceEvent{Service: bus.Service{ID: "test-id-api", Name: "api"}, Tier: "platform"}, + }, + }) + + messages := broadcaster.waitForMessages(1) + + cancel() + + require.Len(t, messages, 1) + assert.Contains(t, messages[0].message, "service_ready") +} + func Test_Bridge_StopsCleanly(t *testing.T) { cfg := config.DefaultConfig() cfg.Logs.Buffer = 10 diff --git a/internal/config/constants.go b/internal/config/constants.go index ad3ad50..deb0fb6 100644 --- a/internal/config/constants.go +++ b/internal/config/constants.go @@ -87,7 +87,7 @@ const ( const ( StreamingConnections = 10 StreamingBuffer = 1000 - StreamingKeepalive = 15 * time.Second + StreamingKeepalive = 15 * time.Second ) // Loopback hostnames (not available as stdlib constants) From 87d9b44806bdb0d4daeb1a6086dc7c06be3c6843 Mon Sep 17 00:00:00 2001 From: tab Date: Sun, 12 Apr 2026 22:09:15 +0300 Subject: [PATCH 4/4] feat(api): Add streaming connection tracker with bounded acquire/release --- internal/app/api/connection.go | 47 +++++++++++ internal/app/api/connection_test.go | 119 ++++++++++++++++++++++++++++ internal/config/validate.go | 5 +- internal/config/validate_test.go | 27 +++++++ 4 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 internal/app/api/connection.go create mode 100644 internal/app/api/connection_test.go diff --git a/internal/app/api/connection.go b/internal/app/api/connection.go new file mode 100644 index 0000000..1495237 --- /dev/null +++ b/internal/app/api/connection.go @@ -0,0 +1,47 @@ +package api + +import "sync/atomic" + +// ConnectionTracker tracks concurrent streaming connections against a limit +type ConnectionTracker struct { + count atomic.Int32 + max int32 +} + +// NewConnectionTracker creates a new connection tracker with the given limit +func NewConnectionTracker(limit int) *ConnectionTracker { + return &ConnectionTracker{max: int32(limit)} //nolint:gosec // config-validated positive int, no overflow risk +} + +// Acquire attempts to claim a connection slot, returns false if at limit +func (t *ConnectionTracker) Acquire() bool { + for { + current := t.count.Load() + if current >= t.max { + return false + } + + if t.count.CompareAndSwap(current, current+1) { + return true + } + } +} + +// Release frees a connection slot +func (t *ConnectionTracker) Release() { + for { + current := t.count.Load() + if current <= 0 { + return + } + + if t.count.CompareAndSwap(current, current-1) { + return + } + } +} + +// Count returns the current number of active connections +func (t *ConnectionTracker) Count() int { + return int(t.count.Load()) +} diff --git a/internal/app/api/connection_test.go b/internal/app/api/connection_test.go new file mode 100644 index 0000000..e5c3212 --- /dev/null +++ b/internal/app/api/connection_test.go @@ -0,0 +1,119 @@ +package api + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_ConnectionTracker_Acquire(t *testing.T) { + tests := []struct { + name string + max int + acquire int + want bool + }{ + { + name: "within limit", + max: 3, + acquire: 1, + want: true, + }, + { + name: "at limit", + max: 2, + acquire: 3, + want: false, + }, + { + name: "single slot", + max: 1, + acquire: 1, + want: true, + }, + { + name: "single slot exceeded", + max: 1, + acquire: 2, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracker := NewConnectionTracker(tt.max) + + var ok bool + for range tt.acquire { + ok = tracker.Acquire() + } + + assert.Equal(t, tt.want, ok) + }) + } +} + +func Test_ConnectionTracker_Release(t *testing.T) { + tracker := NewConnectionTracker(1) + + require.True(t, tracker.Acquire()) + require.False(t, tracker.Acquire()) + + tracker.Release() + + assert.True(t, tracker.Acquire()) +} + +func Test_ConnectionTracker_DoubleRelease(t *testing.T) { + tracker := NewConnectionTracker(1) + + require.True(t, tracker.Acquire()) + tracker.Release() + tracker.Release() + + assert.Equal(t, 0, tracker.Count()) + + require.True(t, tracker.Acquire()) + assert.False(t, tracker.Acquire()) +} + +func Test_ConnectionTracker_ReleaseWithoutAcquire(t *testing.T) { + tracker := NewConnectionTracker(1) + + tracker.Release() + + assert.Equal(t, 0, tracker.Count()) +} + +func Test_ConnectionTracker_Count(t *testing.T) { + tracker := NewConnectionTracker(5) + + assert.Equal(t, 0, tracker.Count()) + + tracker.Acquire() + tracker.Acquire() + assert.Equal(t, 2, tracker.Count()) + + tracker.Release() + assert.Equal(t, 1, tracker.Count()) +} + +func Test_ConnectionTracker_Concurrent(t *testing.T) { + tracker := NewConnectionTracker(10) + + var wg sync.WaitGroup + + for range 20 { + wg.Go(func() { + if tracker.Acquire() { + defer tracker.Release() + } + }) + } + + wg.Wait() + + assert.Equal(t, 0, tracker.Count()) +} diff --git a/internal/config/validate.go b/internal/config/validate.go index 6452be8..75ee061 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "math" "net" "strconv" "strings" @@ -124,11 +125,11 @@ func (c *Config) validateStreaming() error { s.Buffer = &v } - if *s.Connections <= 0 { + if *s.Connections <= 0 || *s.Connections > math.MaxInt32 { return errors.ErrStreamingInvalidConnections } - if *s.Buffer <= 0 { + if *s.Buffer <= 0 || *s.Buffer > math.MaxInt32 { return errors.ErrStreamingInvalidBuffer } diff --git a/internal/config/validate_test.go b/internal/config/validate_test.go index cf9e2ee..c7556fc 100644 --- a/internal/config/validate_test.go +++ b/internal/config/validate_test.go @@ -1,6 +1,7 @@ package config import ( + "math" "testing" "github.com/stretchr/testify/assert" @@ -427,6 +428,32 @@ func Test_Validate(t *testing.T) { expectError: true, errorMsg: "streaming buffer must be greater than 0", }, + { + name: "streaming with connections exceeding int32", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Connections = intPtr(math.MaxInt32 + 1) + + return cfg + }(), + expectError: true, + errorMsg: "streaming connections must be greater than 0", + }, + { + name: "streaming with buffer exceeding int32", + config: func() *Config { + cfg := DefaultConfig() + cfg.Server.Listen = testListen + cfg.Server.Auth.Token = testToken + cfg.Server.Streaming.Buffer = intPtr(math.MaxInt32 + 1) + + return cfg + }(), + expectError: true, + errorMsg: "streaming buffer must be greater than 0", + }, { name: "streaming ignored when server disabled", config: func() *Config {