Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion internal/ai/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
workQueue chan storage.Log
workerPool int
wg sync.WaitGroup

// parentCtx is the application-level context. Workers derive their
// per-call timeout from this so an in-flight LLM call is cancelled
// when the application is shutting down — rather than blocking
// shutdown for up to 30s on each worker. Defaults to context.Background
// when SetParentContext isn't called (preserves legacy behaviour for
// embedded callers).
parentCtx context.Context

Check warning on line 32 in internal/ai/service.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'context.Context' field and pass context as a parameter to methods that need it.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3TH3gCI103DEeFLMYL&open=AZ3TH3gCI103DEeFLMYL&pullRequest=68
}

func NewService(repo *storage.Repository) *Service {
Expand Down Expand Up @@ -78,13 +86,25 @@
return s
}

// SetParentContext wires the application-level context so worker LLM calls
// inherit cancellation on shutdown. Call once during boot before
// EnqueueLog starts taking traffic — the parentCtx is read on every
// dequeued log without locking.
func (s *Service) SetParentContext(ctx context.Context) {
s.parentCtx = ctx
}

func (s *Service) startWorkers() {
for i := 0; i < s.workerPool; i++ {
s.wg.Add(1)
go func(workerID int) {
defer s.wg.Done()
for logEntry := range s.workQueue {
s.analyzeLog(context.Background(), logEntry)
ctx := s.parentCtx
if ctx == nil {
ctx = context.Background()
}
s.analyzeLog(ctx, logEntry)
}
}(i)
}
Expand Down
13 changes: 10 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ type Config struct {
DLQMaxFiles int
DLQMaxDiskMB int
DLQMaxRetries int
// DLQMaxReplayPerTick caps how many DLQ files the replay worker attempts
// in a single tick. Without it, an outage that filled the DLQ with 10k
// files would replay all of them in the first post-restart tick,
// hammering the (just-restarted) DB and exhausting connections.
// 0 = unlimited (legacy default).
DLQMaxReplayPerTick int

// API Protection
APIRateLimitRPS int
Expand Down Expand Up @@ -243,9 +249,10 @@ func Load(customPath string) (*Config, error) {
MetricMaxCardinalityPerTenant: getEnvInt("METRIC_MAX_CARDINALITY_PER_TENANT", 0),

// DLQ
DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000),
DLQMaxDiskMB: getEnvInt("DLQ_MAX_DISK_MB", 500),
DLQMaxRetries: getEnvInt("DLQ_MAX_RETRIES", 10),
DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000),
DLQMaxDiskMB: getEnvInt("DLQ_MAX_DISK_MB", 500),
DLQMaxRetries: getEnvInt("DLQ_MAX_RETRIES", 10),
DLQMaxReplayPerTick: getEnvInt("DLQ_MAX_REPLAY_PER_TICK", 100),

// API
APIRateLimitRPS: getEnvInt("API_RATE_LIMIT_RPS", 100),
Expand Down
32 changes: 32 additions & 0 deletions internal/queue/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type DeadLetterQueue struct {
maxDiskMB int64 // 0 = unlimited
maxRetries int // 0 = unlimited

// maxReplayPerTick caps the number of files replayed per tick. Without
// this, an outage that filled the DLQ with 10k files would replay all
// of them in the first post-restart tick, hammering the (just-restarted)
// DB. 0 = unlimited (legacy default), set via SetMaxReplayPerTick or
// the DLQ_MAX_REPLAY_PER_TICK env var.
maxReplayPerTick int

// Per-file retry tracking (in-memory; resets on restart)
retries map[string]int

Expand Down Expand Up @@ -94,6 +101,18 @@ func (d *DeadLetterQueue) SetTelemetryMetrics(m *telemetry.Metrics) {
d.metricsTel = m
}

// SetMaxReplayPerTick caps how many files the replay worker will attempt in
// one tick. n <= 0 disables the cap (unlimited). Safe to call after
// construction; the next tick observes the new value.
func (d *DeadLetterQueue) SetMaxReplayPerTick(n int) {
d.mu.Lock()
defer d.mu.Unlock()
if n < 0 {
n = 0
}
d.maxReplayPerTick = n
}

// EvictedCount reports the cumulative number of DLQ files dropped due to
// MaxFiles/MaxDiskMB caps. Exposed for tests; see otelcontext_dlq_evicted_total.
func (d *DeadLetterQueue) EvictedCount() int64 { return d.evicted.Load() }
Expand Down Expand Up @@ -309,11 +328,23 @@ func (d *DeadLetterQueue) processFiles() {
return
}

d.mu.Lock()
replayCap := d.maxReplayPerTick
d.mu.Unlock()

replayed := 0
attempts := 0
for _, entry := range entries {
if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" {
continue
}
// Cap actual replayFn calls per tick so a 10k-file backlog after an
// outage doesn't hammer the just-restarted DB. Backoff-skipped files
// don't count — they cost nothing.
if replayCap > 0 && attempts >= replayCap {
slog.Debug("DLQ: max replay-per-tick cap reached", "cap", replayCap)
break
}

name := entry.Name()

Expand Down Expand Up @@ -353,6 +384,7 @@ func (d *DeadLetterQueue) processFiles() {
continue
}

attempts++
if err := d.replayFn(data); err != nil {
d.mu.Lock()
d.retries[name]++
Expand Down
99 changes: 99 additions & 0 deletions internal/queue/dlq_replay_cap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package queue

import (
"errors"
"sync/atomic"
"testing"
"time"
)

// errReplayFailed is the sentinel used by tests below to force replay failure.
var errReplayFailed = errors.New("simulated replay failure")

// TestDLQ_MaxReplayPerTick_BoundsAttempts verifies that SetMaxReplayPerTick
// caps replayFn invocations within a single processFiles call. Without
// this cap, a 10k-file backlog after an outage would replay every file in
// the first post-restart tick and hammer the just-restarted DB.
func TestDLQ_MaxReplayPerTick_BoundsAttempts(t *testing.T) {
dir := t.TempDir()

var attempts atomic.Int64
failingReplay := func([]byte) error {
attempts.Add(1)
return errReplayFailed
}

// Long interval so the background ticker doesn't fire during the test;
// we drive processFiles manually to make assertions deterministic.
q, err := NewDLQWithLimits(dir, time.Hour, failingReplay, 0, 0, 0)
if err != nil {
t.Fatalf("NewDLQWithLimits: %v", err)
}
defer q.Stop()

const total = 50
const replayCap = 10
q.SetMaxReplayPerTick(replayCap)

for i := range total {
if err := q.Enqueue(map[string]int{"i": i}); err != nil {
t.Fatalf("Enqueue: %v", err)
}
}

q.processFiles()
if got := attempts.Load(); got != int64(replayCap) {
t.Fatalf("expected %d replay attempts (cap=%d), got %d", replayCap, replayCap, got)
}
}

// TestDLQ_MaxReplayPerTick_DisabledByDefault verifies that with the cap
// unset (0) processFiles attempts every queued file in a single tick —
// preserving legacy behaviour for callers that don't opt in.
func TestDLQ_MaxReplayPerTick_DisabledByDefault(t *testing.T) {
dir := t.TempDir()

var attempts atomic.Int64
failingReplay := func([]byte) error {
attempts.Add(1)
return errReplayFailed
}

q, err := NewDLQWithLimits(dir, time.Hour, failingReplay, 0, 0, 0)
if err != nil {
t.Fatalf("NewDLQWithLimits: %v", err)
}
defer q.Stop()

const total = 25
for i := range total {
if err := q.Enqueue(map[string]int{"i": i}); err != nil {
t.Fatalf("Enqueue: %v", err)
}
}

q.processFiles()
if got := attempts.Load(); got != int64(total) {
t.Fatalf("with no cap expected %d attempts, got %d", total, got)
}
}

// TestDLQ_MaxReplayPerTick_NegativeNormalisesToZero ensures a negative
// argument is treated as "unlimited" rather than blocking all replay.
func TestDLQ_MaxReplayPerTick_NegativeNormalisesToZero(t *testing.T) {
dir := t.TempDir()
noop := func([]byte) error { return nil }
q, err := NewDLQWithLimits(dir, time.Hour, noop, 0, 0, 0)
if err != nil {
t.Fatalf("NewDLQWithLimits: %v", err)
}
defer q.Stop()

q.SetMaxReplayPerTick(-5)
q.mu.Lock()
got := q.maxReplayPerTick
q.mu.Unlock()
if got != 0 {
t.Fatalf("expected -5 to clamp to 0 (unlimited), got %d", got)
}
}
32 changes: 26 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ func main() {
func(b int64) { metrics.DLQDiskBytes.Set(float64(b)) },
)
dlq.SetTelemetryMetrics(metrics)
slog.Info("🔁 DLQ initialized", "path", cfg.DLQPath, "interval", replayInterval)
dlq.SetMaxReplayPerTick(cfg.DLQMaxReplayPerTick)
slog.Info("🔁 DLQ initialized", "path", cfg.DLQPath, "interval", replayInterval,
"max_replay_per_tick", cfg.DLQMaxReplayPerTick)

// 4. Initialize Real-Time WebSocket Hub
hub := realtime.NewHub(func(count int) {
Expand Down Expand Up @@ -405,8 +407,14 @@ func main() {
slog.Error("Failed to migrate GraphRAG models", "error", err)
}

// 5. Initialize AI Service
// 5. Initialize AI Service.
// Workers inherit aiCtx so an in-flight LLM call (30s timeout) is
// cancelled the moment shutdown begins — without this, aiService.Stop()
// blocks for up to 30s per in-flight worker waiting on the upstream
// HTTP call to finish.
aiCtx, aiCancel := context.WithCancel(appCtx)
aiService := ai.NewService(repo)
aiService.SetParentContext(aiCtx)

// 6. Initialize API Server
apiServer := api.NewServer(repo, hub, eventHub, metrics)
Expand Down Expand Up @@ -535,13 +543,22 @@ func main() {
graphRAG.OnMetricIngested(m)
})

// Update DLQ size metric periodically
// Update DLQ size metric periodically. Tied to appCtx so the goroutine
// exits before dlq.Stop() — otherwise it keeps polling Size()/DiskBytes()
// on a stopped DLQ and races with the file-handle close in repo.Close().
bootWG.Add(1)
go func() {
defer bootWG.Done()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
metrics.SetDLQSize(dlq.Size())
metrics.DLQDiskBytes.Set(float64(dlq.DiskBytes()))
for {
select {
case <-appCtx.Done():
return
case <-ticker.C:
metrics.SetDLQSize(dlq.Size())
metrics.DLQDiskBytes.Set(float64(dlq.DiskBytes()))
}
}
}()

Expand Down Expand Up @@ -836,6 +853,9 @@ func main() {
// 2. Stop real-time hubs and event processing
hub.Stop()
cancelEvents()
// Cancel in-flight LLM calls BEFORE Stop so workers don't burn the
// 30s LLM deadline waiting on a half-dead upstream during shutdown.
aiCancel()
aiService.Stop()

// 3. Stop processing engines (TSDB flush, graph, GraphRAG)
Expand Down
Loading