Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/api/health_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)

// readySaturationThreshold is the fullness fraction at which a saturation
// probe (DLQ disk, ingest pipeline) flips /ready to 503. handleReady compares
// inclusively (sat >= threshold), so a probe reporting exactly 0.95 already
// marks the service not ready. Set high enough that brief spikes don't take
// the instance out of rotation, low enough that orchestrators stop sending
// traffic before the system fails outright.
const readySaturationThreshold = 0.95

// handleLive is a Kubernetes-style liveness probe.
// Returns 200 OK as long as the process is up. Does not check dependencies.
func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) {
Expand All @@ -19,7 +26,7 @@
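// Returns 200 only if the service can serve traffic: DB ping succeeds, the
// GraphRAG coordinator is running, and no registered saturation probe (DLQ
// disk, ingest pipeline) reports a fullness at or above
// readySaturationThreshold. Returns 503 with a per-check breakdown on failure.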
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {

Check failure on line 29 in internal/api/health_handlers.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3SkStR--0k5ZV_4JG9&open=AZ3SkStR--0k5ZV_4JG9&pullRequest=61
checks := map[string]string{
"database": "ok",
"graphrag": "ok",
Expand Down Expand Up @@ -55,6 +62,31 @@
ready = false
}

// Saturation probes — flip to 503 when downstream buffers are full so
// orchestrators (k8s, load balancers) stop routing fresh traffic before
// the pipeline starts hard-rejecting (gRPC RESOURCE_EXHAUSTED / HTTP 429)
// or DLQ starts FIFO-evicting unflushed batches.
if s.dlqSaturation != nil {
if sat := s.dlqSaturation(); sat >= readySaturationThreshold {
checks["dlq_disk"] = fmt.Sprintf("saturated %.0f%%", sat*100)
ready = false
} else {
checks["dlq_disk"] = "ok"
}
} else {
checks["dlq_disk"] = "skipped"
}
if s.pipelineSaturation != nil {
if sat := s.pipelineSaturation(); sat >= readySaturationThreshold {
checks["pipeline"] = fmt.Sprintf("saturated %.0f%%", sat*100)
ready = false
} else {
checks["pipeline"] = "ok"
}
} else {
checks["pipeline"] = "skipped"
}

status := http.StatusOK
if !ready {
status = http.StatusServiceUnavailable
Expand Down
22 changes: 22 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type Server struct {
graph *graph.Graph // in-memory service dependency graph (may be nil before first build)
graphRAG *graphrag.GraphRAG // layered GraphRAG for advanced queries
vectorIdx *vectordb.Index // TF-IDF semantic log search index

// Saturation probes consulted by /ready. Each returns a fullness
// fraction in [0.0, 1.0]; nil disables the corresponding check.
// Decoupling via callbacks keeps the api package free of queue/ingest
// imports and lets tests inject deterministic values.
dlqSaturation func() float64
pipelineSaturation func() float64
}

// NewServer creates a new API server.
Expand Down Expand Up @@ -51,6 +58,21 @@ func (s *Server) SetVectorIndex(idx *vectordb.Index) {
s.vectorIdx = idx
}

// SetDLQSaturationProbe installs the callback /ready uses to learn how full
// the DLQ disk is. The callback must report fullness as a fraction in
// [0.0, 1.0]; /ready answers 503 once that fraction signals the DLQ is at
// risk of FIFO-evicting unflushed batches. A nil fn turns the check off.
func (s *Server) SetDLQSaturationProbe(fn func() float64) {
	s.dlqSaturation = fn
}

// SetPipelineSaturationProbe installs the callback /ready uses to learn how
// full the ingest pipeline queue is. The callback must report fullness as a
// fraction in [0.0, 1.0]; /ready answers 503 when the pipeline is at hard
// capacity (already returning 429/RESOURCE_EXHAUSTED to clients). A nil fn
// turns the check off.
func (s *Server) SetPipelineSaturationProbe(fn func() float64) {
	s.pipelineSaturation = fn
}

// RegisterRoutes registers API endpoints on the provided mux.
func (s *Server) RegisterRoutes(mux *http.ServeMux) {
// Metadata & Discovery
Expand Down
15 changes: 12 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ type Config struct {
IngestAsyncEnabled bool // default true; opt out via INGEST_ASYNC_ENABLED=false
IngestPipelineQueueSize int // default 50000 batches; per-deployment tunable
IngestPipelineWorkers int // default 8 worker goroutines
// IngestPipelinePerTenantCap caps in-flight batches per tenant so a noisy
// tenant cannot starve siblings of fresh queue slots when fullness is
// below the soft-backpressure threshold. 0 (default) disables — single-
// tenant deployments need no cap. Operators on multi-tenant deployments
// should set INGEST_PIPELINE_PER_TENANT_CAP to roughly Capacity/N where
// N is the expected number of concurrently-active tenants, with some
// headroom (e.g. 2× the fair-share value) for short bursts.
IngestPipelinePerTenantCap int

// TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers.
// Empty values (default) keep plaintext behavior.
Expand Down Expand Up @@ -260,9 +268,10 @@ func Load(customPath string) (*Config, error) {
GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 100000),

// Async ingest pipeline
IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true),
IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000),
IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8),
IngestAsyncEnabled: getEnvBool("INGEST_ASYNC_ENABLED", true),
IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000),
IngestPipelineWorkers: getEnvInt("INGEST_PIPELINE_WORKERS", 8),
IngestPipelinePerTenantCap: getEnvInt("INGEST_PIPELINE_PER_TENANT_CAP", 0),

// TLS
TLSCertFile: getEnv("TLS_CERT_FILE", ""),
Expand Down
32 changes: 31 additions & 1 deletion internal/graphrag/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,16 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store
)
}

// pruneOldSnapshots removes snapshots older than 7 days.
// maxSnapshotRows is a row-count backstop on `graph_snapshots` to prevent
// unbounded disk growth when the write rate outruns the 7-day age prune.
// Steady state at 15-min cadence × 100 tenants is ~67k rows/week
// (4 snapshots/hour × 24 hours × 7 days × 100 tenants = 67,200), so 100k
// gives ~50% headroom — high enough to never trigger under normal operation,
// low enough to bound disk if a misconfig or tenant explosion runs the
// snapshotter hot. pruneOldSnapshots enforces it after the by-age prune by
// deleting the oldest rows (ordered by created_at) in excess of the cap.
const maxSnapshotRows = 100_000

// pruneOldSnapshots removes snapshots older than 7 days, then enforces a
// row-count backstop in case the by-age prune isn't keeping up.
func (g *GraphRAG) pruneOldSnapshots() {
if g.repo == nil || g.repo.DB() == nil {
return
Expand All @@ -154,6 +163,27 @@ func (g *GraphRAG) pruneOldSnapshots() {
} else if result.RowsAffected > 0 {
slog.Info("Pruned old graph snapshots", "count", result.RowsAffected)
}

var count int64
if err := g.repo.DB().Model(&GraphSnapshot{}).Count(&count).Error; err != nil {
slog.Error("Failed to count snapshots for row-cap prune", "error", err)
return
}
if count <= maxSnapshotRows {
return
}
excess := count - maxSnapshotRows
// Subquery selects the N oldest IDs, then deletes that set. Portable
// across SQLite and Postgres; avoids a multi-statement transaction.
sub := g.repo.DB().Model(&GraphSnapshot{}).Select("id").Order("created_at ASC").Limit(int(excess))
if err := g.repo.DB().Where("id IN (?)", sub).Delete(&GraphSnapshot{}).Error; err != nil {
slog.Error("Failed to row-cap prune snapshots", "error", err)
return
}
slog.Warn("graphrag: row-cap pruned snapshots (write rate exceeded by-age prune)",
"deleted", excess,
"cap", maxSnapshotRows,
)
}

// GetGraphSnapshot retrieves the snapshot closest to the requested time,
Expand Down
124 changes: 119 additions & 5 deletions internal/ingest/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@
SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0)
}

// Defensive upper bounds on operator-supplied capacity/workers. Env-var
// inputs go directly into a make(chan ...) and into goroutine launches;
// without a sanity cap a typo like INGEST_PIPELINE_QUEUE_SIZE=10_000_000_000
// would OOM the process. These caps are well above any reasonable
// production deployment (50k is the default queue, 8 the default workers)
// while still keeping the allocation finite.
const (
// maxPipelineCapacity is the ceiling applied to the configured queue
// capacity; larger values are clamped to it and a warning is logged.
maxPipelineCapacity = 1_000_000
// maxPipelineWorkers is the ceiling applied to the configured worker
// count; larger values are clamped to it and a warning is logged.
maxPipelineWorkers = 256
)

// DefaultPipelineConfig returns production-sized defaults.
func DefaultPipelineConfig() PipelineConfig {
return PipelineConfig{
Expand Down Expand Up @@ -130,6 +141,16 @@
droppedHealthy atomic.Int64
rejectedFull atomic.Int64
processFailures atomic.Int64
tenantDropped atomic.Int64

// Per-tenant in-flight cap — bounds the queue slots a single tenant
// can consume so a noisy tenant cannot starve siblings of fresh
// healthy submissions when fullness is below the soft threshold.
// Priority batches (errors/slow) bypass the cap because diagnostic
// data must always land. perTenantCap == 0 disables the check.
perTenantCap int
tenantMu sync.Mutex
tenantInFlight map[string]int

stopCh chan struct{}
once sync.Once
Expand All @@ -147,6 +168,30 @@
if cfg.Workers <= 0 {
cfg.Workers = d.Workers
}
// Sanitize operator-supplied capacity/workers BEFORE the make()/Workers
// loop. CodeQL's taint-tracking treats env-var-derived values as untrusted
// for go/uncontrolled-allocation-size; only an explicit comparison guard
// is recognized as a BarrierGuard sanitizer. Both ceilings are well above
// any reasonable deployment (50k default queue, 8 default workers) but
// keep the allocation bounded against a misconfigured env var.
capacity := cfg.Capacity
if capacity > maxPipelineCapacity {
slog.Warn("ingest pipeline: capacity clamped to defensive ceiling",
"requested", capacity,
"max", maxPipelineCapacity,
)
capacity = maxPipelineCapacity
}
workers := cfg.Workers
if workers > maxPipelineWorkers {
slog.Warn("ingest pipeline: workers clamped to defensive ceiling",
"requested", workers,
"max", maxPipelineWorkers,
)
workers = maxPipelineWorkers
}
cfg.Capacity = capacity
cfg.Workers = workers
// Zero-value config falls back to defaults — the field is internal
// (no env-var surface) and TestPipeline_DefaultsApplied enforces this.
// Priority-only mode (always-soft-drop) is not a supported configuration
Expand All @@ -155,14 +200,37 @@
cfg.SoftThreshold = d.SoftThreshold
}
return &Pipeline{
writer: writer,
metrics: metrics,
cfg: cfg,
queue: make(chan *Batch, cfg.Capacity),
stopCh: make(chan struct{}),
writer: writer,
metrics: metrics,
cfg: cfg,
queue: make(chan *Batch, capacity),

Check failure

Code scanning / CodeQL

Slice memory allocation with excessive size value High

This memory allocation depends on a
user-provided value
.
Comment thread
aksOps marked this conversation as resolved.
Dismissed
tenantInFlight: make(map[string]int),
stopCh: make(chan struct{}),
}
}

// SetPerTenantCap sets how many batches a single tenant may have in flight
// at once — queued plus currently being processed. A value of 0 turns the
// cap off, and negative values are treated as 0.
//
// When a tenant is at its cap, additional healthy submissions from that
// tenant are dropped in Submit() with reason "tenant_backpressure".
// Priority batches (errors/slow traces) are never subject to the cap.
//
// The cap is normally chosen as a fraction of Capacity: Capacity/4, for
// example, limits any one tenant to 25% of the queue. Operators tune it via
// INGEST_PIPELINE_PER_TENANT_CAP.
//
// Startup-only — call before Start().
func (p *Pipeline) SetPerTenantCap(n int) {
	if n >= 0 {
		p.perTenantCap = n
		return
	}
	// Negative input means "no cap", same as 0.
	p.perTenantCap = 0
}

// TenantDropped returns the running total of healthy submissions that were
// turned away because their tenant had already reached the per-tenant cap.
// This counter is separate from RejectedFull (queue at hard capacity) and
// from DroppedHealthy (soft-backpressure across the whole queue).
func (p *Pipeline) TenantDropped() int64 {
	dropped := p.tenantDropped.Load()
	return dropped
}

// Start spawns the worker pool. Safe to call once. Subsequent calls are
// no-ops; tests rely on this for reset semantics.
func (p *Pipeline) Start(ctx context.Context) {
Expand Down Expand Up @@ -215,18 +283,57 @@
return nil
}

// Per-tenant cap — only enforced for healthy batches (priority bypasses,
// same as soft-backpressure). Reserve the slot under the lock so the
// counter and the channel send are coherent: if the channel is full,
// undo the reservation in the default branch below.
tenantReserved := false
if p.perTenantCap > 0 && b.Tenant != "" && !b.Priority() {
p.tenantMu.Lock()
if p.tenantInFlight[b.Tenant] >= p.perTenantCap {
p.tenantMu.Unlock()
p.tenantDropped.Add(1)
p.observeDrop(b.Type, "tenant_backpressure")
return nil
}
p.tenantInFlight[b.Tenant]++
tenantReserved = true
p.tenantMu.Unlock()
}

select {
case p.queue <- b:
p.enqueuedTotal.Add(1)
p.observeQueueDepth(b.Type)
return nil
default:
if tenantReserved {
p.releaseTenantSlot(b.Tenant)
}
p.rejectedFull.Add(1)
p.observeDrop(b.Type, "queue_full")
return ErrQueueFull
}
}

// releaseTenantSlot gives back one in-flight slot for the given tenant.
// When the tenant's count reaches zero (or below) its map entry is deleted,
// so the map does not accumulate entries for short-lived tenant IDs.
// Calling it with an empty tenant, or while the cap is disabled, does nothing.
func (p *Pipeline) releaseTenantSlot(tenant string) {
	if tenant == "" || p.perTenantCap <= 0 {
		return
	}

	p.tenantMu.Lock()
	defer p.tenantMu.Unlock()

	if remaining := p.tenantInFlight[tenant] - 1; remaining > 0 {
		p.tenantInFlight[tenant] = remaining
		return
	}
	// Nothing left in flight for this tenant: drop the entry entirely.
	delete(p.tenantInFlight, tenant)
}

// Stop signals workers to exit and blocks until in-flight batches have
// been drained from the channel. Idempotent.
func (p *Pipeline) Stop() {
Expand Down Expand Up @@ -302,6 +409,13 @@
if b == nil {
return
}
// Release the per-tenant slot reserved at Submit time. Registered as
// a defer so it runs even if the batch panics. Priority batches don't
// reserve at submit, so they don't release here either — the conditions
// must mirror exactly to keep the in-flight count balanced.
if !b.Priority() {
defer p.releaseTenantSlot(b.Tenant)
}
defer func() {
if r := recover(); r != nil {
slog.Error("ingest pipeline process panic",
Expand Down
Loading
Loading