Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type Backend interface {
Started() bool
Version() string
Logs() <-chan string
SubscribeLogs(context.Context) <-chan string
Restart() error
Shutdown()
SyncUser(context.Context, *common.User) error
Expand Down
135 changes: 135 additions & 0 deletions backend/logstream/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package logstream

import (
"context"
"sync"
)

type Buffer struct {
mu sync.Mutex
lines []string
next int
count int
subscribers map[chan string]struct{}
done chan struct{}
closed bool
}

func NewBuffer(size int) *Buffer {
if size <= 0 {
size = 1
}

return &Buffer{
lines: make([]string, size),
subscribers: make(map[chan string]struct{}),
done: make(chan struct{}),
}
}

func (b *Buffer) Publish(line string) {
b.mu.Lock()
defer b.mu.Unlock()

if b.closed {
return
}

b.lines[b.next] = line
b.next = (b.next + 1) % len(b.lines)
if b.count < len(b.lines) {
b.count++
}

for subscriber := range b.subscribers {
select {
case subscriber <- line:
default:
}
}
}

func (b *Buffer) Subscribe(ctx context.Context) <-chan string {
if ctx == nil {
ctx = context.Background()
}

ch := make(chan string, b.capacity())

b.mu.Lock()
if b.closed {
close(ch)
b.mu.Unlock()
return ch
}

for _, line := range b.snapshotLocked() {
ch <- line
}
b.subscribers[ch] = struct{}{}
b.mu.Unlock()

done := b.done
go func() {
select {
case <-ctx.Done():
b.unsubscribe(ch)
case <-done:
}
}()

return ch
}

func (b *Buffer) Close() {
b.mu.Lock()
defer b.mu.Unlock()

if b.closed {
return
}

b.closed = true
close(b.done)
for subscriber := range b.subscribers {
close(subscriber)
delete(b.subscribers, subscriber)
}
}

func (b *Buffer) capacity() int {
if b == nil || len(b.lines) == 0 {
return 1
}
return len(b.lines)
}

func (b *Buffer) snapshotLocked() []string {
snapshot := make([]string, 0, b.count)
if b.count == 0 {
return snapshot
}

start := 0
if b.count == len(b.lines) {
start = b.next
}

for i := 0; i < b.count; i++ {
snapshot = append(snapshot, b.lines[(start+i)%len(b.lines)])
}

return snapshot
}

func (b *Buffer) unsubscribe(ch chan string) {
b.mu.Lock()
defer b.mu.Unlock()

if _, ok := b.subscribers[ch]; !ok {
return
}

delete(b.subscribers, ch)
close(ch)
}
120 changes: 120 additions & 0 deletions backend/logstream/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package logstream

import (
"context"
"testing"
"time"
)

func TestBufferReplaysTailAndFansOut(t *testing.T) {
buffer := NewBuffer(3)
buffer.Publish("one")
buffer.Publish("two")
buffer.Publish("three")
buffer.Publish("four")

ctxA, cancelA := context.WithCancel(context.Background())
defer cancelA()
subA := buffer.Subscribe(ctxA)

for _, want := range []string{"two", "three", "four"} {
if got := receiveLog(t, subA); got != want {
t.Fatalf("expected replayed log %q, got %q", want, got)
}
}

ctxB, cancelB := context.WithCancel(context.Background())
defer cancelB()
subB := buffer.Subscribe(ctxB)

for _, want := range []string{"two", "three", "four"} {
if got := receiveLog(t, subB); got != want {
t.Fatalf("expected second subscriber replayed log %q, got %q", want, got)
}
}

buffer.Publish("five")

if got := receiveLog(t, subA); got != "five" {
t.Fatalf("expected first subscriber live log, got %q", got)
}
if got := receiveLog(t, subB); got != "five" {
t.Fatalf("expected second subscriber live log, got %q", got)
}

cancelA()
assertClosed(t, subA)

buffer.Publish("six")
if got := receiveLog(t, subB); got != "six" {
t.Fatalf("expected remaining subscriber live log, got %q", got)
}
}

func TestBufferDoesNotBlockOnSlowSubscribers(t *testing.T) {
buffer := NewBuffer(1)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub := buffer.Subscribe(ctx)

buffer.Publish("one")
buffer.Publish("two")

if got := receiveLog(t, sub); got != "one" {
t.Fatalf("expected slow subscriber to keep first queued log, got %q", got)
}

freshCtx, freshCancel := context.WithCancel(context.Background())
defer freshCancel()
freshSub := buffer.Subscribe(freshCtx)
if got := receiveLog(t, freshSub); got != "two" {
t.Fatalf("expected fresh subscriber to receive latest buffered log, got %q", got)
}
}

func TestBufferCloseClosesSubscribers(t *testing.T) {
buffer := NewBuffer(2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub := buffer.Subscribe(ctx)

buffer.Publish("line")
buffer.Close()

if got := receiveLog(t, sub); got != "line" {
t.Fatalf("expected buffered line before close, got %q", got)
}
assertClosed(t, sub)

closedSub := buffer.Subscribe(context.Background())
assertClosed(t, closedSub)
}

func receiveLog(t *testing.T, ch <-chan string) string {
t.Helper()

select {
case got, ok := <-ch:
if !ok {
t.Fatal("expected log channel to be open")
}
return got
case <-time.After(time.Second):
t.Fatal("timed out waiting for log")
return ""
}
}

func assertClosed(t *testing.T, ch <-chan string) {
t.Helper()

select {
case _, ok := <-ch:
if ok {
t.Fatal("expected log channel to be closed")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for log channel to close")
}
}
9 changes: 3 additions & 6 deletions backend/wireguard/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,11 @@ func (wg *WireGuard) emitLog(severity logSeverity, message string) {
wg.emitLogLocked(severity, message)
}

// emitLogLocked sends to the backend log channel while the caller already holds wg.mu.
// emitLogLocked publishes to the backend log buffer while the caller already holds wg.mu.
func (wg *WireGuard) emitLogLocked(severity logSeverity, message string) {
if wg.logChan == nil {
if wg.logs == nil {
return
}

select {
case wg.logChan <- formatWireGuardLogLine(severity, message):
default:
}
wg.logs.Publish(formatWireGuardLogLine(severity, message))
}
28 changes: 18 additions & 10 deletions backend/wireguard/wireguard.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/pasarguard/node/backend/logstream"
"github.com/pasarguard/node/common"
"github.com/pasarguard/node/config"
"github.com/pasarguard/node/pkg/stats"
Expand Down Expand Up @@ -53,7 +54,7 @@ type WireGuard struct {
// Stats update config
updateInterval time.Duration

logChan chan string
logs *logstream.Buffer
cancelFunc context.CancelFunc
startTime time.Time
version string
Expand Down Expand Up @@ -133,7 +134,7 @@ func newWithManagerFactory(cfg *config.Config, wgConfig *Config, users []*common
statsTracker: stats.New(),
interfaceStats: stats.NewInterfaceCountersTracker(),
peerStore: NewPeerStore(),
logChan: make(chan string, cfg.LogBufferSize),
logs: logstream.NewBuffer(cfg.LogBufferSize),
startTime: time.Now(),
version: version,
newManager: managerFactory,
Expand Down Expand Up @@ -218,11 +219,18 @@ func (wg *WireGuard) Version() string {
return wg.version
}

// Logs returns the log channel as a receive-only channel.
// The channel is closed when Shutdown is called; callers should use range
// so they naturally stop reading once it is closed.
func (wg *WireGuard) Logs() <-chan string {
return wg.logChan
func (wg *WireGuard) SubscribeLogs(ctx context.Context) <-chan string {
wg.mu.RLock()
logs := wg.logs
wg.mu.RUnlock()

if logs == nil {
ch := make(chan string)
close(ch)
return ch
}

return logs.Subscribe(ctx)
}

// Restart applies a new configuration dynamically to the WireGuard interface without tearing it down.
Expand Down Expand Up @@ -345,9 +353,9 @@ func (wg *WireGuard) shutdownLocked() {
wg.state = lifecycleStopped
wg.version = ""
wg.emitLogLocked(logSeverityInfo, "wireguard shutdown complete")
if wg.logChan != nil {
close(wg.logChan)
wg.logChan = nil
if wg.logs != nil {
wg.logs.Close()
wg.logs = nil
}

log.Println("wireguard shutdown complete")
Expand Down
Loading