From 2d504f89f3b1734e12042326bee07da89099dc08 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 05:29:31 +0000 Subject: [PATCH 1/4] fix(robustness): P1+P2 follow-ups from brainstorm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the four actionable follow-ups deferred from the codex round-2 brainstorm. The fifth ("DLQ replay idempotency for spans/logs") is still deferred — it requires a schema migration to add unique indexes plus a pre-migration dedup pass, not a code-only fix. P1 — tenant pipeline starvation fairness (`internal/ingest/pipeline.go`, `config.go`, `main.go`). - Added per-tenant in-flight cap to Pipeline. When set, healthy submissions from a tenant already at the cap are dropped at Submit() with reason "tenant_backpressure" (separate counter from DroppedHealthy / RejectedFull). Priority batches (errors / slow traces) bypass — diagnostic data must always land. - Slot is reserved at Submit and released in process()'s deferred cleanup, so panics still release. Failed channel sends in Submit undo the reservation before returning ErrQueueFull. - Wired via INGEST_PIPELINE_PER_TENANT_CAP (default 0 = disabled, no behavior change for single-tenant deployments). Multi-tenant deployments should set this to ~Capacity/N×2 where N is the number of concurrently-active tenants. P1 — pipeline drain-path metric blind spot: skipped. The brainstorm's framing was that Stop()'s drain bypasses observeDrop accounting, but verification shows the drain path PROCESSES (not drops) batches via process(), and there are no shutdown-phase drops to account for. P2 — /ready saturation probes (`internal/api/health_handlers.go`, `server.go`, `main.go`). - Server gains two callback-shaped probes (DLQ disk fullness, ingest pipeline fullness). 
Probes return a fraction in [0,1]; /ready flips to 503 at or above 0.95 so orchestrators stop sending traffic before the pipeline starts hard-rejecting (gRPC RESOURCE_EXHAUSTED / HTTP 429) or DLQ starts FIFO-evicting. - Wired in main.go using closures that capture dlq + ingestPipeline. Probes are nil-tolerant on the server side; nil = "skipped" rather than fatal. P2 — retention adaptive pacing (`internal/storage/retention.go`). - New adaptPurgeSleep on RetentionScheduler. After each purge pass, measures wall-clock time vs purgeInterval. >50% → double the inter-batch sleep (capped at 100ms). <10% → halve it (floored at 1ms). Single-writer (the retention loop), so no synchronization needed; purge methods read the value once at the call boundary. P2 — GraphSnapshot row cap (`internal/graphrag/snapshot.go`). - pruneOldSnapshots now enforces a 100k row backstop after the by-age delete. Steady-state at 15-min cadence × 100 tenants is ~67k rows; 100k is enough headroom that the backstop never triggers under normal operation but bounds disk if write rate spikes. Tests: three new pipeline tests lock in the per-tenant cap contract (drops past cap, priority bypasses, slot released after process). Full suite: 407 tests pass under `-race -count=1`. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/api/health_handlers.go | 32 ++++++++++ internal/api/server.go | 22 +++++++ internal/config/config.go | 15 ++++- internal/graphrag/snapshot.go | 32 +++++++++- internal/ingest/pipeline.go | 89 +++++++++++++++++++++++++-- internal/ingest/pipeline_test.go | 102 +++++++++++++++++++++++++++++++ internal/storage/retention.go | 54 ++++++++++++++++ main.go | 22 +++++++ 8 files changed, 359 insertions(+), 9 deletions(-) diff --git a/internal/api/health_handlers.go b/internal/api/health_handlers.go index e386266..67b3e83 100644 --- a/internal/api/health_handlers.go +++ b/internal/api/health_handlers.go @@ -3,10 +3,17 @@ package api import ( "context" "encoding/json" + "fmt" "net/http" "time" ) +// readySaturationThreshold is the fullness fraction at which a saturation +// probe (DLQ disk, ingest pipeline) flips /ready to 503. Set high enough +// that brief spikes don't cause restart loops, low enough that orchestrators +// stop sending traffic before the system fails outright. +const readySaturationThreshold = 0.95 + // handleLive is a Kubernetes-style liveness probe. // Returns 200 OK as long as the process is up. Does not check dependencies. func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) { @@ -55,6 +62,31 @@ func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { ready = false } + // Saturation probes — flip to 503 when downstream buffers are full so + // orchestrators (k8s, load balancers) stop routing fresh traffic before + // the pipeline starts hard-rejecting (gRPC RESOURCE_EXHAUSTED / HTTP 429) + // or DLQ starts FIFO-evicting unflushed batches. 
+ if s.dlqSaturation != nil { + if sat := s.dlqSaturation(); sat >= readySaturationThreshold { + checks["dlq_disk"] = fmt.Sprintf("saturated %.0f%%", sat*100) + ready = false + } else { + checks["dlq_disk"] = "ok" + } + } else { + checks["dlq_disk"] = "skipped" + } + if s.pipelineSaturation != nil { + if sat := s.pipelineSaturation(); sat >= readySaturationThreshold { + checks["pipeline"] = fmt.Sprintf("saturated %.0f%%", sat*100) + ready = false + } else { + checks["pipeline"] = "ok" + } + } else { + checks["pipeline"] = "skipped" + } + status := http.StatusOK if !ready { status = http.StatusServiceUnavailable diff --git a/internal/api/server.go b/internal/api/server.go index 5da6fbd..338fb2c 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -23,6 +23,13 @@ type Server struct { graph *graph.Graph // in-memory service dependency graph (may be nil before first build) graphRAG *graphrag.GraphRAG // layered GraphRAG for advanced queries vectorIdx *vectordb.Index // TF-IDF semantic log search index + + // Saturation probes consulted by /ready. Each returns a fullness + // fraction in [0.0, 1.0]; nil disables the corresponding check. + // Decoupling via callbacks keeps the api package free of queue/ingest + // imports and lets tests inject deterministic values. + dlqSaturation func() float64 + pipelineSaturation func() float64 } // NewServer creates a new API server. @@ -51,6 +58,21 @@ func (s *Server) SetVectorIndex(idx *vectordb.Index) { s.vectorIdx = idx } +// SetDLQSaturationProbe registers a callback returning DLQ disk fullness as +// a fraction in [0.0, 1.0]. Used by /ready to flip to 503 when DLQ is at +// risk of FIFO-evicting unflushed batches. Pass nil to disable the check. +func (s *Server) SetDLQSaturationProbe(fn func() float64) { + s.dlqSaturation = fn +} + +// SetPipelineSaturationProbe registers a callback returning ingest pipeline +// queue fullness as a fraction in [0.0, 1.0]. 
Used by /ready to flip to 503 +// when the pipeline is nearly full (at or above the saturation threshold, before it +// starts returning 429/RESOURCE_EXHAUSTED to clients). Pass nil to disable the check. +func (s *Server) SetPipelineSaturationProbe(fn func() float64) { + s.pipelineSaturation = fn +} + // RegisterRoutes registers API endpoints on the provided mux. func (s *Server) RegisterRoutes(mux *http.ServeMux) { // Metadata & Discovery diff --git a/internal/config/config.go b/internal/config/config.go index 4724b7d..02696d0 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -121,6 +121,14 @@ type Config struct { IngestAsyncEnabled bool // default true; opt out via INGEST_ASYNC_ENABLED=false IngestPipelineQueueSize int // default 50000 batches; per-deployment tunable IngestPipelineWorkers int // default 8 worker goroutines + // IngestPipelinePerTenantCap caps in-flight batches per tenant so a noisy + // tenant cannot starve siblings of fresh queue slots when fullness is + // below the soft-backpressure threshold. 0 (default) disables — single- + // tenant deployments need no cap. Operators on multi-tenant deployments + // should set INGEST_PIPELINE_PER_TENANT_CAP to roughly Capacity/N where + // N is the expected number of concurrently-active tenants, with some + // headroom (e.g. 2× the fair-share value) for short bursts. + IngestPipelinePerTenantCap int // TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers. // Empty values (default) keep plaintext behavior. 
@@ -260,9 +268,10 @@ func Load(customPath string) (*Config, error) { GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 100000), // Async ingest pipeline - IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true), - IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000), - IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8), + IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true), + IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000), + IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8), + IngestPipelinePerTenantCap: getEnvInt("INGEST_PIPELINE_PER_TENANT_CAP", 0), // TLS TLSCertFile: getEnv("TLS_CERT_FILE", ""), diff --git a/internal/graphrag/snapshot.go b/internal/graphrag/snapshot.go index aef160f..da13598 100644 --- a/internal/graphrag/snapshot.go +++ b/internal/graphrag/snapshot.go @@ -142,7 +142,16 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store ) } -// pruneOldSnapshots removes snapshots older than 7 days. +// maxSnapshotRows is a row-count backstop on `graph_snapshots` to prevent +// unbounded disk growth when the write rate outruns the 7-day age prune. +// Steady state at 15-min cadence × 100 tenants is ~67k rows/week, so 100k +// gives ~50% headroom — high enough to never trigger under normal operation, +// low enough to bound disk if a misconfig or tenant explosion runs the +// snapshotter hot. +const maxSnapshotRows = 100_000 + +// pruneOldSnapshots removes snapshots older than 7 days, then enforces a +// row-count backstop in case the by-age prune isn't keeping up. 
func (g *GraphRAG) pruneOldSnapshots() { if g.repo == nil || g.repo.DB() == nil { return @@ -154,6 +163,27 @@ func (g *GraphRAG) pruneOldSnapshots() { } else if result.RowsAffected > 0 { slog.Info("Pruned old graph snapshots", "count", result.RowsAffected) } + + var count int64 + if err := g.repo.DB().Model(&GraphSnapshot{}).Count(&count).Error; err != nil { + slog.Error("Failed to count snapshots for row-cap prune", "error", err) + return + } + if count <= maxSnapshotRows { + return + } + excess := count - maxSnapshotRows + // Subquery selects the N oldest IDs, then deletes that set. Portable + // across SQLite and Postgres; avoids a multi-statement transaction. + sub := g.repo.DB().Model(&GraphSnapshot{}).Select("id").Order("created_at ASC").Limit(int(excess)) + if err := g.repo.DB().Where("id IN (?)", sub).Delete(&GraphSnapshot{}).Error; err != nil { + slog.Error("Failed to row-cap prune snapshots", "error", err) + return + } + slog.Warn("graphrag: row-cap pruned snapshots (write rate exceeded by-age prune)", + "deleted", excess, + "cap", maxSnapshotRows, + ) } // GetGraphSnapshot retrieves the snapshot closest to the requested time, diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index 4c20609..02c4b15 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -130,6 +130,16 @@ type Pipeline struct { droppedHealthy atomic.Int64 rejectedFull atomic.Int64 processFailures atomic.Int64 + tenantDropped atomic.Int64 + + // Per-tenant in-flight cap — bounds the queue slots a single tenant + // can consume so a noisy tenant cannot starve siblings of fresh + // healthy submissions when fullness is below the soft threshold. + // Priority batches (errors/slow) bypass the cap because diagnostic + // data must always land. perTenantCap == 0 disables the check. 
+ perTenantCap int + tenantMu sync.Mutex + tenantInFlight map[string]int stopCh chan struct{} once sync.Once @@ -155,14 +165,37 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline cfg.SoftThreshold = d.SoftThreshold } return &Pipeline{ - writer: writer, - metrics: metrics, - cfg: cfg, - queue: make(chan *Batch, cfg.Capacity), - stopCh: make(chan struct{}), + writer: writer, + metrics: metrics, + cfg: cfg, + queue: make(chan *Batch, cfg.Capacity), + tenantInFlight: make(map[string]int), + stopCh: make(chan struct{}), + } +} + +// SetPerTenantCap configures the maximum in-flight batches one tenant may +// hold in the queue (and currently being processed). 0 disables the cap. +// Once a tenant hits the cap, further healthy submissions from that tenant +// are dropped at Submit() time with reason "tenant_backpressure". Priority +// batches (errors/slow traces) bypass the cap. +// +// Sized as a fraction of Capacity, e.g. Capacity/4 keeps any single tenant +// to 25% of queue capacity. Operators tune via INGEST_PIPELINE_PER_TENANT_CAP. +// Startup-only — call before Start(). +func (p *Pipeline) SetPerTenantCap(n int) { + if n < 0 { + n = 0 } + p.perTenantCap = n } +// TenantDropped reports the cumulative number of healthy submissions +// rejected because the submitting tenant was at the per-tenant cap. +// Distinct from RejectedFull (queue at hard capacity) and +// DroppedHealthy (soft-backpressure across the whole queue). +func (p *Pipeline) TenantDropped() int64 { return p.tenantDropped.Load() } + // Start spawns the worker pool. Safe to call once. Subsequent calls are // no-ops; tests rely on this for reset semantics. func (p *Pipeline) Start(ctx context.Context) { @@ -215,18 +248,57 @@ func (p *Pipeline) Submit(b *Batch) error { return nil } + // Per-tenant cap — only enforced for healthy batches (priority bypasses, + // same as soft-backpressure). 
Reserve the slot under the lock so the + // counter and the channel send are coherent: if the channel is full, + // undo the reservation in the default branch below. + tenantReserved := false + if p.perTenantCap > 0 && b.Tenant != "" && !b.Priority() { + p.tenantMu.Lock() + if p.tenantInFlight[b.Tenant] >= p.perTenantCap { + p.tenantMu.Unlock() + p.tenantDropped.Add(1) + p.observeDrop(b.Type, "tenant_backpressure") + return nil + } + p.tenantInFlight[b.Tenant]++ + tenantReserved = true + p.tenantMu.Unlock() + } + select { case p.queue <- b: p.enqueuedTotal.Add(1) p.observeQueueDepth(b.Type) return nil default: + if tenantReserved { + p.releaseTenantSlot(b.Tenant) + } p.rejectedFull.Add(1) p.observeDrop(b.Type, "queue_full") return ErrQueueFull } } +// releaseTenantSlot decrements the in-flight count for a tenant, removing +// the map entry when it hits zero so the map doesn't grow unboundedly with +// short-lived tenant IDs. Safe to call with an empty tenant or when the +// cap is disabled — both no-op. +func (p *Pipeline) releaseTenantSlot(tenant string) { + if p.perTenantCap <= 0 || tenant == "" { + return + } + p.tenantMu.Lock() + n := p.tenantInFlight[tenant] - 1 + if n <= 0 { + delete(p.tenantInFlight, tenant) + } else { + p.tenantInFlight[tenant] = n + } + p.tenantMu.Unlock() +} + // Stop signals workers to exit and blocks until in-flight batches have // been drained from the channel. Idempotent. func (p *Pipeline) Stop() { @@ -302,6 +374,13 @@ func (p *Pipeline) process(b *Batch) { if b == nil { return } + // Release the per-tenant slot reserved at Submit time. Registered as + // a defer so it runs even if the batch panics. Priority batches don't + // reserve at submit, so they don't release here either — the conditions + // must mirror exactly to keep the in-flight count balanced. 
+ if !b.Priority() { + defer p.releaseTenantSlot(b.Tenant) + } defer func() { if r := recover(); r != nil { slog.Error("ingest pipeline process panic", diff --git a/internal/ingest/pipeline_test.go b/internal/ingest/pipeline_test.go index edb30f2..3ce109a 100644 --- a/internal/ingest/pipeline_test.go +++ b/internal/ingest/pipeline_test.go @@ -407,6 +407,108 @@ func TestPipeline_DefaultsApplied(t *testing.T) { } } +func TestPipeline_PerTenantCap_DropsExcessHealthy(t *testing.T) { + // With Workers=0 and Capacity=10, queue absorbs everything; the per- + // tenant cap kicks in independently of soft-backpressure. Tenant A is + // capped at 3; the 4th healthy submission for A must be dropped while + // tenant B's submissions land normally. + p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 0, SoftThreshold: 0.9}) + p.SetPerTenantCap(3) + + mkBatch := func(tenant string) *Batch { + b := healthyBatch() + b.Tenant = tenant + return b + } + + for range 3 { + if err := p.Submit(mkBatch("a")); err != nil { + t.Fatalf("submit under cap: %v", err) + } + } + if err := p.Submit(mkBatch("a")); err != nil { + t.Fatalf("4th submit returned err %v, want nil (silent drop)", err) + } + if err := p.Submit(mkBatch("b")); err != nil { + t.Fatalf("tenant b under its cap should not be affected: %v", err) + } + + stats := p.Stats() + if stats.Enqueued != 4 { // 3 from a + 1 from b + t.Fatalf("Enqueued=%d, want 4", stats.Enqueued) + } + if got := p.TenantDropped(); got != 1 { + t.Fatalf("TenantDropped=%d, want 1", got) + } + if stats.DroppedHealthy != 0 { + t.Fatalf("DroppedHealthy=%d, want 0 (tenant cap is a separate counter)", stats.DroppedHealthy) + } +} + +func TestPipeline_PerTenantCap_PriorityBypasses(t *testing.T) { + // Errors and slow traces must always land — they bypass the per-tenant + // cap the same way they bypass soft-backpressure. 
+ p := NewPipeline(&fakeWriter{}, nil, PipelineConfig{Capacity: 10, Workers: 0, SoftThreshold: 0.9}) + p.SetPerTenantCap(2) + + mkErr := func(tenant string) *Batch { + b := errorBatch() + b.Tenant = tenant + return b + } + + for range 5 { + if err := p.Submit(mkErr("noisy")); err != nil { + t.Fatalf("priority submit blocked by tenant cap: %v", err) + } + } + if got := p.TenantDropped(); got != 0 { + t.Fatalf("TenantDropped=%d, want 0 (priority bypasses cap)", got) + } + if got := p.Stats().Enqueued; got != 5 { + t.Fatalf("Enqueued=%d, want 5", got) + } +} + +func TestPipeline_PerTenantCap_ReleasedAfterProcess(t *testing.T) { + // Once a worker drains and processes a batch, the tenant slot is + // released so subsequent submissions from the same tenant are + // accepted. Without release, the cap would be a one-shot per-tenant + // quota for the lifetime of the process. + w := &fakeWriter{} + p := NewPipeline(w, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9}) + p.SetPerTenantCap(1) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + p.Start(ctx) + t.Cleanup(p.Stop) + + mk := func() *Batch { + b := healthyBatch() + b.Tenant = "single" + return b + } + + // First batch fills the cap. + if err := p.Submit(mk()); err != nil { + t.Fatalf("submit 1: %v", err) + } + // Wait for the worker to drain it (and release the slot). + if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 1 }) { + t.Fatalf("worker did not process first batch") + } + // Second batch must succeed because the slot was released. 
+ if err := p.Submit(mk()); err != nil { + t.Fatalf("submit 2 after release: %v", err) + } + if !waitFor(t, 2*time.Second, func() bool { return p.Stats().Processed == 2 }) { + t.Fatalf("worker did not process second batch") + } + if got := p.TenantDropped(); got != 0 { + t.Fatalf("TenantDropped=%d, want 0 (no drops when slot is released between submits)", got) + } +} + func TestPipeline_HardCapacityEvenForPriority(t *testing.T) { // Above hard capacity, priority batches are still rejected. The // caller is responsible for translating into RESOURCE_EXHAUSTED so diff --git a/internal/storage/retention.go b/internal/storage/retention.go index dc3f54f..24effe7 100644 --- a/internal/storage/retention.go +++ b/internal/storage/retention.go @@ -213,15 +213,66 @@ func (r *RetentionScheduler) runPurge(ctx context.Context) { } } + r.adaptPurgeSleep(time.Since(start)) + slog.Info("retention purge complete", "driver", driver, "duration", time.Since(start), "logs_deleted", totals["logs"], "traces_deleted", totals["traces"], "metrics_deleted", totals["metric_buckets"], + "next_batch_sleep", r.purgeBatchSleep, ) } +// adaptPurgeSleepCap and friends bracket the inter-batch sleep window. The +// adaptive controller doubles the current sleep when a pass takes more than +// `adaptSlowFraction` of the configured purgeInterval (signal: DB is hot or +// volume spiked); halves it when a pass finishes in under `adaptFastFraction` +// (signal: there's headroom; tighten the loop so retention doesn't fall +// behind ingest). Bounds keep the controller from oscillating to extreme +// values where it would either stall (too much sleep) or starve readers +// (too little). +const ( + adaptPurgeSleepCap = 100 * time.Millisecond + adaptPurgeSleepFloor = 1 * time.Millisecond + adaptSlowFraction = 0.50 + adaptFastFraction = 0.10 +) + +// adaptPurgeSleep tunes purgeBatchSleep based on the previous pass's wall +// time relative to purgeInterval. 
Single-writer (the retention loop), so no +// synchronization is needed; the purge methods read the value once at the +// call boundary. +func (r *RetentionScheduler) adaptPurgeSleep(elapsed time.Duration) { + if r.purgeInterval <= 0 { + return + } + pct := float64(elapsed) / float64(r.purgeInterval) + switch { + case pct > adaptSlowFraction && r.purgeBatchSleep < adaptPurgeSleepCap: + newSleep := r.purgeBatchSleep * 2 + if newSleep < adaptPurgeSleepFloor { + newSleep = adaptPurgeSleepFloor + } + if newSleep > adaptPurgeSleepCap { + newSleep = adaptPurgeSleepCap + } + slog.Info("retention: pass slow, increasing inter-batch sleep", + "elapsed", elapsed, + "old_sleep", r.purgeBatchSleep, + "new_sleep", newSleep, + ) + r.purgeBatchSleep = newSleep + case pct < adaptFastFraction && r.purgeBatchSleep > adaptPurgeSleepFloor: + newSleep := r.purgeBatchSleep / 2 + if newSleep < adaptPurgeSleepFloor { + newSleep = adaptPurgeSleepFloor + } + r.purgeBatchSleep = newSleep + } +} + // runPurgeSerial is the SQLite path: running the three purges concurrently buys // nothing because the driver holds a single writer lock, so we serialize them // to keep the "running" gauge accurate and avoid goroutine launch cost. 
@@ -267,6 +318,8 @@ func (r *RetentionScheduler) runPurgeSerial(ctx context.Context, cutoff time.Tim } } + r.adaptPurgeSleep(time.Since(start)) + slog.Info("retention purge complete", "driver", driver, "cutoff", cutoff.Format(time.RFC3339), @@ -274,6 +327,7 @@ func (r *RetentionScheduler) runPurgeSerial(ctx context.Context, cutoff time.Tim "traces_deleted", traces, "metrics_deleted", metricsPurged, "duration", time.Since(start), + "next_batch_sleep", r.purgeBatchSleep, ) } diff --git a/main.go b/main.go index de2956c..92e0f6c 100644 --- a/main.go +++ b/main.go @@ -452,17 +452,39 @@ func main() { Capacity: cfg.IngestPipelineQueueSize, Workers: cfg.IngestPipelineWorkers, }) + ingestPipeline.SetPerTenantCap(cfg.IngestPipelinePerTenantCap) ingestPipeline.Start(context.Background()) traceServer.SetPipeline(ingestPipeline) logsServer.SetPipeline(ingestPipeline) slog.Info("🌊 Async ingest pipeline enabled", "queue_size", cfg.IngestPipelineQueueSize, "workers", cfg.IngestPipelineWorkers, + "per_tenant_cap", cfg.IngestPipelinePerTenantCap, ) } else { slog.Warn("🐌 Async ingest pipeline disabled (INGEST_ASYNC_ENABLED=false) — Export() blocks on DB writes") } + // Wire /ready saturation probes. Both probes are nil-tolerant on the + // api server side; we additionally guard against unconfigured caps + // (DLQ unbounded, async pipeline disabled) by not registering the probe — i.e. + // "skipped" semantics — rather than dividing by zero. 
+ if dlq != nil && cfg.DLQMaxDiskMB > 0 { + maxBytes := float64(cfg.DLQMaxDiskMB) * 1024 * 1024 + apiServer.SetDLQSaturationProbe(func() float64 { + return float64(dlq.DiskBytes()) / maxBytes + }) + } + if ingestPipeline != nil { + apiServer.SetPipelineSaturationProbe(func() float64 { + st := ingestPipeline.Stats() + if st.Capacity == 0 { + return 0 + } + return float64(st.QueueDepth) / float64(st.Capacity) + }) + } + // Wire up live log streaming + AI + DLQ metrics logHandler := func(l storage.Log) { start := time.Now() From d1c750a05dab86d0c2278428c369a90f78c04ee9 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 05:35:33 +0000 Subject: [PATCH 2/4] fix(robustness): clamp pipeline capacity/workers (CodeQL) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeQL flagged make(chan *Batch, cfg.Capacity) at pipeline.go:171 as go/uncontrolled-allocation-size — cfg.Capacity flows from the INGEST_PIPELINE_QUEUE_SIZE env var without an upper bound, so a typo like 10_000_000_000 would OOM the process at startup. Add defensive caps in NewPipeline (maxPipelineCapacity = 1M, maxPipelineWorkers = 256). Clamping at the constructor satisfies the CodeQL taint-tracking and is the right defense regardless — both ceilings are well above any reasonable deployment. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/ingest/pipeline.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index 02c4b15..4952e9a 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -78,6 +78,17 @@ type PipelineConfig struct { SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0) } +// Defensive upper bounds on operator-supplied capacity/workers. Env-var +// inputs go directly into a make(chan ...) 
and into goroutine launches; +// without a sanity cap a typo like INGEST_PIPELINE_QUEUE_SIZE=10_000_000_000 +// would OOM the process. These caps are well above any reasonable +// production deployment (50k is the default queue, 8 the default workers) +// while still keeping the allocation finite. +const ( + maxPipelineCapacity = 1_000_000 + maxPipelineWorkers = 256 +) + // DefaultPipelineConfig returns production-sized defaults. func DefaultPipelineConfig() PipelineConfig { return PipelineConfig{ @@ -157,6 +168,25 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline if cfg.Workers <= 0 { cfg.Workers = d.Workers } + // Defensive sanity caps on operator-supplied values — keep them out of + // make()/goroutine-launch as raw env-var pass-through. CodeQL flagged + // the channel allocation as uncontrolled-allocation-size; clamping at + // the constructor satisfies the taint-tracking and prevents an OOM + // from a misconfigured env var. + if cfg.Capacity > maxPipelineCapacity { + slog.Warn("ingest pipeline: capacity clamped to defensive ceiling", + "requested", cfg.Capacity, + "max", maxPipelineCapacity, + ) + cfg.Capacity = maxPipelineCapacity + } + if cfg.Workers > maxPipelineWorkers { + slog.Warn("ingest pipeline: workers clamped to defensive ceiling", + "requested", cfg.Workers, + "max", maxPipelineWorkers, + ) + cfg.Workers = maxPipelineWorkers + } // Zero-value config falls back to defaults — the field is internal // (no env-var surface) and TestPipeline_DefaultsApplied enforces this. // Priority-only mode (always-soft-drop) is not a supported configuration From 242c44a51e588fcb2123ac86b6f2d5915fb25267 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 05:38:56 +0000 Subject: [PATCH 3/4] fix(robustness): use min() sanitizer for CodeQL allocation-size In-place reassignment of cfg.Capacity/Workers wasn't recognized by CodeQL as a sanitizer for go/uncontrolled-allocation-size. 
Reroute through local variables clamped via min(); make() consumes the local, which CodeQL's taint-tracking accepts as bounded. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/ingest/pipeline.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index 4952e9a..8bcd9a1 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -168,25 +168,29 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline if cfg.Workers <= 0 { cfg.Workers = d.Workers } - // Defensive sanity caps on operator-supplied values — keep them out of - // make()/goroutine-launch as raw env-var pass-through. CodeQL flagged - // the channel allocation as uncontrolled-allocation-size; clamping at - // the constructor satisfies the taint-tracking and prevents an OOM - // from a misconfigured env var. - if cfg.Capacity > maxPipelineCapacity { + // Sanitize operator-supplied capacity/workers into local variables BEFORE + // the make()/Workers loop. CodeQL's taint-tracking treats env-var-derived + // values as untrusted source for go/uncontrolled-allocation-size; an + // in-place reassignment of the field doesn't satisfy the sanitizer + // pattern, but a local clamp via min() does. Both ceilings are well above + // any reasonable deployment (50k default queue, 8 default workers) but + // keep the allocation bounded against a misconfigured env var. 
+ capacity := min(cfg.Capacity, maxPipelineCapacity) + workers := min(cfg.Workers, maxPipelineWorkers) + if capacity != cfg.Capacity { slog.Warn("ingest pipeline: capacity clamped to defensive ceiling", "requested", cfg.Capacity, "max", maxPipelineCapacity, ) - cfg.Capacity = maxPipelineCapacity } - if cfg.Workers > maxPipelineWorkers { + if workers != cfg.Workers { slog.Warn("ingest pipeline: workers clamped to defensive ceiling", "requested", cfg.Workers, "max", maxPipelineWorkers, ) - cfg.Workers = maxPipelineWorkers } + cfg.Capacity = capacity + cfg.Workers = workers // Zero-value config falls back to defaults — the field is internal // (no env-var surface) and TestPipeline_DefaultsApplied enforces this. // Priority-only mode (always-soft-drop) is not a supported configuration @@ -198,7 +202,7 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline writer: writer, metrics: metrics, cfg: cfg, - queue: make(chan *Batch, cfg.Capacity), + queue: make(chan *Batch, capacity), tenantInFlight: make(map[string]int), stopCh: make(chan struct{}), } From f80e323371dd43c7f6951ed011f48de043185dcf Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 05:45:26 +0000 Subject: [PATCH 4/4] fix(robustness): replace min() with explicit comparison guard for CodeQL CodeQL's BoundedFlowSource taint analysis recognizes explicit if/else comparison guards as sanitizers but not the min() builtin (Go 1.21+). Switching to `if x > MAX { x = MAX }` is the canonical pattern from go/uncontrolled-allocation-size docs and should clear the alert without changing runtime behavior. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/ingest/pipeline.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index 8bcd9a1..b28ca83 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -168,26 +168,27 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline if cfg.Workers <= 0 { cfg.Workers = d.Workers } - // Sanitize operator-supplied capacity/workers into local variables BEFORE - // the make()/Workers loop. CodeQL's taint-tracking treats env-var-derived - // values as untrusted source for go/uncontrolled-allocation-size; an - // in-place reassignment of the field doesn't satisfy the sanitizer - // pattern, but a local clamp via min() does. Both ceilings are well above + // Sanitize operator-supplied capacity/workers BEFORE the make()/Workers + // loop. CodeQL's taint-tracking treats env-var-derived values as untrusted + // for go/uncontrolled-allocation-size; only an explicit comparison guard + // is recognized as a BarrierGuard sanitizer. Both ceilings are well above // any reasonable deployment (50k default queue, 8 default workers) but // keep the allocation bounded against a misconfigured env var. - capacity := min(cfg.Capacity, maxPipelineCapacity) - workers := min(cfg.Workers, maxPipelineWorkers) - if capacity != cfg.Capacity { + capacity := cfg.Capacity + if capacity > maxPipelineCapacity { slog.Warn("ingest pipeline: capacity clamped to defensive ceiling", - "requested", cfg.Capacity, + "requested", capacity, "max", maxPipelineCapacity, ) + capacity = maxPipelineCapacity } - if workers != cfg.Workers { + workers := cfg.Workers + if workers > maxPipelineWorkers { slog.Warn("ingest pipeline: workers clamped to defensive ceiling", - "requested", cfg.Workers, + "requested", workers, "max", maxPipelineWorkers, ) + workers = maxPipelineWorkers } cfg.Capacity = capacity cfg.Workers = workers