Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ Key settings in `internal/config/config.go`:
- `SAMPLING_RATE` (1.0), `SAMPLING_ALWAYS_ON_ERRORS` (true), `SAMPLING_LATENCY_THRESHOLD_MS` (500)
- `METRIC_MAX_CARDINALITY` (10000), `METRIC_MAX_CARDINALITY_PER_TENANT` (0 = unlimited), `API_RATE_LIMIT_RPS` (100). The per-tenant cap is checked first; when set, a noisy tenant cannot exhaust the global pool. Overflow is labeled by tenant via `otelcontext_tsdb_cardinality_overflow_by_tenant_total{tenant_id}` (`__global__` sentinel when the global cap was the trigger).
- `MCP_ENABLED` (true), `MCP_PATH` (/mcp)
- `MCP_MAX_CONCURRENT` (32), `MCP_CALL_TIMEOUT_MS` (30000), `MCP_CACHE_TTL_MS` (5000) — MCP HTTP streamable robustness. Counting semaphore gates concurrent `tools/call` (JSON-RPC `-32000` past the cap), per-call deadlines abort runaway handlers (JSON-RPC `-32001`), and a 5s TTL cache memoizes the cheap in-memory GraphRAG tools (`get_service_map`, `impact_analysis`, `root_cause_analysis`, `get_anomaly_timeline`, `get_service_health`). SSE GET sends a `: keep-alive\n\n` comment every 25s to keep the stream alive across reverse-proxy idle timeouts. Set any to 0 to disable.
- `VECTOR_INDEX_MAX_ENTRIES` (100000)
- `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
- `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs
Expand Down
1 change: 1 addition & 0 deletions docs/OPERATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft
- `INGEST_ASYNC_ENABLED=true`, `INGEST_PIPELINE_QUEUE_SIZE=50000`, `INGEST_PIPELINE_WORKERS=8` — async ingest pipeline. Decouples OTLP `Export()` from DB writes. Backpressure is hybrid: silent drop of healthy traces at >=90% queue, gRPC `RESOURCE_EXHAUSTED` (HTTP `429 Too Many Requests` + `Retry-After: 1` on the OTLP HTTP receiver) at 100%. Disable only to debug the legacy synchronous write path. Watch `otelcontext_ingest_pipeline_dropped_total{signal,reason}`, `otelcontext_ingest_pipeline_queue_depth{signal}`, and `otelcontext_http_otlp_throttled_total{signal}`.
- `GRPC_MAX_RECV_MB=16`, `GRPC_MAX_CONCURRENT_STREAMS=1000` — OTLP gRPC server caps
- `RETENTION_BATCH_SIZE=50000`, `RETENTION_BATCH_SLEEP_MS=1` — purge pacing; raise the sleep for busy production DBs
- `MCP_MAX_CONCURRENT=32`, `MCP_CALL_TIMEOUT_MS=30000`, `MCP_CACHE_TTL_MS=5000` — MCP HTTP streamable robustness. Concurrent `tools/call` invocations are gated by a counting semaphore (returns JSON-RPC `-32000` "server overloaded" past the cap). Per-call deadlines abort runaway tool handlers (returns JSON-RPC `-32001` "call timeout"). Cheap GraphRAG tools (`get_service_map`, `impact_analysis`, `root_cause_analysis`, `get_anomaly_timeline`, `get_service_health`) are memoized for the TTL window, keyed by `(tenant, tool, args)`. Setting any of these to `0` disables that protection.

