Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1b222e0
fix(graphrag): propagate real span Status into OnSpanIngested
aksOps Apr 23, 2026
75f31c6
feat(graphrag): expose event-drop metric + configurable workers/queue
aksOps Apr 23, 2026
0d0f583
fix(api): exempt OTLP /v1/* from per-IP rate limiter
aksOps Apr 23, 2026
bbb0833
docs(api): document /v1/* rate-limit exemption trade-off
aksOps Apr 23, 2026
fb85130
feat(grpc): apply message size, stream, and keepalive limits
aksOps Apr 23, 2026
3e705f3
feat(config): bound GRPC_MAX_RECV_MB and GRPC_MAX_CONCURRENT_STREAMS
aksOps Apr 23, 2026
51852c0
feat(retention): parallelize purges + tune batch size/pacing
aksOps Apr 23, 2026
7d1b99f
fix(retention): recover panics in parallel purge goroutines
aksOps Apr 23, 2026
7703438
feat(storage): refuse SQLite in APP_ENV=production without override
aksOps Apr 23, 2026
7709782
feat(telemetry): expose DB pool stats
aksOps Apr 23, 2026
93b9778
chore(deps): tidy go.mod after Task 7 telemetry test imports
aksOps Apr 23, 2026
8111d60
feat(dlq): export eviction metrics
aksOps Apr 23, 2026
a6ef0d5
feat(graphrag): dedup investigations with 5m cooldown
aksOps Apr 23, 2026
65f3742
perf(storage): driver-switched p99 computation
aksOps Apr 23, 2026
0e4da47
fix(storage): address Task 10 dashboard p99 review findings
aksOps Apr 23, 2026
6b63e0b
feat(loadsim): add 200-service OTLP load simulator
aksOps Apr 23, 2026
8cccd14
fix(loadsim): address review findings
aksOps Apr 23, 2026
e137451
test(storage): shrink p99 cap in test to survive -race
aksOps Apr 23, 2026
3e64f1a
docs(operations): document Tasks 1-11 metrics and load simulator
aksOps Apr 23, 2026
8e85a85
docs: address final review findings
aksOps Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ Key settings in `internal/config/config.go`:
- `MCP_ENABLED` (true), `MCP_PATH` (/mcp)
- `VECTOR_INDEX_MAX_ENTRIES` (100000)
- `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
- `GRAPHRAG_WORKER_COUNT` (4), `GRAPHRAG_EVENT_QUEUE_SIZE` (10000) — raise both at 100+ services if `otelcontext_graphrag_events_dropped_total` climbs
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000
- `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs
- `APP_ENV` (`"development"`), `OTELCONTEXT_ALLOW_SQLITE_PROD` (false) — SQLite is refused when `APP_ENV=production` unless the allow flag is set

### Authentication

Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build test vet check setup-hooks ui-install ui-build dev-ui
.PHONY: build test vet check setup-hooks ui-install ui-build dev-ui loadtest loadtest-build

ui-install:
cd ui && npm install
Expand All @@ -21,6 +21,13 @@ check: build vet test
dev-ui:
cd ui && npm run dev

loadtest-build:
CGO_ENABLED=0 go build -tags loadtest -o bin/loadsim ./test/loadsim

loadtest: loadtest-build
@echo "Running 200-service load simulator (60s) against localhost:4317..."
./bin/loadsim

## setup-hooks installs the pre-commit hook into .git/hooks
setup-hooks:
cp scripts/pre-commit .git/hooks/pre-commit
Expand Down
45 changes: 45 additions & 0 deletions docs/OPERATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft
- `API_RATE_LIMIT_RPS=100`
- `VECTOR_INDEX_MAX_ENTRIES=100000`
- `SAMPLING_*` (defaults keep 100% + always-on errors)
- `GRAPHRAG_WORKER_COUNT=4`, `GRAPHRAG_EVENT_QUEUE_SIZE=10000` — raise worker count at 100+ services if you see `graphrag_events_dropped_total` climbing
- `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps
- `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs

### SQLite in production
SQLite is rejected at startup when `APP_ENV=production` unless you explicitly opt in with `OTELCONTEXT_ALLOW_SQLITE_PROD=true`. The guard exists because SQLite uses a single writer lock — fine for < ~10 services at low QPS, miserable at scale. Prefer Postgres for anything resembling production.

---

Expand Down Expand Up @@ -190,6 +196,11 @@ Grep structured logs for `acquire entra token`. Common causes: expired managed-i
- `OtelContext_retention_consecutive_failures > 3`
- `rate(OtelContext_otlp_payload_rejected_total[5m]) > 0`
- `rate(OtelContext_dlq_replay_failure_total[5m]) > rate(OtelContext_dlq_replay_success_total[5m])`
- `rate(otelcontext_graphrag_events_dropped_total[5m]) > 0` — ingestion channel saturated; bump `GRAPHRAG_WORKER_COUNT` or `GRAPHRAG_EVENT_QUEUE_SIZE`
- `otelcontext_retention_rows_behind > 1_000_000` — purge is falling behind; tune `RETENTION_BATCH_SIZE` / `RETENTION_BATCH_SLEEP_MS`
- `otelcontext_db_pool_in_use / otelcontext_db_pool_max_open > 0.9` — pool exhausted; raise `DB_MAX_OPEN_CONNS`
- `rate(otelcontext_dlq_evicted_total[5m]) > 0` — DLQ is actively dropping entries at cap; replay target is down or slow
- `rate(otelcontext_dashboard_p99_row_cap_hits_total[1h]) > 0` on SQLite — dataset exceeds the 200k in-memory cap; migrate to Postgres for accurate p99
- **Log levels:** `LOG_LEVEL=DEBUG` for deep diagnostics, default `INFO`. `WARN` or `ERROR` is too quiet for a running system; avoid in prod.

---
Expand All @@ -204,6 +215,40 @@ Grep structured logs for `acquire entra token`. Common causes: expired managed-i

---

## Scale & Load Testing

The backend is sized for **100–200 services** emitting OTLP at commodity rates. A programmatic load simulator ships with the repo to verify this.

### Running the simulator

```bash
make loadtest-build # produces bin/loadsim
./bin/loadsim # 200 producers × 50 spans/sec × 60s against localhost:4317
./bin/loadsim --help # flags: --endpoint, --services, --rate, --duration, --tenant-id, --warmup
```

The binary is under the `loadtest` build tag — `go build ./...` and `go test ./...` ignore it. `make loadtest` runs a full 60s sweep against `localhost:4317`.

### What healthy looks like

During a 60s / 200-service run against a warm instance on Postgres:

- Ingestion: no `otlp_payload_rejected_total` samples, no `graphrag_events_dropped_total` samples.
- DB pool: `db_pool_in_use / db_pool_max_open` stays below ~0.8.
- Retention: `retention_rows_behind` stays within one hourly tick of steady state.
- DLQ: zero activity (`dlq_evicted_total`, `dlq_replay_failure_total` unchanged).
- The dashboard p99 gauge updates without hitting the SQLite row cap.

If any of those trip, use the corresponding metric alert from the Observability section above as the entry point.

### When to re-run

- Before cutting a release that touches the ingestion path or GraphRAG.
- After tuning any of: `GRAPHRAG_WORKER_COUNT`, `GRPC_MAX_CONCURRENT_STREAMS`, `RETENTION_BATCH_SIZE`, `DB_MAX_OPEN_CONNS`.
- When scaling the deployment past the current-tested envelope (e.g., 500+ services) — expand the simulator's `--services` flag to match.

---

## Upgrade Path

1. **Back up the DB** (see Backup & Restore above).
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/coder/websocket v1.8.14
github.com/glebarez/go-sqlite v1.21.2
github.com/glebarez/sqlite v1.11.0
github.com/jackc/pgx/v5 v5.7.2
github.com/joho/godotenv v1.5.1
github.com/klauspost/compress v1.18.5
github.com/microsoft/go-mssqldb v1.9.7
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0
github.com/tmc/langchaingo v0.1.14
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0
Expand Down Expand Up @@ -60,7 +62,6 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.10.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/glebarez/go-sqlite v1.21.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/pkoukk/tiktoken-go v0.1.6 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand Down
16 changes: 16 additions & 0 deletions internal/api/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ func (rl *RateLimiter) Middleware(next http.Handler) http.Handler {
})
}

// MiddlewareExcept returns a handler chain that applies the rate limit only
// when skip(path) returns false. Intended to exempt OTLP ingestion paths from
// the per-IP API limiter.
func (rl *RateLimiter) MiddlewareExcept(skip func(path string) bool) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
wrapped := rl.Middleware(next)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if skip != nil && skip(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
wrapped.ServeHTTP(w, r)
})
}
}

func (rl *RateLimiter) allow(ip string) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand Down
54 changes: 54 additions & 0 deletions internal/api/ratelimit_otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package api

import (
"net/http"
"net/http/httptest"
"strings"
"testing"
)

// TestRateLimiter_ExemptsOTLPPaths pins the /v1/* exemption contract.
//
// OTLP collectors (Otel SDK, Collector, Alloy, vector, etc.) batch aggressively
// and a single agent routinely exceeds the 100 RPS/IP default that protects
// /api/*. Without this exemption, legitimate ingestion traffic — the exact data
// this platform exists to capture — would get 429'd and dropped. This test
// locks in the carve-out so a future refactor doesn't silently re-enable
// throttling on /v1/* and regress ingestion.
//
// It also asserts the inverse — that /api/* on the *same* IP is still
// throttled — so the exemption remains narrow (path-prefix, not blanket).
func TestRateLimiter_ExemptsOTLPPaths(t *testing.T) {
rl := NewRateLimiter(1) // 1 RPS per IP — draconian
handler := rl.MiddlewareExcept(func(path string) bool {
return strings.HasPrefix(path, "/v1/")
})(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))

// 10 rapid OTLP requests should all 200.
for i := 0; i < 10; i++ {
r := httptest.NewRequest("POST", "/v1/traces", nil)
r.RemoteAddr = "10.0.0.1:1234"
w := httptest.NewRecorder()
handler.ServeHTTP(w, r)
if w.Code != 200 {
t.Fatalf("OTLP request %d got %d, expected 200 (exempt path)", i, w.Code)
}
}

// But /api/* on the same IP should get throttled after the 1st.
hits := 0
for i := 0; i < 5; i++ {
r := httptest.NewRequest("GET", "/api/logs", nil)
r.RemoteAddr = "10.0.0.1:1234"
w := httptest.NewRecorder()
handler.ServeHTTP(w, r)
if w.Code == 200 {
hits++
}
}
if hits >= 5 {
t.Fatalf("expected /api throttling, got %d/5 passes", hits)
}
}
72 changes: 71 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type Config struct {
// Retention
HotRetentionDays int

// Retention tuning. Defaults (batch=50000, sleep=1ms) work for Postgres at
// 100k logs/sec sustained. Lower on resource-constrained hosts; raise on
// dedicated DB machines. 0/negative values use defaults.
RetentionBatchSize int
RetentionBatchSleepMs int

// TSDB
TSDBRingBufferDuration string // e.g. "1h"

Expand Down Expand Up @@ -63,6 +69,13 @@ type Config struct {
// Vector Index
VectorIndexMaxEntries int

// GraphRAG worker count (background consumers of the ingestion event channel).
// Defaults to 4 if unset or <=0. Increase under sustained high ingest.
GraphRAGWorkerCount int

// GraphRAG event channel buffer size. Defaults to 10000 if unset or <=0.
GraphRAGEventQueueSize int

// TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers.
// Empty values (default) keep plaintext behavior.
TLSCertFile string
Expand Down Expand Up @@ -105,6 +118,15 @@ type Config struct {
// DevMode disables origin checks for WebSocket and enables dev-friendly defaults.
// Derived from APP_ENV == "development".
DevMode bool

// gRPC server tuning — protects against huge OTLP batches and connection abuse.
GRPCMaxRecvMB int
GRPCMaxConcurrentStreams int

// AllowSqliteProd lets operators explicitly acknowledge that SQLite is
// being used outside dev/test. Without it, a production Env + SQLite
// combination refuses to start.
AllowSqliteProd bool
}

func Load(customPath string) (*Config, error) {
Expand Down Expand Up @@ -145,7 +167,9 @@ func Load(customPath string) (*Config, error) {
DBConnMaxLifetime: getEnv("DB_CONN_MAX_LIFETIME", "1h"),

// Retention
HotRetentionDays: getEnvInt("HOT_RETENTION_DAYS", 7),
HotRetentionDays: getEnvInt("HOT_RETENTION_DAYS", 7),
RetentionBatchSize: getEnvInt("RETENTION_BATCH_SIZE", 50000),
RetentionBatchSleepMs: getEnvInt("RETENTION_BATCH_SLEEP_MS", 1),

// TSDB
TSDBRingBufferDuration: getEnv("TSDB_RING_BUFFER_DURATION", "1h"),
Expand Down Expand Up @@ -177,6 +201,10 @@ func Load(customPath string) (*Config, error) {
// Vector
VectorIndexMaxEntries: getEnvInt("VECTOR_INDEX_MAX_ENTRIES", 100000),

// GraphRAG
GraphRAGWorkerCount: getEnvInt("GRAPHRAG_WORKER_COUNT", 4),
GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 10000),

// TLS
TLSCertFile: getEnv("TLS_CERT_FILE", ""),
TLSKeyFile: getEnv("TLS_KEY_FILE", ""),
Expand All @@ -193,6 +221,13 @@ func Load(customPath string) (*Config, error) {
DefaultTenant: getEnv("DEFAULT_TENANT", "default"),
OTLPTrustResourceTenant: parseTruthy(getEnv("OTLP_TRUST_RESOURCE_TENANT", "")),
APITenantKeysFile: getEnv("API_TENANT_KEYS_FILE", ""),

// gRPC server tuning
GRPCMaxRecvMB: getEnvInt("GRPC_MAX_RECV_MB", 16),
GRPCMaxConcurrentStreams: getEnvInt("GRPC_MAX_CONCURRENT_STREAMS", 1000),

// Production safety guard for SQLite
AllowSqliteProd: parseTruthy(getEnv("OTELCONTEXT_ALLOW_SQLITE_PROD", "")),
}, nil
}

Expand Down Expand Up @@ -270,6 +305,12 @@ func (c *Config) Validate() error {
if c.HotRetentionDays < 1 || c.HotRetentionDays > 36500 {
return fmt.Errorf("HOT_RETENTION_DAYS must be between 1 and 36500, got %d", c.HotRetentionDays)
}
if c.RetentionBatchSize < 1 || c.RetentionBatchSize > 10_000_000 {
return fmt.Errorf("RETENTION_BATCH_SIZE must be between 1 and 10000000, got %d", c.RetentionBatchSize)
}
if c.RetentionBatchSleepMs < 0 || c.RetentionBatchSleepMs > 60_000 {
return fmt.Errorf("RETENTION_BATCH_SLEEP_MS must be between 0 and 60000, got %d", c.RetentionBatchSleepMs)
}
if c.MetricMaxCardinality < 0 {
return fmt.Errorf("METRIC_MAX_CARDINALITY must be >= 0, got %d", c.MetricMaxCardinality)
}
Expand All @@ -279,6 +320,17 @@ func (c *Config) Validate() error {
if c.APIRateLimitRPS < 0 {
return fmt.Errorf("API_RATE_LIMIT_RPS must be >= 0, got %d", c.APIRateLimitRPS)
}
// gRPC receive cap: must be positive, and capped to prevent per-message OOM
// from a bad env value (the limit pre-allocates a buffer of this size on
// the first large message). 256 MiB is far beyond any legitimate OTLP batch
// and still small enough that a 200-connection flood cannot exhaust a host
// with typical RAM.
if c.GRPCMaxRecvMB < 1 || c.GRPCMaxRecvMB > 256 {
return fmt.Errorf("GRPC_MAX_RECV_MB must be between 1 and 256, got %d", c.GRPCMaxRecvMB)
}
if c.GRPCMaxConcurrentStreams < 1 || c.GRPCMaxConcurrentStreams > 1_000_000 {
return fmt.Errorf("GRPC_MAX_CONCURRENT_STREAMS must be between 1 and 1000000, got %d", c.GRPCMaxConcurrentStreams)
}
if c.DBMaxOpenConns < 1 {
return fmt.Errorf("DB_MAX_OPEN_CONNS must be >= 1, got %d", c.DBMaxOpenConns)
}
Expand Down Expand Up @@ -352,3 +404,21 @@ func checkReadable(path string) error {
}
return f.Close()
}

// ValidateDBForEnv refuses the combination of SQLite driver + production
// environment unless AllowSqliteProd is explicitly set. SQLite's single-writer
// lock caps sustained throughput to ~5 services; using it in production will
// silently throttle ingestion.
//
// Call once during startup after Load + Validate.
func (c *Config) ValidateDBForEnv() error {
if !strings.EqualFold(c.DBDriver, "sqlite") {
return nil
}
if strings.EqualFold(c.Env, "production") && !c.AllowSqliteProd {
return fmt.Errorf("SQLite is unsuitable for APP_ENV=production " +
"(single-writer lock caps throughput at ~5 services). " +
"Use DB_DRIVER=postgres, or set OTELCONTEXT_ALLOW_SQLITE_PROD=true to acknowledge")
}
return nil
}
43 changes: 43 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func baseValid() *Config {
DBMaxOpenConns: 50,
DBMaxIdleConns: 10,
CompressionLevel: "default",
GRPCMaxRecvMB: 16,
GRPCMaxConcurrentStreams: 1000,
RetentionBatchSize: 50000,
RetentionBatchSleepMs: 1,
}
}

Expand Down Expand Up @@ -250,6 +254,45 @@ func TestTLSAutoSelfsigned_IgnoredWhenCertFilesSet(t *testing.T) {
}
}

func TestValidateDBForEnv_RefusesSQLiteInProduction(t *testing.T) {
c := baseValid()
c.DBDriver = "sqlite"
c.Env = "production"
c.AllowSqliteProd = false
err := c.ValidateDBForEnv()
if err == nil || !strings.Contains(err.Error(), "SQLite is unsuitable") {
t.Fatalf("expected SQLite-in-prod rejection, got %v", err)
}
}

func TestValidateDBForEnv_AllowsSQLiteWhenOptIn(t *testing.T) {
c := baseValid()
c.DBDriver = "sqlite"
c.Env = "production"
c.AllowSqliteProd = true
if err := c.ValidateDBForEnv(); err != nil {
t.Fatalf("opt-in should allow SQLite in prod, got %v", err)
}
}

func TestValidateDBForEnv_AllowsSQLiteInDev(t *testing.T) {
c := baseValid()
c.DBDriver = "sqlite"
c.Env = "development"
if err := c.ValidateDBForEnv(); err != nil {
t.Fatalf("SQLite in dev must pass, got %v", err)
}
}

func TestValidateDBForEnv_AllowsPostgresInProd(t *testing.T) {
c := baseValid()
c.DBDriver = "postgres"
c.Env = "production"
if err := c.ValidateDBForEnv(); err != nil {
t.Fatalf("Postgres in prod must pass, got %v", err)
}
}

func TestLoad_DefaultTenant_FallsBackToDefault(t *testing.T) {
// Ensure var is absent — Setenv("", "") would leave it set-but-empty, which
// the getEnv helper treats as a present value.
Expand Down
Loading
Loading