Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions e2e/default_tier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func Test_DefaultTier_GracefulShutdown(t *testing.T) {
output := runner.Output()

// Shutdown
assert.Contains(t, output, "signal signal=terminated")
assert.Contains(t, output, "Received signal terminated, shutting down services")
assert.Contains(t, output, "phase=stopping")
assert.Contains(t, output, "All services stopped")
assert.Contains(t, output, "phase=stopped")
Comment thread
tab marked this conversation as resolved.
}

func Test_DefaultTier_LogsCommand(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions fuku.override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ server:
listen: "localhost:1234"
auth:
token: "secret-token"
streaming:
connections: 10
buffer: 1000

services:
auth:
Expand Down
47 changes: 47 additions & 0 deletions internal/app/api/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package api

import "sync/atomic"

// ConnectionTracker tracks concurrent streaming connections against a limit
type ConnectionTracker struct {
count atomic.Int32
max int32
}

// NewConnectionTracker creates a new connection tracker with the given limit
func NewConnectionTracker(limit int) *ConnectionTracker {
return &ConnectionTracker{max: int32(limit)} //nolint:gosec // config-validated positive int, no overflow risk
}

// Acquire attempts to claim a connection slot, returns false if at limit
func (t *ConnectionTracker) Acquire() bool {
for {
current := t.count.Load()
if current >= t.max {
return false
}

if t.count.CompareAndSwap(current, current+1) {
return true
}
}
}

// Release frees a connection slot
func (t *ConnectionTracker) Release() {
for {
current := t.count.Load()
if current <= 0 {
return
}

if t.count.CompareAndSwap(current, current-1) {
return
}
}
}

// Count returns the current number of active connections
func (t *ConnectionTracker) Count() int {
return int(t.count.Load())
}
119 changes: 119 additions & 0 deletions internal/app/api/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package api

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_ConnectionTracker_Acquire(t *testing.T) {
tests := []struct {
name string
max int
acquire int
want bool
}{
{
name: "within limit",
max: 3,
acquire: 1,
want: true,
},
{
name: "at limit",
max: 2,
acquire: 3,
want: false,
},
{
name: "single slot",
max: 1,
acquire: 1,
want: true,
},
{
name: "single slot exceeded",
max: 1,
acquire: 2,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tracker := NewConnectionTracker(tt.max)

var ok bool
for range tt.acquire {
ok = tracker.Acquire()
}

assert.Equal(t, tt.want, ok)
})
}
}

func Test_ConnectionTracker_Release(t *testing.T) {
tracker := NewConnectionTracker(1)

require.True(t, tracker.Acquire())
require.False(t, tracker.Acquire())

tracker.Release()

assert.True(t, tracker.Acquire())
}

func Test_ConnectionTracker_DoubleRelease(t *testing.T) {
tracker := NewConnectionTracker(1)

require.True(t, tracker.Acquire())
tracker.Release()
tracker.Release()

assert.Equal(t, 0, tracker.Count())

require.True(t, tracker.Acquire())
assert.False(t, tracker.Acquire())
}

func Test_ConnectionTracker_ReleaseWithoutAcquire(t *testing.T) {
tracker := NewConnectionTracker(1)

tracker.Release()

assert.Equal(t, 0, tracker.Count())
}

func Test_ConnectionTracker_Count(t *testing.T) {
tracker := NewConnectionTracker(5)

assert.Equal(t, 0, tracker.Count())

tracker.Acquire()
tracker.Acquire()
assert.Equal(t, 2, tracker.Count())

tracker.Release()
assert.Equal(t, 1, tracker.Count())
}

func Test_ConnectionTracker_Concurrent(t *testing.T) {
tracker := NewConnectionTracker(10)

var wg sync.WaitGroup

for range 20 {
wg.Go(func() {
if tracker.Acquire() {
defer tracker.Release()
}
})
}

wg.Wait()

assert.Equal(t, 0, tracker.Count())
}
13 changes: 12 additions & 1 deletion internal/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ func Test_Register_OnStop_CancelsContextAndUnblocksApp(t *testing.T) {
})

mockSentry := sentry.NewMockSentry(ctrl)
mockSentry.EXPECT().Flush()

flushed := make(chan struct{})

mockSentry.EXPECT().Flush().Do(func() {
close(flushed)
})

root := NewRoot()
app := NewApp(mockTUI, mockSentry, &noopShutdowner{})
Expand Down Expand Up @@ -170,6 +175,12 @@ func Test_Register_OnStop_CancelsContextAndUnblocksApp(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("OnStop did not return after cancelling root context")
}

select {
case <-flushed:
case <-time.After(time.Second):
t.Fatal("Flush was not called")
}
}

