From a562f417a1fc5ab64b1f03188e4463e9ba948d16 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 08:04:51 +0000 Subject: [PATCH] fix(lifecycle): tighten shutdown ordering for DLQ metrics + AI workers + DLQ replay cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three lifecycle fixes from codex round-2 production-readiness review: 1. DLQ metrics goroutine now exits on appCtx cancellation and is tracked in bootWG so it stops before dlq.Stop()/repo.Close() — was previously a leaked ticker that kept polling Size()/DiskBytes() on the closed DLQ, racing the file-handle close. 2. AI service workers now derive their LLM-call context from a shutdown-aware aiCtx (cancel-derived from appCtx) via SetParentContext. aiCancel() is invoked before aiService.Stop() so in-flight 30s LLM calls are cancelled immediately rather than blocking shutdown for up to 30s × workerPool. 3. DLQ replay worker now caps replayFn invocations per tick via DLQ_MAX_REPLAY_PER_TICK (default 100). Without the cap, an outage that filled the DLQ with 10k files would replay every file in the first post-restart tick, hammering the just-restarted DB and exhausting connection-pool capacity. Backoff-skipped files don't count toward the cap — they cost nothing. Tests: 3 new tests in internal/queue/ exercising cap-bounded, unlimited (legacy default), and negative-input clamping behaviour. All pass under -race. Full suite: 459/459 green. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/ai/service.go | 22 +++++- internal/config/config.go | 13 +++- internal/queue/dlq.go | 32 +++++++++ internal/queue/dlq_replay_cap_test.go | 99 +++++++++++++++++++++++++++ main.go | 32 +++++++-- 5 files changed, 188 insertions(+), 10 deletions(-) create mode 100644 internal/queue/dlq_replay_cap_test.go diff --git a/internal/ai/service.go b/internal/ai/service.go index b2908a0..883c89c 100644 --- a/internal/ai/service.go +++ b/internal/ai/service.go @@ -22,6 +22,14 @@ type Service struct { workQueue chan storage.Log workerPool int wg sync.WaitGroup + + // parentCtx is the application-level context. Workers derive their + // per-call timeout from this so an in-flight LLM call is cancelled + // when the application is shutting down — rather than blocking + // shutdown for up to 30s on each worker. Defaults to context.Background + // when SetParentContext isn't called (preserves legacy behaviour for + // embedded callers). + parentCtx context.Context } func NewService(repo *storage.Repository) *Service { @@ -78,13 +86,25 @@ func NewService(repo *storage.Repository) *Service { return s } +// SetParentContext wires the application-level context so worker LLM calls +// inherit cancellation on shutdown. Call once during boot before +// EnqueueLog starts taking traffic — the parentCtx is read on every +// dequeued log without locking. 
+func (s *Service) SetParentContext(ctx context.Context) { + s.parentCtx = ctx +} + func (s *Service) startWorkers() { for i := 0; i < s.workerPool; i++ { s.wg.Add(1) go func(workerID int) { defer s.wg.Done() for logEntry := range s.workQueue { - s.analyzeLog(context.Background(), logEntry) + ctx := s.parentCtx + if ctx == nil { + ctx = context.Background() + } + s.analyzeLog(ctx, logEntry) } }(i) } diff --git a/internal/config/config.go b/internal/config/config.go index 02696d0..3ae71f0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -76,6 +76,12 @@ type Config struct { DLQMaxFiles int DLQMaxDiskMB int DLQMaxRetries int + // DLQMaxReplayPerTick caps how many DLQ files the replay worker attempts + // in a single tick. Without it, an outage that filled the DLQ with 10k + // files would replay all of them in the first post-restart tick, + // hammering the (just-restarted) DB and exhausting connections. + // 0 = unlimited (the pre-cap behaviour); Load defaults it to 100. + DLQMaxReplayPerTick int // API Protection APIRateLimitRPS int @@ -243,9 +249,10 @@ func Load(customPath string) (*Config, error) { MetricMaxCardinalityPerTenant: getEnvInt("METRIC_MAX_CARDINALITY_PER_TENANT", 0), // DLQ - DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000), - DLQMaxDiskMB: getEnvInt("DLQ_MAX_DISK_MB", 500), - DLQMaxRetries: getEnvInt("DLQ_MAX_RETRIES", 10), + DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000), + DLQMaxDiskMB: getEnvInt("DLQ_MAX_DISK_MB", 500), + DLQMaxRetries: getEnvInt("DLQ_MAX_RETRIES", 10), + DLQMaxReplayPerTick: getEnvInt("DLQ_MAX_REPLAY_PER_TICK", 100), // API APIRateLimitRPS: getEnvInt("API_RATE_LIMIT_RPS", 100), diff --git a/internal/queue/dlq.go b/internal/queue/dlq.go index 803b42a..806ca82 100644 --- a/internal/queue/dlq.go +++ b/internal/queue/dlq.go @@ -31,6 +31,13 @@ type DeadLetterQueue struct { maxDiskMB int64 // 0 = unlimited maxRetries int // 0 = unlimited + // maxReplayPerTick caps the number of files replayed per tick. 
Without + // this, an outage that filled the DLQ with 10k files would replay all + // of them in the first post-restart tick, hammering the (just-restarted) + // DB. 0 = unlimited (legacy default), set via SetMaxReplayPerTick or + // the DLQ_MAX_REPLAY_PER_TICK env var. + maxReplayPerTick int + // Per-file retry tracking (in-memory; resets on restart) retries map[string]int @@ -94,6 +101,18 @@ func (d *DeadLetterQueue) SetTelemetryMetrics(m *telemetry.Metrics) { d.metricsTel = m } +// SetMaxReplayPerTick caps how many files the replay worker will attempt in +// one tick. n <= 0 disables the cap (unlimited). Safe to call after +// construction; the next tick observes the new value. +func (d *DeadLetterQueue) SetMaxReplayPerTick(n int) { + d.mu.Lock() + defer d.mu.Unlock() + if n < 0 { + n = 0 + } + d.maxReplayPerTick = n +} + // EvictedCount reports the cumulative number of DLQ files dropped due to // MaxFiles/MaxDiskMB caps. Exposed for tests; see otelcontext_dlq_evicted_total. func (d *DeadLetterQueue) EvictedCount() int64 { return d.evicted.Load() } @@ -309,11 +328,23 @@ func (d *DeadLetterQueue) processFiles() { return } + d.mu.Lock() + replayCap := d.maxReplayPerTick + d.mu.Unlock() + replayed := 0 + attempts := 0 for _, entry := range entries { if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" { continue } + // Cap actual replayFn calls per tick so a 10k-file backlog after an + // outage doesn't hammer the just-restarted DB. Backoff-skipped files + // don't count — they cost nothing. 
+ if replayCap > 0 && attempts >= replayCap { + slog.Debug("DLQ: max replay-per-tick cap reached", "cap", replayCap) + break + } name := entry.Name() @@ -353,6 +384,7 @@ func (d *DeadLetterQueue) processFiles() { continue } + attempts++ if err := d.replayFn(data); err != nil { d.mu.Lock() d.retries[name]++ diff --git a/internal/queue/dlq_replay_cap_test.go b/internal/queue/dlq_replay_cap_test.go new file mode 100644 index 0000000..ea522b6 --- /dev/null +++ b/internal/queue/dlq_replay_cap_test.go @@ -0,0 +1,99 @@ +package queue + +import ( + "errors" + "sync/atomic" + "testing" + "time" +) + +// errReplayFailed is the sentinel used by tests below to force replay failure. +var errReplayFailed = errors.New("simulated replay failure") + +// TestDLQ_MaxReplayPerTick_BoundsAttempts verifies that SetMaxReplayPerTick +// caps replayFn invocations within a single processFiles call. Without +// this cap, a 10k-file backlog after an outage would replay every file in +// the first post-restart tick and hammer the just-restarted DB. +func TestDLQ_MaxReplayPerTick_BoundsAttempts(t *testing.T) { + dir := t.TempDir() + + var attempts atomic.Int64 + failingReplay := func([]byte) error { + attempts.Add(1) + return errReplayFailed + } + + // Long interval so the background ticker doesn't fire during the test; + // we drive processFiles manually to make assertions deterministic. 
+ q, err := NewDLQWithLimits(dir, time.Hour, failingReplay, 0, 0, 0) + if err != nil { + t.Fatalf("NewDLQWithLimits: %v", err) + } + defer q.Stop() + + const total = 50 + const replayCap = 10 + q.SetMaxReplayPerTick(replayCap) + + for i := range total { + if err := q.Enqueue(map[string]int{"i": i}); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + q.processFiles() + if got := attempts.Load(); got != int64(replayCap) { + t.Fatalf("expected %d replay attempts (cap=%d), got %d", replayCap, replayCap, got) + } +} + +// TestDLQ_MaxReplayPerTick_DisabledByDefault verifies that with the cap +// unset (0) processFiles attempts every queued file in a single tick — +// preserving legacy behaviour for callers that don't opt in. +func TestDLQ_MaxReplayPerTick_DisabledByDefault(t *testing.T) { + dir := t.TempDir() + + var attempts atomic.Int64 + failingReplay := func([]byte) error { + attempts.Add(1) + return errReplayFailed + } + + q, err := NewDLQWithLimits(dir, time.Hour, failingReplay, 0, 0, 0) + if err != nil { + t.Fatalf("NewDLQWithLimits: %v", err) + } + defer q.Stop() + + const total = 25 + for i := range total { + if err := q.Enqueue(map[string]int{"i": i}); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + + q.processFiles() + if got := attempts.Load(); got != int64(total) { + t.Fatalf("with no cap expected %d attempts, got %d", total, got) + } +} + +// TestDLQ_MaxReplayPerTick_NegativeNormalisesToZero ensures a negative +// argument is treated as "unlimited" rather than blocking all replay. 
+func TestDLQ_MaxReplayPerTick_NegativeNormalisesToZero(t *testing.T) { + dir := t.TempDir() + noop := func([]byte) error { return nil } + q, err := NewDLQWithLimits(dir, time.Hour, noop, 0, 0, 0) + if err != nil { + t.Fatalf("NewDLQWithLimits: %v", err) + } + defer q.Stop() + + q.SetMaxReplayPerTick(-5) + q.mu.Lock() + got := q.maxReplayPerTick + q.mu.Unlock() + if got != 0 { + t.Fatalf("expected -5 to clamp to 0 (unlimited), got %d", got) + } +} diff --git a/main.go b/main.go index b14180e..b668224 100644 --- a/main.go +++ b/main.go @@ -287,7 +287,9 @@ func main() { func(b int64) { metrics.DLQDiskBytes.Set(float64(b)) }, ) dlq.SetTelemetryMetrics(metrics) - slog.Info("🔁 DLQ initialized", "path", cfg.DLQPath, "interval", replayInterval) + dlq.SetMaxReplayPerTick(cfg.DLQMaxReplayPerTick) + slog.Info("🔁 DLQ initialized", "path", cfg.DLQPath, "interval", replayInterval, + "max_replay_per_tick", cfg.DLQMaxReplayPerTick) // 4. Initialize Real-Time WebSocket Hub hub := realtime.NewHub(func(count int) { @@ -405,8 +407,14 @@ func main() { slog.Error("Failed to migrate GraphRAG models", "error", err) } - // 5. Initialize AI Service + // 5. Initialize AI Service. + // Workers inherit aiCtx so an in-flight LLM call (30s timeout) is + // cancelled the moment shutdown begins — without this, aiService.Stop() + // blocks for up to 30s per in-flight worker waiting on the upstream + // HTTP call to finish. + aiCtx, aiCancel := context.WithCancel(appCtx) aiService := ai.NewService(repo) + aiService.SetParentContext(aiCtx) // 6. Initialize API Server apiServer := api.NewServer(repo, hub, eventHub, metrics) @@ -535,13 +543,22 @@ func main() { graphRAG.OnMetricIngested(m) }) - // Update DLQ size metric periodically + // Update DLQ size metric periodically. Tied to appCtx so the goroutine + // exits before dlq.Stop() — otherwise it keeps polling Size()/DiskBytes() + // on a stopped DLQ and races with the file-handle close in repo.Close(). 
+ bootWG.Add(1) go func() { + defer bootWG.Done() ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - for range ticker.C { - metrics.SetDLQSize(dlq.Size()) - metrics.DLQDiskBytes.Set(float64(dlq.DiskBytes())) + for { + select { + case <-appCtx.Done(): + return + case <-ticker.C: + metrics.SetDLQSize(dlq.Size()) + metrics.DLQDiskBytes.Set(float64(dlq.DiskBytes())) + } } }() @@ -836,6 +853,9 @@ func main() { // 2. Stop real-time hubs and event processing hub.Stop() cancelEvents() + // Cancel in-flight LLM calls BEFORE Stop so workers don't burn the + // 30s LLM deadline waiting on a half-dead upstream during shutdown. + aiCancel() aiService.Stop() // 3. Stop processing engines (TSDB flush, graph, GraphRAG)