diff --git a/backend/backend.go b/backend/backend.go index cc055a7..eb13e96 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -9,7 +9,7 @@ import ( type Backend interface { Started() bool Version() string - Logs() <-chan string + SubscribeLogs(context.Context) <-chan string Restart() error Shutdown() SyncUser(context.Context, *common.User) error diff --git a/backend/logstream/buffer.go b/backend/logstream/buffer.go new file mode 100644 index 0000000..0dc0070 --- /dev/null +++ b/backend/logstream/buffer.go @@ -0,0 +1,135 @@ +package logstream + +import ( + "context" + "sync" +) + +type Buffer struct { + mu sync.Mutex + lines []string + next int + count int + subscribers map[chan string]struct{} + done chan struct{} + closed bool +} + +func NewBuffer(size int) *Buffer { + if size <= 0 { + size = 1 + } + + return &Buffer{ + lines: make([]string, size), + subscribers: make(map[chan string]struct{}), + done: make(chan struct{}), + } +} + +func (b *Buffer) Publish(line string) { + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + return + } + + b.lines[b.next] = line + b.next = (b.next + 1) % len(b.lines) + if b.count < len(b.lines) { + b.count++ + } + + for subscriber := range b.subscribers { + select { + case subscriber <- line: + default: + } + } +} + +func (b *Buffer) Subscribe(ctx context.Context) <-chan string { + if ctx == nil { + ctx = context.Background() + } + + ch := make(chan string, b.capacity()) + + b.mu.Lock() + if b.closed { + close(ch) + b.mu.Unlock() + return ch + } + + for _, line := range b.snapshotLocked() { + ch <- line + } + b.subscribers[ch] = struct{}{} + b.mu.Unlock() + + done := b.done + go func() { + select { + case <-ctx.Done(): + b.unsubscribe(ch) + case <-done: + } + }() + + return ch +} + +func (b *Buffer) Close() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.closed { + return + } + + b.closed = true + close(b.done) + for subscriber := range b.subscribers { + close(subscriber) + delete(b.subscribers, subscriber) + } +} + +func (b *Buffer) capacity() int { + if b == nil || len(b.lines) == 0 { + return 1 + } + return len(b.lines) +} + +func (b *Buffer) snapshotLocked() []string { + snapshot := make([]string, 0, b.count) + if b.count == 0 { + return snapshot + } + + start := 0 + if b.count == len(b.lines) { + start = b.next + } + + for i := 0; i < b.count; i++ { + snapshot = append(snapshot, b.lines[(start+i)%len(b.lines)]) + } + + return snapshot +} + +func (b *Buffer) unsubscribe(ch chan string) { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.subscribers[ch]; !ok { + return + } + + delete(b.subscribers, ch) + close(ch) +} diff --git a/backend/logstream/buffer_test.go b/backend/logstream/buffer_test.go new file mode 100644 index 0000000..2dcdee2 --- /dev/null +++ b/backend/logstream/buffer_test.go @@ -0,0 +1,120 @@ +package logstream + +import ( + "context" + "testing" + "time" +) + +func TestBufferReplaysTailAndFansOut(t *testing.T) { + buffer := NewBuffer(3) + buffer.Publish("one") + buffer.Publish("two") + buffer.Publish("three") + buffer.Publish("four") + + ctxA, cancelA := context.WithCancel(context.Background()) + defer cancelA() + subA := buffer.Subscribe(ctxA) + + for _, want := range []string{"two", "three", "four"} { + if got := receiveLog(t, subA); got != want { + t.Fatalf("expected replayed log %q, got %q", want, got) + } + } + + ctxB, cancelB := context.WithCancel(context.Background()) + defer cancelB() + subB := buffer.Subscribe(ctxB) + + for _, want := range []string{"two", "three", "four"} { + if got := receiveLog(t, subB); got != want { + t.Fatalf("expected second subscriber replayed log %q, got %q", want, got) + } + } + + buffer.Publish("five") + + if got := receiveLog(t, subA); got != "five" { + t.Fatalf("expected first subscriber live log, got %q", got) + } + if got := receiveLog(t, subB); got != "five" { + t.Fatalf("expected second subscriber live log, got %q", got) + } + + cancelA() + assertClosed(t, subA) + + buffer.Publish("six") + if got := receiveLog(t, subB); got != "six" { + t.Fatalf("expected remaining subscriber live log, got %q", got) + } +} + +func TestBufferDoesNotBlockOnSlowSubscribers(t *testing.T) { + buffer := NewBuffer(1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sub := buffer.Subscribe(ctx) + + buffer.Publish("one") + buffer.Publish("two") + + if got := receiveLog(t, sub); got != "one" { + t.Fatalf("expected slow subscriber to keep first queued log, got %q", got) + } + + freshCtx, freshCancel := context.WithCancel(context.Background()) + defer freshCancel() + freshSub := buffer.Subscribe(freshCtx) + if got := receiveLog(t, freshSub); got != "two" { + t.Fatalf("expected fresh subscriber to receive latest buffered log, got %q", got) + } +} + +func TestBufferCloseClosesSubscribers(t *testing.T) { + buffer := NewBuffer(2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sub := buffer.Subscribe(ctx) + + buffer.Publish("line") + buffer.Close() + + if got := receiveLog(t, sub); got != "line" { + t.Fatalf("expected buffered line before close, got %q", got) + } + assertClosed(t, sub) + + closedSub := buffer.Subscribe(context.Background()) + assertClosed(t, closedSub) +} + +func receiveLog(t *testing.T, ch <-chan string) string { + t.Helper() + + select { + case got, ok := <-ch: + if !ok { + t.Fatal("expected log channel to be open") + } + return got + case <-time.After(time.Second): + t.Fatal("timed out waiting for log") + return "" + } +} + +func assertClosed(t *testing.T, ch <-chan string) { + t.Helper() + + select { + case _, ok := <-ch: + if ok { + t.Fatal("expected log channel to be closed") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for log channel to close") + } +} diff --git a/backend/wireguard/log.go b/backend/wireguard/log.go index 2b55fa5..24c1c1f 100644 --- a/backend/wireguard/log.go +++ b/backend/wireguard/log.go @@ -43,14 +43,11 @@ func (wg *WireGuard) emitLog(severity logSeverity, message string) { wg.emitLogLocked(severity, message) } -// emitLogLocked sends to the backend log channel while the caller already holds wg.mu. +// emitLogLocked publishes to the backend log buffer while the caller already holds wg.mu. func (wg *WireGuard) emitLogLocked(severity logSeverity, message string) { - if wg.logChan == nil { + if wg.logs == nil { return } - select { - case wg.logChan <- formatWireGuardLogLine(severity, message): - default: - } + wg.logs.Publish(formatWireGuardLogLine(severity, message)) } diff --git a/backend/wireguard/wireguard.go b/backend/wireguard/wireguard.go index 777cf37..cebd840 100644 --- a/backend/wireguard/wireguard.go +++ b/backend/wireguard/wireguard.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/pasarguard/node/backend/logstream" "github.com/pasarguard/node/common" "github.com/pasarguard/node/config" "github.com/pasarguard/node/pkg/stats" @@ -53,7 +54,7 @@ type WireGuard struct { // Stats update config updateInterval time.Duration - logChan chan string + logs *logstream.Buffer cancelFunc context.CancelFunc startTime time.Time version string @@ -133,7 +134,7 @@ func newWithManagerFactory(cfg *config.Config, wgConfig *Config, users []*common statsTracker: stats.New(), interfaceStats: stats.NewInterfaceCountersTracker(), peerStore: NewPeerStore(), - logChan: make(chan string, cfg.LogBufferSize), + logs: logstream.NewBuffer(cfg.LogBufferSize), startTime: time.Now(), version: version, newManager: managerFactory, @@ -218,11 +219,18 @@ func (wg *WireGuard) Version() string { return wg.version } -// Logs returns the log channel as a receive-only channel. -// The channel is closed when Shutdown is called; callers should use range -// so they naturally stop reading once it is closed. -func (wg *WireGuard) Logs() <-chan string { - return wg.logChan +func (wg *WireGuard) SubscribeLogs(ctx context.Context) <-chan string { + wg.mu.RLock() + logs := wg.logs + wg.mu.RUnlock() + + if logs == nil { + ch := make(chan string) + close(ch) + return ch + } + + return logs.Subscribe(ctx) } // Restart applies a new configuration dynamically to the WireGuard interface without tearing it down. @@ -345,9 +353,9 @@ func (wg *WireGuard) shutdownLocked() { wg.state = lifecycleStopped wg.version = "" wg.emitLogLocked(logSeverityInfo, "wireguard shutdown complete") - if wg.logChan != nil { - close(wg.logChan) - wg.logChan = nil + if wg.logs != nil { + wg.logs.Close() + wg.logs = nil } log.Println("wireguard shutdown complete") diff --git a/backend/wireguard/wireguard_lifecycle_test.go b/backend/wireguard/wireguard_lifecycle_test.go index b7dd2d6..c6e456a 100644 --- a/backend/wireguard/wireguard_lifecycle_test.go +++ b/backend/wireguard/wireguard_lifecycle_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/pasarguard/node/backend" + "github.com/pasarguard/node/backend/logstream" "github.com/pasarguard/node/common" nodeconfig "github.com/pasarguard/node/config" "github.com/vishvananda/netlink" @@ -161,8 +162,11 @@ func TestWireGuardNewInitializesWithSingleConfigureCallIncludingPeers(t *testing t.Fatalf("expected one peer in startup configure call, got %d", len(configured.Peers)) } + logCtx, logCancel := context.WithCancel(context.Background()) + defer logCancel() + select { - case startupLog := <-wg.Logs(): + case startupLog := <-wg.SubscribeLogs(logCtx): pattern := regexp.MustCompile(`^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[Info\] WireGuard interface wg-test initialized successfully$`) if !pattern.MatchString(startupLog) { t.Fatalf("expected startup log with timestamp prefix, got %q", startupLog) @@ -181,12 +185,15 @@ func TestWireGuardNewInitializesWithSingleConfigureCallIncludingPeers(t *testing } func TestWireGuardShutdownIsIdempotent(t *testing.T) { - _, cancel := context.WithCancel(context.Background()) - logChan := make(chan string) + _, shutdownCancel := context.WithCancel(context.Background()) + logs := logstream.NewBuffer(10) + logCtx, logCancel := context.WithCancel(context.Background()) + defer logCancel() + logSub := logs.Subscribe(logCtx) wg := &WireGuard{ - cancelFunc: cancel, - logChan: logChan, + cancelFunc: shutdownCancel, + logs: logs, updateTicker: time.NewTicker(time.Hour), cleanupTicker: time.NewTicker(time.Hour), state: lifecycleRunning, @@ -196,12 +203,24 @@ func TestWireGuardShutdownIsIdempotent(t *testing.T) { wg.Shutdown() select { - case _, ok := <-logChan: + case msg, ok := <-logSub: + if !ok { + t.Fatal("expected shutdown log before subscription close") + } + if !strings.Contains(msg, "wireguard shutdown complete") { + t.Fatalf("expected shutdown log, got %q", msg) + } + case <-time.After(time.Second): + t.Fatal("expected shutdown log to be emitted") + } + + select { + case _, ok := <-logSub: if ok { - t.Fatal("expected log channel to be closed after first shutdown") + t.Fatal("expected log subscription to be closed after first shutdown") } - default: - t.Fatal("expected log channel close signal after first shutdown") + case <-time.After(time.Second): + t.Fatal("expected log subscription close signal after first shutdown") } // Must not panic or alter state unexpectedly on second call. @@ -213,8 +232,8 @@ func TestWireGuardShutdownIsIdempotent(t *testing.T) { if wg.version != "" { t.Fatalf("expected empty version after shutdown, got %q", wg.version) } - if wg.logChan != nil { - t.Fatal("expected wg.logChan to be nil after shutdown") + if wg.logs != nil { + t.Fatal("expected wg.logs to be nil after shutdown") } } diff --git a/backend/xray/core.go b/backend/xray/core.go index 7ae9b28..708b89d 100644 --- a/backend/xray/core.go +++ b/backend/xray/core.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/pasarguard/node/backend/logstream" nodeLogger "github.com/pasarguard/node/logger" ) @@ -28,7 +29,7 @@ type Core struct { restarting bool stopping bool waitDone chan struct{} - logsChan chan string + logs *logstream.Buffer logPhase uint32 startupLogs *startupLogRing startupLogSize int @@ -52,7 +53,7 @@ func NewXRayCore(executablePath, assetsPath, configPath string, logBufferSize, s executablePath: executablePath, assetsPath: assetsPath, configPath: configPath, - logsChan: make(chan string, logBufferSize), + logs: logstream.NewBuffer(logBufferSize), logPhase: logPhaseRuntime, startupLogSize: startupLogTailSize, runtimeLogs: newStartupLogRing(10), @@ -366,10 +367,28 @@ func (c *Core) Restarting() bool { return c.restarting } -func (c *Core) Logs() <-chan string { +func (c *Core) SubscribeLogs(ctx context.Context) <-chan string { c.mu.Lock() - defer c.mu.Unlock() - return c.logsChan + logs := c.logs + c.mu.Unlock() + + if logs == nil { + ch := make(chan string) + close(ch) + return ch + } + + return logs.Subscribe(ctx) +} + +func (c *Core) CloseLogs() { + c.mu.Lock() + logs := c.logs + c.mu.Unlock() + + if logs != nil { + logs.Close() + } } // ProcessInfo holds information about a process diff --git a/backend/xray/log.go b/backend/xray/log.go index 409d25d..c12ff9e 100644 --- a/backend/xray/log.go +++ b/backend/xray/log.go @@ -57,25 +57,16 @@ func (c *Core) recordProcessLog(output string) { func (c *Core) captureStartupLogLine(output string) { c.RecordStartupLog(output) - - // Non-blocking send: skip if channel is full to prevent deadlock - select { - case c.logsChan <- output: - // Log sent successfully - default: - // Channel full, skip this log (prevents blocking xray process) + if c.logs != nil { + c.logs.Publish(output) } c.detectLogType(output) } func (c *Core) captureRuntimeLogLine(output string) { c.RecordRuntimeLog(output) - // Non-blocking send: skip if channel is full to prevent deadlock - select { - case c.logsChan <- output: - // Log sent successfully - default: - // Channel full, skip this log (prevents blocking xray process) + if c.logs != nil { + c.logs.Publish(output) } c.detectLogType(output) } diff --git a/backend/xray/xray.go b/backend/xray/xray.go index cc48c06..159ea5e 100644 --- a/backend/xray/xray.go +++ b/backend/xray/xray.go @@ -103,10 +103,18 @@ func New(ctx context.Context, xrayConfig *Config, users []*common.User, apiPort, return xray, nil } -func (x *Xray) Logs() <-chan string { +func (x *Xray) SubscribeLogs(ctx context.Context) <-chan string { x.mu.RLock() - defer x.mu.RUnlock() - return x.core.Logs() + core := x.core + x.mu.RUnlock() + + if core == nil { + ch := make(chan string) + close(ch) + return ch + } + + return core.SubscribeLogs(ctx) } func (x *Xray) Version() string { @@ -140,6 +148,7 @@ func (x *Xray) Shutdown() { // Stop core (this now waits for process termination) if x.core != nil { x.core.Stop() + x.core.CloseLogs() } // Close API handler diff --git a/backend/xray/xray_test.go b/backend/xray/xray_test.go index 1f4dd7b..9806517 100644 --- a/backend/xray/xray_test.go +++ b/backend/xray/xray_test.go @@ -137,7 +137,7 @@ func TestXrayBackend(t *testing.T) { ctx1, cancel = context.WithTimeout(context.Background(), time.Second*10) defer cancel() - logs := back.Logs() + logs := back.SubscribeLogs(ctx1) loop: for { select { diff --git a/controller/rest/log.go b/controller/rest/log.go index 3254b78..c6d5989 100644 --- a/controller/rest/log.go +++ b/controller/rest/log.go @@ -16,7 +16,7 @@ func (s *Service) GetLogs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - logChan := s.Backend().Logs() + logChan := s.Backend().SubscribeLogs(r.Context()) for { select { diff --git a/controller/rpc/log.go b/controller/rpc/log.go index 5ba726a..9228a67 100644 --- a/controller/rpc/log.go +++ b/controller/rpc/log.go @@ -8,7 +8,7 @@ import ( ) func (s *Service) GetLogs(_ *common.Empty, stream common.NodeService_GetLogsServer) error { - logChan := s.Backend().Logs() + logChan := s.Backend().SubscribeLogs(stream.Context()) for { select {