func Test_Register_OnStop_RespectsTimeout(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions internal/app/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
EventWatchStarted MessageType = "watch_started"
EventWatchStopped MessageType = "watch_stopped"
EventResourceSample MessageType = "resource_sample"
EventServiceMetrics MessageType = "service_metrics"
EventAPIStarted MessageType = "api_started"
EventAPIStopped MessageType = "api_stopped"
EventAPIRequest MessageType = "api_request"
Expand Down Expand Up @@ -204,6 +205,18 @@ type ResourceSample struct {
MEM float64
}

// ServiceMetricsBatch contains per-service CPU and memory readings from a sample cycle
type ServiceMetricsBatch struct {
Samples []ServiceMetrics
}

// ServiceMetrics contains a single service's CPU and memory readings
type ServiceMetrics struct {
Service Service
CPU float64
Memory uint64
}

// APIStarted indicates the API server has started listening
type APIStarted struct {
Listen string
Expand Down
2 changes: 2 additions & 0 deletions internal/app/bus/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (f *Formatter) Format(msgType MessageType, data any) string {
e.Str("id", d.Service.ID).Str("service", d.Service.Name).Strs("files", d.ChangedFiles)
case ResourceSample:
e.Str("cpu", fmt.Sprintf("%.1f%%", d.CPU)).Str("mem", fmt.Sprintf("%.1fMB", d.MEM))
case ServiceMetricsBatch:
e.Int("services", len(d.Samples))
case APIStarted:
e.Str("listen", d.Listen)
case APIStopped:
Expand Down
9 changes: 9 additions & 0 deletions internal/app/bus/formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ func Test_FormatEvent(t *testing.T) {
data: ResourceSample{CPU: 2.5, MEM: 64.0},
contains: []string{"resource_sample", "cpu=2.5%", "mem=64.0MB"},
},
{
name: "ServiceMetricsBatch",
msgType: EventServiceMetrics,
data: ServiceMetricsBatch{Samples: []ServiceMetrics{
{Service: Service{ID: "test-id-api", Name: "api"}, CPU: 3.2, Memory: 67108864},
{Service: Service{ID: "test-id-web", Name: "web"}, CPU: 1.5, Memory: 33554432},
}},
contains: []string{"service_metrics", "services=2"},
},
{
name: "APIRequest",
msgType: EventAPIRequest,
Expand Down
3 changes: 3 additions & 0 deletions internal/app/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ var (
ErrAPINotRestartable = errors.New("service cannot be restarted")
ErrAPINotAccepting = errors.New("instance is not accepting actions")

ErrStreamingInvalidConnections = errors.New("streaming connections must be greater than 0")
ErrStreamingInvalidBuffer = errors.New("streaming buffer must be greater than 0")
Comment thread
tab marked this conversation as resolved.

ErrInvalidCommand = errors.New("command must not be whitespace-only when provided")
ErrWatchIncludeRequired = errors.New("watch configuration requires include field")
ErrInvalidLogsOutput = errors.New("invalid service logs output value (must be 'stdout' or 'stderr')")
Expand Down
31 changes: 26 additions & 5 deletions internal/app/registry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,33 @@ func (s *store) sampleStats(ctx context.Context) {
}

s.mu.Lock()
defer s.mu.Unlock()

for id, st := range stats {
if svc, exists := s.services[id]; exists && svc.pid == pids[id] {
svc.cpu = st.CPU
svc.memory = st.RawMEM
samples := make([]bus.ServiceMetrics, 0, len(stats))

for id, stat := range stats {
service, exists := s.services[id]
if !exists || service.pid != pids[id] {
continue
}

service.cpu = stat.CPU
service.memory = stat.RawMEM

if service.status.IsRunning() {
samples = append(samples, bus.ServiceMetrics{
Service: bus.Service{ID: service.id, Name: service.name},
CPU: stat.CPU,
Memory: stat.RawMEM,
})
}
}

s.mu.Unlock()

if len(samples) > 0 {
s.bus.Publish(bus.Message{
Type: bus.EventServiceMetrics,
Data: bus.ServiceMetricsBatch{Samples: samples},
})
}
Comment on lines +560 to +565
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description focuses on streaming config/defaults, but this change also introduces a new bus event publication path for per-service metrics (and related filtering/tests elsewhere). Please update the PR description to cover these behavioral changes or split them into a separate PR to keep review scope clear.

Copilot uses AI. Check for mistakes.
}
4 changes: 4 additions & 0 deletions internal/app/relay/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (br *Bridge) forward(ctx context.Context, ch <-chan bus.Message) {
return
}

if msg.Type == bus.EventServiceMetrics {
continue
}

text := br.formatter.Format(msg.Type, msg.Data)
br.broadcaster.Broadcast(config.AppName, text)
}
Expand Down
39 changes: 39 additions & 0 deletions internal/app/relay/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,45 @@ func Test_Bridge_ForwardsMultipleEvents(t *testing.T) {
assert.Contains(t, messages[1].message, "service_ready")
}

func Test_Bridge_FiltersServiceMetrics(t *testing.T) {
cfg := config.DefaultConfig()
cfg.Logs.Buffer = 10

log := logger.NewLoggerWithOutput(cfg, io.Discard)
formatter := bus.NewFormatter(logger.NewEventLogger())
broadcaster := newCaptureBroadcaster()

b := bus.NewBus(cfg, formatter, log)
defer b.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bridge := NewBridge(b, broadcaster, formatter)
bridge.Start(ctx)

b.Publish(bus.Message{
Type: bus.EventServiceMetrics,
Data: bus.ServiceMetricsBatch{Samples: []bus.ServiceMetrics{
{Service: bus.Service{ID: "test-id-api", Name: "api"}, CPU: 2.5, Memory: 67108864},
}},
})

b.Publish(bus.Message{
Type: bus.EventServiceReady,
Data: bus.ServiceReady{
ServiceEvent: bus.ServiceEvent{Service: bus.Service{ID: "test-id-api", Name: "api"}, Tier: "platform"},
},
})

messages := broadcaster.waitForMessages(1)

cancel()

require.Len(t, messages, 1)
assert.Contains(t, messages[0].message, "service_ready")
}

func Test_Bridge_StopsCleanly(t *testing.T) {
cfg := config.DefaultConfig()
cfg.Logs.Buffer = 10
Expand Down
Loading
Loading