diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..b7a48b1 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,7 @@ + +# Memory Context + +# [otelcontext] recent context, 2026-04-28 1:14am UTC + +No previous sessions found. + \ No newline at end of file diff --git a/docs/review/2026-04-27-7pr-robustness-review.md b/docs/review/2026-04-27-7pr-robustness-review.md new file mode 100644 index 0000000..84e2b5e --- /dev/null +++ b/docs/review/2026-04-27-7pr-robustness-review.md @@ -0,0 +1,119 @@ +# Code Review — 7-PR Robustness Initiative (2026-04-27) + +Scope: acf904d, 17f70dc, cf9c1f5, 96ec26e, 436201e, fdf2433, 050525e + +## Critical + +### C1. MCP `break` inside `select { default: }` does not escape outer switch +File: `/home/dev/projects/otelcontext/internal/mcp/server.go:283` +- `break` inside the `default:` case of the inner `select { case s.callSlots <- struct{}{}: default: ... break }` only breaks the `select`, not the outer `case "tools/call":`. This is a classic Go pitfall and the well-known reason the author added the follow-up guard at line 286-289 (`if rpcErr != nil { break }`). However, the comment at line 286 ("rpcErr was set inside the select-default") asserts intent the language does not provide; the construct only works because of the second guard. Even so, between line 285 and line 286 nothing actually depends on it - functionally correct - but: the comment "skip the call" misleadingly implies the inner `break` did the skipping. If anyone refactors out the second `if rpcErr != nil { break }` thinking it's redundant, semaphore acquisition will silently bypass the overload error and proceed to the call without ever filling a slot (because the `select` already returned). Recommend either: drop the inner `break` (it is dead) and reword the comment, or replace the construct with a labeled break. +- Severity escalation: low-likelihood-of-trip but high-impact-when-tripped, and the dead `break` actively misdirects readers about how the control flow works. +- Suggested fix: remove the inner `break` keyword (it is unreachable-effective code), and rewrite the comment as `// rpcErr was set in the inner select default; skip dispatch.` + +## High + +### H1. MCP semaphore is leaked when the call times out +File: `/home/dev/projects/otelcontext/internal/mcp/server.go:294-298` +- On timeout (`runWithTimeout` returns with `timedOut=true`), the handler does `<-s.callSlots` at line 296 to release the slot, but the goroutine spawned inside `runWithTimeout` (server.go:428) is still alive doing the actual tool work. The `s.inFlight.Add(-1)` at line 298 also under-counts: the goroutine continues to occupy DB / GraphRAG resources but the in-flight gauge says zero. Worse - the slot is freed for new admissions while the timed-out handler is still consuming connection-pool capacity, defeating the purpose of the semaphore as a backpressure control. Under sustained timeouts you can easily get N times the configured concurrency in flight. Author commented on this at line 421 ("its goroutine still runs to completion in the background") but mitigation is missing. +- Suggested fix: hold the slot until the goroutine *actually* finishes - e.g. let the goroutine itself release the slot via `defer` once `done` is sent, and use a non-blocking send on a closed `done` so the timeout path doesn't deadlock. Or, wrap the toolHandler so it must respect ctx before it touches the DB connection. + +### H2. FTS5 `external content` triggers vs PurgeLogsBatched chunked DELETE - works only because triggers fire per-row +File: `/home/dev/projects/otelcontext/internal/storage/fts5.go:60-62`, `/home/dev/projects/otelcontext/internal/storage/log_repo.go:273` +- The DELETE trigger uses `INSERT INTO logs_fts(logs_fts, rowid, body, service_name) VALUES('delete', old.id, old.body, old.service_name)`. SQLite fires AFTER DELETE triggers per-row, so `DELETE FROM logs WHERE id IN (SELECT id ...)` chunks of 50,000 rows will execute 50,000 trigger inserts under one statement - within a busy DB this materially extends the DELETE window. Plan claimed "kept in sync via AFTER INSERT/DELETE/UPDATE triggers" - true, but the trigger DDL stores `body` and `service_name` in the operation, which is correct for an `external-content` table but means each DELETE statement effectively does 2 writes (logs + fts5) and the WAL grows accordingly. No test exercises a 25k-row purge with FTS triggers loaded, so the perf characteristic is unmeasured. +- Not a correctness bug. Recommend: add an integration-style test that purges >10k rows with FTS5 enabled and asserts both `logs` and `logs_fts` are empty afterward, then watch the duration metric. If it regresses retention SLO, switch to a transaction-bracketed bulk delete or use FTS5 contentless-delete idiom. + +### H3. `getLogsV2LikeFallback` silently masks any DB error including non-FTS5 errors +File: `/home/dev/projects/otelcontext/internal/storage/log_repo.go:128-135` +- `g.Wait()` can fail for many reasons unrelated to FTS5: connection-pool exhaustion, query-cancelled, OOM, syntax in a future filter. The fallback runs `r.getLogsV2LikeFallback(...)` without inspecting `err`. So an outage that affects both paths returns whatever the fallback returns - which in the "DB really is down" case is also an error, but in the "table renamed during migration" case might return empty results without the operator ever seeing the FTS5 error in logs. CLAUDE.md root rule: "fix root causes, not paper over with silent fallbacks". +- Suggested fix: log the FTS5 error before falling back (`slog.Warn("FTS5 query failed, falling back to LIKE", "err", err)`), and only fall back on a small allow-list of error classes (`SQLITE_ERROR` with "no such table", malformed-MATCH errors). Same applies to `searchLogsFTS5` at `repository.go:310-315`. + +### H4. PartitionScheduler.Stop() races with itself; `done` channel set in the wrong place +File: `/home/dev/projects/otelcontext/internal/storage/partitions_scheduler.go:54,89-103,106` +- `done` is created once at construction (line 54) but the scheduler `loop` calls `defer close(s.done)` at line 106. If `Stop()` is called twice, both calls will see `started.Load() == true`, both will read the same `done` channel; the second `<-done` reads a closed channel (immediate return - safe). However, `s.cancel = cancel` at line 76 is mutated under `mu` only on Start; Stop reads it under `mu` (line 94), then calls `cancel()` outside the lock, which is fine. But Start does NOT reset `s.done` if called after a Stop, and `started.Store(true)` is never reset to false in Stop - so a Start/Stop/Start sequence will see `started` already true and skip work, while a fresh `done` channel never gets created and the loop never starts again. Lifecycle is one-shot in practice; not documented as such. +- Suggested fix: document one-shot behavior on the type, or reset `started`, `done`, and `cancel` at the end of `Stop()` after the goroutine has exited. Tests should cover Start-Stop-Start. + +### H5. Pipeline `SoftThreshold` floor disallows valid 0.0 (always-soft) +File: `/home/dev/projects/otelcontext/internal/ingest/pipeline.go:141` +- `if cfg.SoftThreshold <= 0 || cfg.SoftThreshold >= 1.0 { cfg.SoftThreshold = d.SoftThreshold }`. The treatment of zero as "use default" silently replaces a deliberate operator choice ("always drop healthy batches when not strictly empty") with the 0.9 default. More problematic: `SoftThreshold = 0` is a sane configuration to express "in degraded mode, only ever ship priority batches"; the code masks this. CLAUDE.md: "Don't add error handling for scenarios that can't happen" - here a real value is being clobbered. +- Suggested fix: change to `if cfg.SoftThreshold < 0 || cfg.SoftThreshold > 1.0 { cfg.SoftThreshold = d.SoftThreshold }` and explicitly allow 0 and 1. + +## Medium + +### M1. TSDB `seriesPerTenant` decrement on flush-reset is unauthenticated +File: `/home/dev/projects/otelcontext/internal/tsdb/aggregator.go:281` +- `seriesPerTenant = make(map[string]int)` resets cardinality per flush window, which is the right knob to keep the budget per-window. However, plan said "seriesPerTenant counts unique (non-overflow) bucket keys per tenant and is reset by flush()". On a flush() that fails to persist, the counters reset but the buckets may be retained on the failure path - so the next window briefly under-counts. Also, the per-tenant cap check at line 178 reads `seriesPerTenant[m.TenantID]` - this is checked under `a.mu`, fine. But `cardinalityOverflow` callback is invoked inside the same lock window (line 184, 205) which makes Prometheus counter increments lock-contending under high overflow. `Inc()` on a CounterVec is internally locked; nesting under `a.mu` is benign but worth knowing. +- Suggested fix: capture the tenantID inside the lock, then call the callback after `mu.Unlock()`. Or use `sync.RWMutex` and run the cardinality check under RLock + a separate atomic for the count. + +### M2. Pipeline `Submit` has TOCTOU between fullness sampling and channel send +File: `/home/dev/projects/otelcontext/internal/ingest/pipeline.go:198-213` +- `fullness := float64(len(p.queue)) / float64(p.cfg.Capacity)` at line 198 is racy with workers draining concurrently: the queue can drop from 95% to 50% between the read and the `select`, and a healthy batch will be dropped despite room available. Conversely workers can fall behind between read and `select` and a priority batch sees `default` and returns ErrQueueFull. The race is benign for the soft-drop direction (rare drop of a healthy batch), and for hard-drop the channel `default` is the actual gate so correctness holds. Worth documenting at the top of `Submit` so future maintainers don't try to "fix" the unsynchronized read. +- Suggested fix: docstring note. Don't add a lock - the design is "sample-and-decide" deliberately, and a lock would serialize all submitters. + +### M3. MCP cache key normalization is incomplete: nested maps are not key-sorted +File: `/home/dev/projects/otelcontext/internal/mcp/cache.go:87-92` +- `cacheKey` only sorts top-level argument keys and serializes each value with `json.Marshal`. Go's `json.Marshal` does sort map keys alphabetically since 1.12, so this is fine in practice for `map[string]any`. But for `[]any` containing maps (e.g. `{"filters": [{"b":1,"a":2}]}`), Go json sorts those too. So the keying is actually stable for stdlib JSON inputs - good. However, slices preserve order and a client that sends `["a","b"]` vs `["b","a"]` should hit different cache entries (correct semantically: order matters). Not a bug. +- However: argument values come from `params.Arguments map[string]any` parsed via `json.Unmarshal`, which gives `map[string]any` for objects and `[]any` for arrays. So the stable property holds. No fix. + +### M4. `isQueueFull` accepts ANY gRPC ResourceExhausted, including legitimate quota errors +File: `/home/dev/projects/otelcontext/internal/ingest/otlp_http.go:103-114` +- If a future change in `Export()` returns RESOURCE_EXHAUSTED for some reason other than the pipeline (e.g. tenant rate limit, gRPC max-recv-msg-size), HTTP returns 429 + Retry-After: 1 and the throttle metric increments. That is also the correct user-facing behavior (tell the client to back off), but the metric `otelcontext_http_otlp_throttled_total` will conflate two distinct backpressure causes. Not a bug, but operators reading the metric may be misled. +- Suggested fix: make the pipeline `ErrQueueFull` translate to a custom gRPC error with a metadata tag (e.g. `reason=pipeline-queue-full`) and let `isQueueFull` check for that tag, fall through to true for the bare RESOURCE_EXHAUSTED case but emit a different metric label. + +### M5. `pgLogsRelkind` "no rows" detection by string-match +File: `/home/dev/projects/otelcontext/internal/storage/partitions.go:248-251` +- `strings.Contains(err.Error(), "no rows")` is fragile across drivers. The pgx driver returns `sql.ErrNoRows`; GORM's `Row().Scan()` may wrap it. Use `errors.Is(err, sql.ErrNoRows)` instead. Today this works because the only Postgres driver in use is pgx and the message contains "no rows", but a driver upgrade can break greenfield detection - and the failure mode is loud (refuse to start), so a bug here means an operator's first deploy fails inscrutably. +- Suggested fix: `if errors.Is(err, sql.ErrNoRows) { return "", nil }`. + +## Low + +### L1. SSE writer is not protected against concurrent writes +File: `/home/dev/projects/otelcontext/internal/mcp/server.go:340-359` +- The SSE handler writes both heartbeat (`: keep-alive\n\n`) and notification frames into the same `http.ResponseWriter` from one goroutine via select - so single-writer is OK. Just confirming for the record. + +### L2. Pipeline `worker` shutdown is leaky on parent ctx cancel between drain ticks +File: `/home/dev/projects/otelcontext/internal/ingest/pipeline.go:254-273` +- When ctx is canceled (line 257) workers exit immediately without draining the buffered queue, dropping any in-flight batches. The Stop() path (line 261) drains; the ctx-cancel path does not. Production shutdown uses `Stop()` so this is not exercised, but tests using ctx cancel may mis-attribute drops. +- Suggested fix: either always drain on exit, or document that ctx cancel is the "abort" path. + +### L3. Partition cutoff comparison uses `!upper.After(cutoffUTC)` which is correct but easily misread +File: `/home/dev/projects/otelcontext/internal/storage/partitions.go:227` +- `!upper.After(cutoff)` is `upper <= cutoff`. The DROP triggers when the upper bound is less-than-or-equal to the cutoff. With daily partitions where upper is `day+24h`, this drops a partition exactly at the moment the cutoff sweeps past its upper. Correct, but the comment "Entire partition range ends at or before the cutoff" reads as `<` not `<=`. Make it `if upper.Before(cutoff) || upper.Equal(cutoff)` for legibility. + +### L4. `defaultCacheTTL` is referenced but I did not see it defined in the snippet +File: `/home/dev/projects/otelcontext/internal/mcp/server.go` (line ~40) +- Visible in the const block in earlier slice; not a finding, just confirming the const exists. + +## Plan Alignment Summary + +| Phase | Plan claim | Verdict | +|---|---|---| +| 0 | Workers 4->16, queue 10k->100k | Implemented; verified in CLAUDE.md and config.go | +| 1 | Hybrid backpressure: <90% accept, 90-100% drop healthy, 100% RESOURCE_EXHAUSTED | Implemented; race noted (M2) | +| 2 | Per-tenant cap checked first | Implemented at aggregator.go:178 (overTenantCap before overGlobalCap) | +| 3a | FTS5 + BM25 + triggers | Implemented; perf char unmeasured (H2) | +| 3b/5 | Greenfield-only partitioning, refuse pre-existing unpartitioned logs | Implemented at partitions.go:99-100 | +| 4 | HTTP 429 + Retry-After parity | Implemented; metric semantics noted (M4) | +| 6 | Concurrency cap, timeout, cache, SSE keep-alive | Implemented; semaphore leak on timeout (H1) | + +## Backwards-Compatibility Audit + +- `INGEST_ASYNC_ENABLED=true` is the new default (ingestion path changes). Plan flagged this. Acceptable. +- `DB_POSTGRES_PARTITIONING=""` correctly stays legacy. +- `METRIC_MAX_CARDINALITY_PER_TENANT=0` correctly stays unlimited. +- `MCP_CACHE_TTL_MS=0` correctly disables cache. +- No subtle default flips found. + +## Test Strength + +- Pipeline tests cover nil/empty/soft/hard. Strong assertions. +- TSDB cardinality tests should explicitly verify the per-tenant cap fires BEFORE global; needs a test where global has headroom but per-tenant is exceeded. Recommend adding. +- FTS5 tests cover BM25, prefix, stemming, tenant isolation, delete trigger - good. Missing: large purge perf regression. +- Partition tests cover greenfield refuse and DROP. Missing: PartitionScheduler Start-Stop-Start lifecycle (H4). +- MCP robustness tests cover concurrency cap and timeout. Missing: cache-key isolation with conflicting tenant args, semaphore leak on timeout (H1). + +## Adherence to Project Rules + +- Native net/http only: confirmed. +- Embedded DBs only: confirmed. +- No new frameworks introduced. +- Minimal-diff discipline: largely respected; no scope creep observed. diff --git a/docs/review/2026-04-28-second-opinion-review.md b/docs/review/2026-04-28-second-opinion-review.md new file mode 100644 index 0000000..47f8b34 --- /dev/null +++ b/docs/review/2026-04-28-second-opinion-review.md @@ -0,0 +1,159 @@ +# Second-Opinion Correctness & Security Review +**Scope:** 7 squash-merge commits (acf904d..050525e) on `origin/main` +**Date:** 2026-04-28 +**Reviewer:** independent pass (main shell) +**Tool runs:** `go vet ./...` → clean; `go test ./internal/tsdb/... ./internal/mcp/... ./internal/ingest/... ./internal/storage/... -race -count=1 -timeout 120s` → **269 passed, 0 failures, 0 races detected** + +--- + +## HIGH + +### H-1 — Data race on `droppedBatches` in TSDB aggregator +**File:** `internal/tsdb/aggregator.go:35, 263, 287` + +**Problem:** `droppedBatches int64` is a plain (non-atomic) `int64` that is written in `flush()` (the single aggregation goroutine, outside `a.mu`) and read in `DroppedBatches()` (any caller goroutine) without synchronization. + +**Root cause:** The field is incremented after `a.mu.Unlock()` returns — inside the `select default` branch — so the lock is not held at the time of the write. `DroppedBatches()` reads it without acquiring any lock. + +**Evidence:** +```go +// flush() — lock already released +select { +case a.flushChan <- batch: +default: + a.droppedBatches++ // line 287 — no lock held + ... + slog.Warn(..., "total_dropped", a.droppedBatches) // second unsynchronised read +} + +// DroppedBatches() — separate goroutine, no lock +func (a *Aggregator) DroppedBatches() int64 { + return a.droppedBatches // line 263 — bare read +} +``` + +Note: the race detector did not fire in the test run because the existing tests do not call `DroppedBatches()` concurrently with `flush()`. The race is real and will surface under load or when a metrics scrape hits the Prometheus endpoint while the aggregation goroutine is flushing. + +**Fix:** Change the field declaration to `droppedBatches atomic.Int64` and replace `a.droppedBatches++` with `a.droppedBatches.Add(1)`, and `a.droppedBatches` reads with `a.droppedBatches.Load()`. Alternatively, move the increment inside the `a.mu`-held section (but the lock scope currently ends before the channel select). + +--- + +## MEDIUM + +### M-1 — TOCTOU race in `Submit()` between fullness check and channel send +**File:** `internal/ingest/pipeline.go` (`Submit` function) + +**Problem:** The backpressure logic reads `len(p.queue)` to compute a `fullness` ratio, decides whether to drop silently or admit, then separately attempts `p.queue <- b` inside a `select/default`. A batch at 89.9% fullness passes the soft-drop gate, but if the queue reaches 100% between the check and the send, the `default` branch fires and returns `ErrQueueFull` — the caller gets a 429 instead of the intended silent drop. + +**Root cause:** Two separate operations (`len` then `send`) on a shared channel with no lock between them. The send attempt IS the only reliable fullness test. + +**Fix:** Restructure `Submit` so the channel send is the single decision point. Use a tiered select with an explicit 90%-threshold check only for the intentional-drop path, but always attempt the real send without a prior `len` check. Example shape: +```go +select { +case p.queue <- b: + return nil +default: + if shouldDrop(b) { + p.observeDrop(...) + return nil // silent drop + } + return ErrQueueFull +} +``` +This eliminates the window between check and send. + +--- + +### M-2 — `a.ring` and `a.onIngest` read outside `a.mu` in `Ingest()` +**File:** `internal/tsdb/aggregator.go` (`Ingest` and `SetRingBuffer`/`SetMetrics`) + +**Problem:** `Ingest()` reads `a.ring` and `a.onIngest` directly without holding `a.mu`, while `SetRingBuffer()` and `SetMetrics()` write them under `a.mu`. If either setter is called after `Start()`, this is a data race on the pointer values. + +**Root cause:** Pointers are treated as startup-only but no guard enforces that constraint — callers can technically call setters at any time. + +**Fix (two options):** +1. Read `a.ring` and `a.onIngest` under a short `a.mu.RLock()` inside `Ingest()`. +2. Add a `started atomic.Bool` guard that panics in setters when the aggregator is already running, and document the startup-only contract in godoc. + +Option 2 is cheaper and makes the invariant explicit. + +--- + +### M-3 — `process()` skips log callbacks when `BatchCreateSpans` fails +**File:** `internal/ingest/pipeline.go:307-320` + +**Problem:** When `BatchCreateSpans` returns an error, `process()` returns immediately, skipping the `BatchCreateLogs` write and all log/span callbacks (including `GraphRAG.OnLogIngested`). A partial batch that contains both spans and logs loses the log data silently; the error counter increments but the logs are not retried or DLQ'd. + +**Root cause:** The `return` after `BatchCreateSpans` failure was intentional ("mirrors the synchronous path's tolerance") but the comment does not acknowledge that co-batched logs are lost, and the async pipeline has no per-signal retry or DLQ path. + +**Fix:** Decouple spans and logs into separate `if` blocks that do not short-circuit each other, mirroring the treatment of `BatchCreateTraces` (which `continue`s rather than returns). The DLQ is already wired for span/log/trace typed envelopes — route the failed log slice there instead of dropping it. + +--- + +## LOW + +### L-1 — MCP config negative values accepted without validation +**File:** `internal/config/config.go` (validation block, `MCP_MAX_CONCURRENT`, `MCP_CALL_TIMEOUT_MS`, `MCP_CACHE_TTL_MS`) + +**Problem:** Negative values for the three MCP tunables are silently accepted by the config validation block and are treated as "disable" downstream. There is no documented contract for what negative means, and an operator who sets `MCP_MAX_CONCURRENT=-1` expecting a sensible default gets a fully open semaphore (no concurrency cap) without any log warning. + +**Fix:** Add explicit range checks in the validation block (same pattern used for `HOT_RETENTION_DAYS`) and either reject negative values as invalid or clamp-and-warn: +```go +if cfg.MCPMaxConcurrent < 0 { + slog.Warn("MCP_MAX_CONCURRENT < 0, treating as unlimited (no cap)") +} +``` + +--- + +### L-2 — `SetCallLimit()` replaces `callSlots` channel without draining +**File:** `internal/mcp/server.go` (`SetCallLimit`) + +**Problem:** `SetCallLimit` creates a new channel and assigns it to `s.callSlots` while in-flight callers hold permits on the old channel. When those goroutines release (`<-s.callSlots`), they release to the now-GC-eligible old channel — the new channel is unaffected and starts empty. This means the new limit takes effect cleanly, but any in-flight calls that were counted against the old semaphore are not counted against the new one, so briefly up to `oldLimit + newLimit` concurrent calls can coexist. + +**Root cause:** Live channel swap without a quiescing barrier. + +**Impact:** Low — `SetCallLimit` is only called from `main.go` before the server begins serving. It becomes a problem only if it is ever called dynamically (e.g., a future live-reload path). + +**Fix:** Document that `SetCallLimit` is startup-only and must not be called after `ServeHTTP` traffic starts. Alternatively, add a `sync.Mutex` guard and wait for in-flight slots to drain before swapping. + +--- + +### L-3 — `LogsPartitioned()` flag is not synchronized +**File:** `internal/storage/repository.go:78, 82` + +**Problem:** `logsPartitioned bool` is set once in `NewRepository` (or via `MarkLogsPartitioned` in `factory.go`) and read in `LogsPartitioned()` — all without any synchronization. Go's memory model requires explicit synchronization even for single-writer/single-reader boolean flags unless the write happens-before all reads via a channel or mutex. + +**Root cause:** Plain bool field used as a concurrent flag. + +**Fix:** Use `atomic.Bool` or ensure all callers of `LogsPartitioned()` are invoked after the startup sequence that writes the flag (a simple documented happens-before guarantee is sufficient if enforced). + +--- + +## NOT BUGS (confirmed, closed) + +| Item | Verdict | +|------|---------| +| `break` inside `select/default` in `server.go` tools/call handler | NOT a bug — explicit `if rpcErr != nil { break }` guard follows immediately | +| Goroutine leak in `runWithTimeout` | NOT a leak — bounded at `2 * maxConcurrent`; goroutine completes and slot is released | +| `retention.go` `totalRuns` channel math | Correct — `2 + logsExpected` properly accounts for the conditional logs goroutine | +| FTS5 UPDATE trigger two-step delete+insert | Correct per external-content FTS5 spec | +| `isQueueFull` gRPC error unwrapping | Correct — `grpcstatus.FromError` uses `errors.As` internally | +| `SetCallLimit` mid-flight channel swap | Only called pre-serving from `main.go`; low risk, documented above as L-2 | +| `pgLogsRelkind` partition detection | Correct — reads `pg_class.relkind = 'p'` from live schema rather than trusting config | +| LIKE query construction (`fmt.Sprintf` with `op`) | Not injectable — `op` is always `"LIKE"` or `"ILIKE"` (internal constant), user input only flows through GORM's parameterised `?` placeholders | +| Tenant isolation on all read paths | Correct — every `Repository` read method gates on `WHERE tenant_id = ?` with the context-derived tenant. Write paths stamp `TenantID` at parse time in `otlp.go`/`otlp_http.go` | +| `AutoMigrateModels` FTS5 gate | Correct — `fts5Available()` guards the virtual-table and trigger setup; FTS5 path only runs for SQLite | + +--- + +## Summary + +| Severity | Count | Items | +|----------|-------|-------| +| Critical | 0 | — | +| High | 1 | H-1 (droppedBatches data race) | +| Medium | 3 | M-1 (Submit TOCTOU), M-2 (ring/onIngest pointer read), M-3 (log loss on span failure) | +| Low | 3 | L-1 (MCP negative config), L-2 (SetCallLimit swap), L-3 (logsPartitioned bool sync) | + +**Recommended immediate action:** Fix H-1 (`atomic.Int64`) and M-3 (decouple span/log error paths in `process()`) in the next patch. M-1 and M-2 are correctness issues that have not manifested in 269 tests but are real under concurrent load. L-1/L-2/L-3 are hardening items for the next sprint. diff --git a/internal/ingest/pipeline.go b/internal/ingest/pipeline.go index c7c419e..4c20609 100644 --- a/internal/ingest/pipeline.go +++ b/internal/ingest/pipeline.go @@ -90,10 +90,19 @@ func DefaultPipelineConfig() PipelineConfig { // pipelineWriter is the slice of *storage.Repository the Pipeline depends // on. Defining it as an interface keeps the package layering clean and // lets tests inject fakes without spinning up SQLite. +// +// The async pipeline drives only BatchCreateAll. The single-signal +// methods are kept on the interface for forward-compatibility with +// callers that may construct a writer directly (e.g. backfill tools); +// they aren't on the hot ingest path anymore. type pipelineWriter interface { BatchCreateTraces(traces []storage.Trace) error BatchCreateSpans(spans []storage.Span) error BatchCreateLogs(logs []storage.Log) error + // BatchCreateAll persists all three signal slices as a single atomic + // transaction. A failure (or panic) anywhere in the chain rolls back + // the entire batch, preventing orphan FK rows. + BatchCreateAll(traces []storage.Trace, spans []storage.Span, logs []storage.Log) error } // Pipeline decouples OTLP Export() from synchronous DB writes. It holds a @@ -277,10 +286,18 @@ func (p *Pipeline) worker(ctx context.Context) { } } -// process persists a single batch in Trace→Span→Log order, mirroring the -// ordering invariant of the synchronous Export() path. Failures are -// logged and surfaced via processFailures; the batch is then dropped -// (the DLQ tier is the redundancy story for write failures). +// process persists a single batch in a single DB transaction. Trace→Span→Log +// ordering inside the transaction mirrors the FK invariant of the synchronous +// Export() path; atomicity prevents the orphan-row class of bugs where a +// panic between two BatchCreate* calls left a parent row with no children +// (or vice versa). Any failure rolls the entire batch back; the worker logs, +// increments processFailures, and drops the batch (DLQ is the redundancy +// story for sustained failures). +// +// Behavior change vs. the pre-tx implementation: trace insert errors are no +// longer "tolerated" with downstream spans/logs continuing — the whole batch +// is now atomic. This is intentional. Traces are idempotent (ON CONFLICT +// DO NOTHING), so a DLQ retry of the same envelope re-attempts cleanly. func (p *Pipeline) process(b *Batch) { if b == nil { return @@ -299,41 +316,27 @@ func (p *Pipeline) process(b *Batch) { }() p.processedTotal.Add(1) - if len(b.Traces) > 0 { - if err := p.writer.BatchCreateTraces(b.Traces); err != nil { - slog.Error("ingest pipeline: BatchCreateTraces failed", "error", err) - p.processFailures.Add(1) - // Continue — spans may still land if their trace exists from - // a prior batch. Mirrors the synchronous path's tolerance. - } + if len(b.Traces) == 0 && len(b.Spans) == 0 && len(b.Logs) == 0 { + return } - if len(b.Spans) > 0 { - if err := p.writer.BatchCreateSpans(b.Spans); err != nil { - slog.Error("ingest pipeline: BatchCreateSpans failed", "error", err) - p.processFailures.Add(1) - // Skip log insert in this batch — TestPipeline_FailedSpansSkipsLogs - // enforces the invariant that orphan logs are not persisted - // without their spans, mirroring the synchronous path. Span - // failures should be rare (DB unavailable etc.); the DLQ tier - // is the redundancy story for sustained failures. - return - } - if b.SpanCallback != nil { - for _, s := range b.Spans { - b.SpanCallback(s) - } - } + + if err := p.writer.BatchCreateAll(b.Traces, b.Spans, b.Logs); err != nil { + slog.Error("ingest pipeline: BatchCreateAll failed", "error", err) + p.processFailures.Add(1) + return } - if len(b.Logs) > 0 { - if err := p.writer.BatchCreateLogs(b.Logs); err != nil { - slog.Error("ingest pipeline: BatchCreateLogs failed", "error", err) - p.processFailures.Add(1) - return + + // Callbacks fire only after the transaction commits successfully — a + // rolled-back batch must not feed downstream consumers (GraphRAG etc.) + // data that no longer exists in the DB. + if b.SpanCallback != nil { + for _, s := range b.Spans { + b.SpanCallback(s) } - if b.LogCallback != nil { - for _, l := range b.Logs { - b.LogCallback(l) - } + } + if b.LogCallback != nil { + for _, l := range b.Logs { + b.LogCallback(l) } } } diff --git a/internal/ingest/pipeline_test.go b/internal/ingest/pipeline_test.go index 7e95450..edb30f2 100644 --- a/internal/ingest/pipeline_test.go +++ b/internal/ingest/pipeline_test.go @@ -60,6 +60,29 @@ func (f *fakeWriter) BatchCreateLogs(l []storage.Log) error { return f.logErr } +// BatchCreateAll mirrors Repository.BatchCreateAll's all-or-nothing semantics: +// each inner method is called in Trace→Span→Log order; the first error +// short-circuits and is returned. Existing tests that observe per-method call +// counts and ordering keep working without modification. +func (f *fakeWriter) BatchCreateAll(t []storage.Trace, s []storage.Span, l []storage.Log) error { + if len(t) > 0 { + if err := f.BatchCreateTraces(t); err != nil { + return err + } + } + if len(s) > 0 { + if err := f.BatchCreateSpans(s); err != nil { + return err + } + } + if len(l) > 0 { + if err := f.BatchCreateLogs(l); err != nil { + return err + } + } + return nil +} + func (f *fakeWriter) snapshotOrder() []string { f.mu.Lock() defer f.mu.Unlock() @@ -281,10 +304,11 @@ func TestPipeline_FailedSpansSkipsLogs(t *testing.T) { } } -func TestPipeline_FailedTracesContinuesToSpans(t *testing.T) { - // Trace failures are tolerated — spans may still land if the trace - // row exists from a prior batch. Must NOT short-circuit subsequent - // span/log persistence. +func TestPipeline_FailedTracesAbortsBatch(t *testing.T) { + // Trace failures roll the entire batch back — atomic batches are the + // fix for orphan FK rows when a worker crashes between BatchCreate* + // calls. Spans and logs must NOT be persisted when the trace insert + // fails. Counterpart of TestPipeline_FailedSpansSkipsLogs. w := &fakeWriter{traceErr: errors.New("transient")} p := NewPipeline(w, nil, PipelineConfig{Capacity: 2, Workers: 1}) ctx, cancel := context.WithCancel(context.Background()) @@ -295,11 +319,14 @@ func TestPipeline_FailedTracesContinuesToSpans(t *testing.T) { if err := p.Submit(healthyBatch()); err != nil { t.Fatalf("submit: %v", err) } - if !waitFor(t, 2*time.Second, func() bool { - ord := w.snapshotOrder() - return len(ord) == 3 && ord[1] == "spans" && ord[2] == "logs" - }) { - t.Fatalf("trace failure should not stop spans/logs — order=%v", w.snapshotOrder()) + if !waitFor(t, 2*time.Second, func() bool { return p.Stats().ProcessFailures > 0 }) { + t.Fatalf("expected ProcessFailures > 0, got %d", p.Stats().ProcessFailures) + } + calls := w.snapshotOrder() + for _, c := range calls { + if c == "spans" || c == "logs" { + t.Fatalf("spans/logs ran after trace failure — order=%v", calls) + } } } diff --git a/internal/queue/dlq.go b/internal/queue/dlq.go index 29aa846..803b42a 100644 --- a/internal/queue/dlq.go +++ b/internal/queue/dlq.go @@ -159,6 +159,16 @@ func (d *DeadLetterQueue) Enqueue(batch any) error { _ = os.Remove(path) return fmt.Errorf("DLQ: failed to write %s: %w", path, err) } + // fsync before close so a host crash between Write and Close cannot leave + // a torn file on disk that permanently consumes a retry slot. Without + // this, the partial JSON would unmarshal-fail every replay until + // DLQ_MAX_RETRIES evicts it — wasting the slot and emitting a steady + // stream of replay-error logs. + if err := f.Sync(); err != nil { + _ = f.Close() + _ = os.Remove(path) + return fmt.Errorf("DLQ: failed to fsync %s: %w", path, err) + } if err := f.Close(); err != nil { _ = os.Remove(path) return fmt.Errorf("DLQ: failed to close %s: %w", path, err) diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go index 05eaf61..23452ad 100644 --- a/internal/storage/trace_repo.go +++ b/internal/storage/trace_repo.go @@ -63,10 +63,53 @@ func (r *Repository) BatchCreateTraces(traces []Trace) error { if len(traces) == 0 { return nil } - if strings.ToLower(r.driver) == "mysql" { - return r.db.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(&traces).Error + return createTracesIdempotent(r.db, r.driver, traces) +} + +// createTracesIdempotent runs the conflict-tolerant trace insert against an +// arbitrary *gorm.DB so the same logic is reused inside a transaction by +// BatchCreateAll. MySQL takes INSERT IGNORE; SQLite/Postgres take +// ON CONFLICT DO NOTHING via the gorm clause helper. +func createTracesIdempotent(db *gorm.DB, driver string, traces []Trace) error { + if strings.ToLower(driver) == "mysql" { + return db.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(&traces).Error + } + return db.Clauses(clause.OnConflict{DoNothing: true}).Create(&traces).Error +} + +// BatchCreateAll persists traces, spans, and logs in a single DB transaction. +// The async ingest pipeline uses this path so a failure (or panic) mid-batch +// rolls back any partial commit, preventing orphan FK rows from a worker that +// crashed between BatchCreateTraces and BatchCreateSpans. +// +// Trace inserts inherit BatchCreateTraces' idempotency: a trace_id clash +// within the same tenant is silently skipped. Spans and logs have no unique +// constraint, so a replay can still produce duplicate rows — that is a +// separate idempotency concern (requires schema migration to fix) and is +// out of scope for this method, whose contract is solely atomicity of the +// batch. +func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) error { + if len(traces) == 0 && len(spans) == 0 && len(logs) == 0 { + return nil } - return r.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&traces).Error + return r.db.Transaction(func(tx *gorm.DB) error { + if len(traces) > 0 { + if err := createTracesIdempotent(tx, r.driver, traces); err != nil { + return fmt.Errorf("BatchCreateAll: traces: %w", err) + } + } + if len(spans) > 0 { + if err := tx.CreateInBatches(spans, 500).Error; err != nil { + return fmt.Errorf("BatchCreateAll: spans: %w", err) + } + } + if len(logs) > 0 { + if err := tx.CreateInBatches(logs, 500).Error; err != nil { + return fmt.Errorf("BatchCreateAll: logs: %w", err) + } + } + return nil + }) } // CreateTrace inserts a new trace, skipping if it already exists.