### SQLite in production
SQLite is rejected at startup when `APP_ENV=production` unless you explicitly opt in with `OTELCONTEXT_ALLOW_SQLITE_PROD=true`. The guard exists because SQLite uses a single writer lock — fine for < ~10 services at low QPS, miserable at scale. Prefer Postgres for anything resembling production.
Expand Down
20 changes: 18 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ type Config struct {
// MCP Server
MCPEnabled bool
MCPPath string
// MCPMaxConcurrent caps the in-flight tools/call invocations server-wide.
// Beyond this, callers receive a JSON-RPC server-overloaded error. <=0
// disables the cap. Default 32 — sized for tight agent polling loops
// without overrunning the GraphRAG in-memory store.
MCPMaxConcurrent int
// MCPCallTimeoutMs is the per-invocation deadline for tools/call. A tool
// that exceeds it gets cancelled and the client receives an RPC timeout
// error. <=0 disables the deadline. Default 30000 (30s).
MCPCallTimeoutMs int
// MCPCacheTTLMs is the lifetime of a memoized tool result for the cheap
// in-memory GraphRAG tools (get_service_map, impact_analysis, etc.).
// <=0 disables caching. Default 5000 (5s).
MCPCacheTTLMs int

// Compression
CompressionLevel string // "default", "fast", "best"
Expand Down Expand Up @@ -230,8 +243,11 @@ func Load(customPath string) (*Config, error) {
APIRateLimitRPS: getEnvInt("API_RATE_LIMIT_RPS", 100),

// MCP
MCPEnabled: getEnvBool("MCP_ENABLED", true),
MCPPath: getEnv("MCP_PATH", "/mcp"),
MCPEnabled: getEnvBool("MCP_ENABLED", true),
MCPPath: getEnv("MCP_PATH", "/mcp"),
MCPMaxConcurrent: getEnvInt("MCP_MAX_CONCURRENT", 32),
MCPCallTimeoutMs: getEnvInt("MCP_CALL_TIMEOUT_MS", 30000),
MCPCacheTTLMs: getEnvInt("MCP_CACHE_TTL_MS", 5000),

// Compression
CompressionLevel: getEnv("COMPRESSION_LEVEL", "default"),
Expand Down
169 changes: 169 additions & 0 deletions internal/mcp/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package mcp

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"sort"
"strings"
"sync"
"time"
)

// cacheableTools is the whitelist of tool names whose results are safe to
// memoize for a short window. We cache only the "instant in-memory"
// GraphRAG tools — they are computed off the live in-memory store, so a
// short cache TTL adds no observability lag worth worrying about while
// dramatically reducing CPU under tight polling loops from agent clients.
//
// DB-backed tools (get_investigations, get_log_context, etc.) are
// deliberately NOT cached — they reflect retention/replay state that
// changes meaningfully on millisecond scales and the per-call DB cost is
// already bounded by the storage layer.
var cacheableTools = map[string]struct{}{
"get_service_map": {},
"impact_analysis": {},
"root_cause_analysis": {},
"get_anomaly_timeline": {},
"get_service_health": {},
}

// isCacheable reports whether a tool name is on the cache whitelist.
func isCacheable(name string) bool {
_, ok := cacheableTools[name]
return ok
}

// resultCache is a tiny per-tenant TTL cache for MCP tool results. It
// stores the rendered ToolCallResult so cache hits return the exact bytes
// the cold path produced. The map is bounded by maxEntries; on overflow
// the cache evicts the oldest entry deterministically.
//
// Concurrency: a single sync.RWMutex covers both the map and the LRU-ish
// timestamp used for eviction. For the expected load (≤ a few thousand
// hits/sec from agent polling), this is significantly cheaper than the
// per-tool cost we are saving and keeps the implementation auditable.
type resultCache struct {
ttl time.Duration
maxEntries int
mu sync.RWMutex
entries map[string]cachedResult
}

type cachedResult struct {
result ToolCallResult
expireAt time.Time
}

// newResultCache constructs a cache with the given TTL and max-entry cap.
// ttl <= 0 disables caching entirely (Get/Set become no-ops).
func newResultCache(ttl time.Duration, maxEntries int) *resultCache {
if maxEntries <= 0 {
maxEntries = 4096
}
return &resultCache{
ttl: ttl,
maxEntries: maxEntries,
entries: make(map[string]cachedResult, maxEntries),
}
}

// disabled reports whether the cache is a no-op.
func (c *resultCache) disabled() bool { return c == nil || c.ttl <= 0 }

// key computes a stable cache key from (tenant, tool, args). Args order
// does not affect the key — JSON serialization is normalized via a sorted
// key list so {"a":1,"b":2} and {"b":2,"a":1} hash to the same value.
func cacheKey(tenant, tool string, args map[string]any) string {
h := sha256.New()
_, _ = h.Write([]byte(tenant))
_, _ = h.Write([]byte{0})
_, _ = h.Write([]byte(tool))
_, _ = h.Write([]byte{0})
if args != nil {
// Stable serialization — Go's encoding/json doesn't guarantee map
// key order without help.
keys := make([]string, 0, len(args))
for k := range args {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
_, _ = h.Write([]byte(k))
_, _ = h.Write([]byte("="))
b, _ := json.Marshal(args[k])
_, _ = h.Write(b)
_, _ = h.Write([]byte{1})
}
}
sum := h.Sum(nil)
return strings.ToLower(hex.EncodeToString(sum))
}

// Get returns the cached result for (tenant, tool, args) and a boolean
// indicating cache hit. Expired entries return false (and are NOT lazily
// evicted here — the next Set bound check handles eviction in bulk).
func (c *resultCache) Get(tenant, tool string, args map[string]any) (ToolCallResult, bool) {
if c.disabled() || !isCacheable(tool) {
return ToolCallResult{}, false
}
k := cacheKey(tenant, tool, args)
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.entries[k]
if !ok {
return ToolCallResult{}, false
}
if time.Now().After(v.expireAt) {
return ToolCallResult{}, false
}
return v.result, true
}

// Set records a tool result. If the cache is over capacity, ~10% of the
// oldest entries are dropped in one pass — a cheap eviction policy that
// keeps the map size bounded without dragging in a full LRU list.
func (c *resultCache) Set(tenant, tool string, args map[string]any, result ToolCallResult) {
if c.disabled() || !isCacheable(tool) {
return
}
k := cacheKey(tenant, tool, args)
exp := time.Now().Add(c.ttl)
c.mu.Lock()
defer c.mu.Unlock()
if len(c.entries) >= c.maxEntries {
c.evictBatch()
}
c.entries[k] = cachedResult{result: result, expireAt: exp}
}

// evictBatch drops ~10% of entries with the soonest expiry. Called under mu.
func (c *resultCache) evictBatch() {
if len(c.entries) == 0 {
return
}
// Collect (key, expireAt) pairs and partial-sort by expireAt.
type kv struct {
key string
exp time.Time
}
pairs := make([]kv, 0, len(c.entries))
for k, v := range c.entries {
pairs = append(pairs, kv{k, v.expireAt})
}
sort.Slice(pairs, func(i, j int) bool { return pairs[i].exp.Before(pairs[j].exp) })
drop := max(len(pairs)/10, 1)
for i := range drop {
delete(c.entries, pairs[i].key)
}
}

// Stats returns the current cache size. Test/observability hook.
func (c *resultCache) Stats() (size int) {
if c == nil {
return 0
}
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.entries)
}
Loading
Loading