Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ Key settings in `internal/config/config.go`:
- `DEFAULT_TENANT` (`default`) — assigned to rows ingested without explicit tenant
- `HOT_RETENTION_DAYS` (7) — drives `RetentionScheduler`; range 1..36500
- `SAMPLING_RATE` (1.0), `SAMPLING_ALWAYS_ON_ERRORS` (true), `SAMPLING_LATENCY_THRESHOLD_MS` (500)
- `METRIC_MAX_CARDINALITY` (10000), `API_RATE_LIMIT_RPS` (100)
- `METRIC_MAX_CARDINALITY` (10000), `METRIC_MAX_CARDINALITY_PER_TENANT` (0 = unlimited), `API_RATE_LIMIT_RPS` (100). The per-tenant cap is checked first; when set, a noisy tenant cannot exhaust the global pool. Overflow is labeled by tenant via `otelcontext_tsdb_cardinality_overflow_by_tenant_total{tenant_id}` (`__global__` sentinel when the global cap was the trigger).
- `MCP_ENABLED` (true), `MCP_PATH` (/mcp)
- `VECTOR_INDEX_MAX_ENTRIES` (100000)
- `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
Expand Down
3 changes: 2 additions & 1 deletion docs/OPERATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft
- `DB_AUTOMIGRATE=false` for Postgres in production.

### Trust the defaults (don't tune unless you have a reason)
- `METRIC_MAX_CARDINALITY=10000`
- `METRIC_MAX_CARDINALITY=10000`, `METRIC_MAX_CARDINALITY_PER_TENANT=0` (unlimited per-tenant by default). For multi-tenant deployments, set the per-tenant cap to enforce fairness — a noisy tenant gets bounded before exhausting the global pool. Watch `otelcontext_tsdb_cardinality_overflow_by_tenant_total{tenant_id}` to identify offenders.
- `DLQ_MAX_DISK_MB=500`, `DLQ_MAX_FILES=1000`, `DLQ_MAX_RETRIES=10`
- `API_RATE_LIMIT_RPS=100`
- `VECTOR_INDEX_MAX_ENTRIES=100000`
Expand Down Expand Up @@ -201,6 +201,7 @@ Grep structured logs for `acquire entra token`. Common causes: expired managed-i
- `rate(otelcontext_ingest_pipeline_dropped_total{reason="queue_full"}[5m]) > 0` — clients are getting `RESOURCE_EXHAUSTED`; raise `INGEST_PIPELINE_QUEUE_SIZE` or `INGEST_PIPELINE_WORKERS`. Sustained drops mean the DB cannot keep up with the ingest rate.
- `rate(otelcontext_ingest_pipeline_dropped_total{reason="soft_backpressure"}[5m]) > 0` — pipeline is actively shedding healthy traces; check downstream DB latency or scale workers/queue.
- `otelcontext_ingest_pipeline_queue_depth / INGEST_PIPELINE_QUEUE_SIZE > 0.7` for >5m — queue trending toward soft drop; capacity is becoming a constraint.
- `topk(5, sum by (tenant_id) (rate(otelcontext_tsdb_cardinality_overflow_by_tenant_total[5m]))) > 0` — identifies which tenants are exhausting their metric series budget. Combine with `METRIC_MAX_CARDINALITY_PER_TENANT` to enforce fairness.
- `otelcontext_retention_rows_behind > 1_000_000` — purge is falling behind; tune `RETENTION_BATCH_SIZE` / `RETENTION_BATCH_SLEEP_MS`
- `otelcontext_db_pool_in_use / otelcontext_db_pool_max_open > 0.9` — pool exhausted; raise `DB_MAX_OPEN_CONNS`
- `rate(otelcontext_dlq_evicted_total[5m]) > 0` — DLQ is actively dropping entries at cap; replay target is down or slow
Expand Down
16 changes: 14 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ type Config struct {
MetricAttributeKeys string // comma-separated allowlist
MetricMaxCardinality int

// Per-tenant cardinality cap. 0 = unlimited (only the global cap
// applies, preserving legacy single-tenant behavior). Setting this
// gives every tenant its own series budget so a noisy tenant cannot
// starve siblings of fresh series in the in-memory TSDB. The global
// cap (MetricMaxCardinality) remains a backstop and is checked
// after the per-tenant cap.
MetricMaxCardinalityPerTenant int

// DLQ Safety
DLQMaxFiles int
DLQMaxDiskMB int
Expand Down Expand Up @@ -192,8 +200,9 @@ func Load(customPath string) (*Config, error) {
SamplingLatencyThresholdMs: getEnvInt("SAMPLING_LATENCY_THRESHOLD_MS", 500),

// Cardinality
MetricAttributeKeys: getEnv("METRIC_ATTRIBUTE_KEYS", ""),
MetricMaxCardinality: getEnvInt("METRIC_MAX_CARDINALITY", 10000),
MetricAttributeKeys: getEnv("METRIC_ATTRIBUTE_KEYS", ""),
MetricMaxCardinality: getEnvInt("METRIC_MAX_CARDINALITY", 10000),
MetricMaxCardinalityPerTenant: getEnvInt("METRIC_MAX_CARDINALITY_PER_TENANT", 0),

// DLQ
DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000),
Expand Down Expand Up @@ -331,6 +340,9 @@ func (c *Config) Validate() error {
if c.MetricMaxCardinality < 0 {
return fmt.Errorf("METRIC_MAX_CARDINALITY must be >= 0, got %d", c.MetricMaxCardinality)
}
if c.MetricMaxCardinalityPerTenant < 0 {
return fmt.Errorf("METRIC_MAX_CARDINALITY_PER_TENANT must be >= 0, got %d", c.MetricMaxCardinalityPerTenant)
}
if c.SamplingRate < 0 || c.SamplingRate > 1.0 {
return fmt.Errorf("SAMPLING_RATE must be between 0 and 1, got %f", c.SamplingRate)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type Metrics struct {
TSDBFlushDuration prometheus.Histogram
TSDBBatchesDropped prometheus.Counter
TSDBCardinalityOverflow prometheus.Counter
// TSDBCardinalityOverflowByTenant labels overflow events with the tenant ID
// that triggered them, or the sentinel "__global__" when the global cap
// (not a per-tenant cap) was the trigger. Use this to identify noisy
// tenants: sum by (tenant_id) (rate(otelcontext_tsdb_cardinality_overflow_by_tenant_total[5m]))
TSDBCardinalityOverflowByTenant *prometheus.CounterVec

// --- WebSocket ---
WSMessagesSent *prometheus.CounterVec
Expand Down Expand Up @@ -178,6 +183,10 @@ func New() *Metrics {
Name: "OtelContext_tsdb_cardinality_overflow_total",
Help: "Metric points routed to overflow bucket due to cardinality limit.",
}),
TSDBCardinalityOverflowByTenant: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "otelcontext_tsdb_cardinality_overflow_by_tenant_total",
Help: "Metric points routed to overflow bucket, labeled by the tenant_id that exceeded its cap (or __global__ when the global cap triggered).",
}, []string{"tenant_id"}),

// WebSocket
WSMessagesSent: promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down
101 changes: 83 additions & 18 deletions internal/tsdb/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,23 @@
pool sync.Pool
droppedBatches int64

// Cardinality controls
maxCardinality int // 0 = unlimited
cardinalityOverflow func() // called when overflow bucket is used (for metrics)
overflowKey string // constant key for the overflow bucket
// Cardinality controls.
//
// maxCardinality is the GLOBAL series budget across all tenants;
// once it is reached, new series go to a single shared overflow
// bucket keyed by overflowKey ("__cardinality_overflow__"). This
// preserves backward compatibility for single-tenant deployments.
//
// perTenantCardinality is the PER-TENANT series budget. When set
// (>0), it is checked first: a tenant exceeding its own cap routes
// to a tenant-specific overflow bucket so a noisy tenant cannot
// starve siblings of fresh series. seriesPerTenant counts unique
// (non-overflow) bucket keys per tenant and is reset by flush().
maxCardinality int // 0 = unlimited
perTenantCardinality int // 0 = unlimited (global cap still applies)
cardinalityOverflow func(tenantID string) // labeled per overflow event for Prometheus
seriesPerTenant map[string]int //nolint:unused // touched only via mu
overflowKey string // constant key for the global overflow bucket

// Ring buffer accelerator (optional)
ring *RingBuffer
Expand All @@ -49,28 +62,44 @@

const persistenceWorkers = 3

// overflowSentinelGlobal is the label value emitted on the per-tenant
// overflow counter when the GLOBAL cap (not a per-tenant cap) is the
// one that triggered. Distinguishes "this tenant got too noisy" from
// "the whole instance is at series budget".
const overflowSentinelGlobal = "__global__"

// NewAggregator creates a new TSDB aggregator.
func NewAggregator(repo *storage.Repository, windowSize time.Duration) *Aggregator {
a := &Aggregator{
repo: repo,
windowSize: windowSize,
buckets: make(map[string]*storage.MetricBucket),
stopChan: make(chan struct{}),
flushChan: make(chan []storage.MetricBucket, 500),
overflowKey: "__cardinality_overflow__",
repo: repo,
windowSize: windowSize,
buckets: make(map[string]*storage.MetricBucket),
seriesPerTenant: make(map[string]int),
stopChan: make(chan struct{}),
flushChan: make(chan []storage.MetricBucket, 500),
overflowKey: "__cardinality_overflow__",
}
a.pool.New = func() any {
return make([]storage.MetricBucket, 0, 100)
}
return a
}

// SetCardinalityLimit configures the maximum number of distinct metric series.
// When exceeded, new series are routed to an overflow bucket and onOverflow is called.
func (a *Aggregator) SetCardinalityLimit(max int, onOverflow func()) {
// SetCardinalityLimit configures the global and per-tenant series caps.
//
// global — total distinct series across all tenants; 0 = unlimited.
// perTenant — distinct series per tenant; 0 = unlimited (global only).
// onOverflow — called once per overflow event with the tenant ID
// that exceeded its cap, or overflowSentinelGlobal when
// the global cap (not per-tenant) is the trigger.
//
// Pass nil for onOverflow to disable the callback. Either cap may be 0
// independently.
func (a *Aggregator) SetCardinalityLimit(global, perTenant int, onOverflow func(tenantID string)) {
a.mu.Lock()
defer a.mu.Unlock()
a.maxCardinality = max
a.maxCardinality = global
a.perTenantCardinality = perTenant
a.cardinalityOverflow = onOverflow
}

Expand Down Expand Up @@ -119,7 +148,7 @@
}

// Ingest adds a raw metric point to the current aggregator window.
func (a *Aggregator) Ingest(m RawMetric) {

Check failure on line 151 in internal/tsdb/aggregator.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3PvreJqA9SeROD9Kzg&open=AZ3PvreJqA9SeROD9Kzg&pullRequest=51
// Pre-compute key outside the lock — json.Marshal is CPU-bound and must not hold mu.
attrJSON, _ := json.Marshal(m.Attributes)
// Include tenant in the key so points from different tenants never merge.
Expand All @@ -138,10 +167,42 @@

bucket, exists := a.buckets[key]
if !exists {
// Cardinality guard: if limit exceeded, route to overflow bucket.
if a.maxCardinality > 0 && len(a.buckets) >= a.maxCardinality {
// Cardinality enforcement order:
// 1. Per-tenant cap — checked first so a noisy tenant gets
// bounded BEFORE the global pool is touched. Routes to a
// tenant-specific overflow bucket so each tenant's overflow
// stats stay separate.
// 2. Global cap — backstop. When triggered, routes to the
// single shared overflow bucket and labels the metric with
// the __global__ sentinel.
overTenantCap := a.perTenantCardinality > 0 && a.seriesPerTenant[m.TenantID] >= a.perTenantCardinality
overGlobalCap := a.maxCardinality > 0 && len(a.buckets) >= a.maxCardinality

switch {
case overTenantCap:
if a.cardinalityOverflow != nil {
a.cardinalityOverflow(m.TenantID)
}
key = a.overflowKey + "|" + m.TenantID
bucket = a.buckets[key]
if bucket == nil {
windowStart := m.Timestamp.Truncate(a.windowSize)
bucket = &storage.MetricBucket{
TenantID: m.TenantID,
Name: "__overflow__",
ServiceName: m.ServiceName,
TimeBucket: windowStart,
Min: m.Value,
Max: m.Value,
Sum: m.Value,
Count: 1,
}
a.buckets[key] = bucket
}
// No return here: control leaves the switch and the overflow bucket is updated by the shared code below.
case overGlobalCap:
if a.cardinalityOverflow != nil {
a.cardinalityOverflow()
a.cardinalityOverflow(overflowSentinelGlobal)
}
key = a.overflowKey
bucket = a.buckets[key]
Expand All @@ -160,7 +221,7 @@
a.buckets[key] = bucket
}
// No return here: control leaves the switch and the overflow bucket is updated by the shared code below.
} else {
default:
windowStart := m.Timestamp.Truncate(a.windowSize)
bucket = &storage.MetricBucket{
TenantID: m.TenantID,
Expand All @@ -174,6 +235,7 @@
AttributesJSON: storage.CompressedText(attrJSON),
}
a.buckets[key] = bucket
a.seriesPerTenant[m.TenantID]++
return
}
}
Expand Down Expand Up @@ -214,6 +276,9 @@
batch = append(batch, *b)
}
a.buckets = make(map[string]*storage.MetricBucket)
// Per-tenant counts track only non-overflow series in the live
// buckets map. Reset alongside it so the next window starts fresh.
a.seriesPerTenant = make(map[string]int)
a.mu.Unlock()

select {
Expand Down
Loading
Loading