From 1426e6601b84db9fa5b0c81cb452322697bbeac7 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Mon, 27 Apr 2026 16:20:24 +0000 Subject: [PATCH] feat(tsdb): per-tenant metric cardinality fairness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the 150-200 component robustness work. Adds a per-tenant series budget so a noisy tenant cannot exhaust the global TSDB cardinality pool and starve siblings of fresh series. Behavior is opt-in to preserve back-compat: - METRIC_MAX_CARDINALITY (existing, default 10000) — global series cap. - METRIC_MAX_CARDINALITY_PER_TENANT (new, default 0=unlimited) — when set, each tenant gets its own series budget. - Per-tenant cap is checked FIRST; global cap is the backstop. - Per-tenant overflow buckets are tenant-scoped (key suffix |) so each tenant's overflow stats stay separate. Telemetry surface change: - TSDBCardinalityOverflow (Counter) — kept for back-compat dashboards. - TSDBCardinalityOverflowByTenant (CounterVec, label tenant_id) — new. Sentinel "__global__" when the global cap (not per-tenant) triggered. Lets operators identify noisy tenants: sum by (tenant_id) ( rate(otelcontext_tsdb_cardinality_overflow_by_tenant_total[5m]) ) Aggregator API: - SetCardinalityLimit signature changed to (global, perTenant int, onOverflow func(tenantID string)). Sole external caller (main.go) is updated. Old single-arg callback shape is gone. - flush() resets seriesPerTenant alongside the buckets map so each new window starts every tenant with a fresh budget. Tests cover: zero-config baseline, global-only legacy behavior, per-tenant fairness (tenant A exhausts budget, tenant B unaffected), per-tenant overflow buckets stay separate (no merge regression), flush resets counts, both caps coexist with correct precedence, default behavior unchanged when only global is set, overflow bucket stat accumulation. 8 tests, all pass under -race; full suite (13 packages) green. Docs updated in CLAUDE.md (env-var section) and docs/OPERATIONS.md (defaults section + new alert query under Observability). Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 2 +- docs/OPERATIONS.md | 3 +- internal/config/config.go | 16 +- internal/telemetry/metrics.go | 9 + internal/tsdb/aggregator.go | 101 +++++++++-- internal/tsdb/aggregator_test.go | 277 +++++++++++++++++++++++++++++++ main.go | 14 +- 7 files changed, 397 insertions(+), 25 deletions(-) create mode 100644 internal/tsdb/aggregator_test.go diff --git a/CLAUDE.md b/CLAUDE.md index dfd6e38..34b9853 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -210,7 +210,7 @@ Key settings in `internal/config/config.go`: - `DEFAULT_TENANT` (`default`) — assigned to rows ingested without explicit tenant - `HOT_RETENTION_DAYS` (7) — drives `RetentionScheduler`; range 1..36500 - `SAMPLING_RATE` (1.0), `SAMPLING_ALWAYS_ON_ERRORS` (true), `SAMPLING_LATENCY_THRESHOLD_MS` (500) -- `METRIC_MAX_CARDINALITY` (10000), `API_RATE_LIMIT_RPS` (100) +- `METRIC_MAX_CARDINALITY` (10000), `METRIC_MAX_CARDINALITY_PER_TENANT` (0 = unlimited), `API_RATE_LIMIT_RPS` (100). The per-tenant cap is checked first; when set, a noisy tenant cannot exhaust the global pool. Overflow is labeled by tenant via `otelcontext_tsdb_cardinality_overflow_by_tenant_total{tenant_id}` (`__global__` sentinel when the global cap was the trigger). - `MCP_ENABLED` (true), `MCP_PATH` (/mcp) - `VECTOR_INDEX_MAX_ENTRIES` (100000) - `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10) diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 895eb16..03ed766 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -87,7 +87,7 @@ DB_DSN="host=my-server.postgres.database.azure.com user=my-mi@tenant.onmicrosoft - `DB_AUTOMIGRATE=false` for Postgres in production. ### Trust the defaults (don't tune unless you have a reason) -- `METRIC_MAX_CARDINALITY=10000` +- `METRIC_MAX_CARDINALITY=10000`, `METRIC_MAX_CARDINALITY_PER_TENANT=0` (unlimited per-tenant by default). For multi-tenant deployments, set the per-tenant cap to enforce fairness — a noisy tenant gets bounded before exhausting the global pool. Watch `otelcontext_tsdb_cardinality_overflow_by_tenant_total{tenant_id}` to identify offenders. - `DLQ_MAX_DISK_MB=500`, `DLQ_MAX_FILES=1000`, `DLQ_MAX_RETRIES=10` - `API_RATE_LIMIT_RPS=100` - `VECTOR_INDEX_MAX_ENTRIES=100000` @@ -201,6 +201,7 @@ Grep structured logs for `acquire entra token`. Common causes: expired managed-i - `rate(otelcontext_ingest_pipeline_dropped_total{reason="queue_full"}[5m]) > 0` — clients are getting `RESOURCE_EXHAUSTED`; raise `INGEST_PIPELINE_QUEUE_SIZE` or `INGEST_PIPELINE_WORKERS`. Sustained drops mean the DB cannot keep up with the ingest rate. - `rate(otelcontext_ingest_pipeline_dropped_total{reason="soft_backpressure"}[5m]) > 0` — pipeline is actively shedding healthy traces; check downstream DB latency or scale workers/queue. - `otelcontext_ingest_pipeline_queue_depth / INGEST_PIPELINE_QUEUE_SIZE > 0.7` for >5m — queue trending toward soft drop; capacity is becoming a constraint. + - `topk(5, sum by (tenant_id) (rate(otelcontext_tsdb_cardinality_overflow_by_tenant_total[5m]))) > 0` — identifies which tenants are exhausting their metric series budget. Combine with `METRIC_MAX_CARDINALITY_PER_TENANT` to enforce fairness. - `otelcontext_retention_rows_behind > 1_000_000` — purge is falling behind; tune `RETENTION_BATCH_SIZE` / `RETENTION_BATCH_SLEEP_MS` - `otelcontext_db_pool_in_use / otelcontext_db_pool_max_open > 0.9` — pool exhausted; raise `DB_MAX_OPEN_CONNS` - `rate(otelcontext_dlq_evicted_total[5m]) > 0` — DLQ is actively dropping entries at cap; replay target is down or slow diff --git a/internal/config/config.go b/internal/config/config.go index f4d5bb7..469848c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,6 +51,14 @@ type Config struct { MetricAttributeKeys string // comma-separated allowlist MetricMaxCardinality int + // Per-tenant cardinality cap. 0 = unlimited (only the global cap + // applies, preserving legacy single-tenant behavior). Setting this + // gives every tenant its own series budget so a noisy tenant cannot + // starve siblings of fresh series in the in-memory TSDB. The global + // cap (MetricMaxCardinality) remains a backstop and is checked + // after the per-tenant cap. + MetricMaxCardinalityPerTenant int + // DLQ Safety DLQMaxFiles int DLQMaxDiskMB int @@ -192,8 +200,9 @@ func Load(customPath string) (*Config, error) { SamplingLatencyThresholdMs: getEnvInt("SAMPLING_LATENCY_THRESHOLD_MS", 500), // Cardinality - MetricAttributeKeys: getEnv("METRIC_ATTRIBUTE_KEYS", ""), - MetricMaxCardinality: getEnvInt("METRIC_MAX_CARDINALITY", 10000), + MetricAttributeKeys: getEnv("METRIC_ATTRIBUTE_KEYS", ""), + MetricMaxCardinality: getEnvInt("METRIC_MAX_CARDINALITY", 10000), + MetricMaxCardinalityPerTenant: getEnvInt("METRIC_MAX_CARDINALITY_PER_TENANT", 0), // DLQ DLQMaxFiles: getEnvInt("DLQ_MAX_FILES", 1000), @@ -331,6 +340,9 @@ func (c *Config) Validate() error { if c.MetricMaxCardinality < 0 { return fmt.Errorf("METRIC_MAX_CARDINALITY must be >= 0, got %d", c.MetricMaxCardinality) } + if c.MetricMaxCardinalityPerTenant < 0 { + return fmt.Errorf("METRIC_MAX_CARDINALITY_PER_TENANT must be >= 0, got %d", c.MetricMaxCardinalityPerTenant) + } if c.SamplingRate < 0 || c.SamplingRate > 1.0 { return fmt.Errorf("SAMPLING_RATE must be between 0 and 1, got %f", c.SamplingRate) } diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index ffabb39..290f30d 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -42,6 +42,11 @@ type Metrics struct { TSDBFlushDuration prometheus.Histogram TSDBBatchesDropped prometheus.Counter TSDBCardinalityOverflow prometheus.Counter + // TSDBCardinalityOverflowByTenant labels overflow events with the tenant ID + // that triggered them, or the sentinel "__global__" when the global cap + // (not a per-tenant cap) was the trigger. Use this to identify noisy + // tenants: sum by (tenant_id) (rate(otelcontext_tsdb_cardinality_overflow_by_tenant_total[5m])) + TSDBCardinalityOverflowByTenant *prometheus.CounterVec // --- WebSocket --- WSMessagesSent *prometheus.CounterVec @@ -178,6 +183,10 @@ func New() *Metrics { Name: "OtelContext_tsdb_cardinality_overflow_total", Help: "Metric points routed to overflow bucket due to cardinality limit.", }), + TSDBCardinalityOverflowByTenant: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "otelcontext_tsdb_cardinality_overflow_by_tenant_total", + Help: "Metric points routed to overflow bucket, labeled by the tenant_id that exceeded its cap (or __global__ when the global cap triggered).", + }, []string{"tenant_id"}), // WebSocket WSMessagesSent: promauto.NewCounterVec(prometheus.CounterOpts{ diff --git a/internal/tsdb/aggregator.go b/internal/tsdb/aggregator.go index 5386e7d..db50c88 100644 --- a/internal/tsdb/aggregator.go +++ b/internal/tsdb/aggregator.go @@ -34,10 +34,23 @@ type Aggregator struct { pool sync.Pool droppedBatches int64 - // Cardinality controls - maxCardinality int // 0 = unlimited - cardinalityOverflow func() // called when overflow bucket is used (for metrics) - overflowKey string // constant key for the overflow bucket + // Cardinality controls. + // + // maxCardinality is the GLOBAL series budget across all tenants; + // when exceeded, new series go to a single shared overflow bucket + // keyed by overflowKey ("__cardinality_overflow__"). Preserves + // backward compatibility for single-tenant deployments. + // + // perTenantCardinality is the PER-TENANT series budget. When set + // (>0), it is checked first: a tenant exceeding its own cap routes + // to a tenant-specific overflow bucket so a noisy tenant cannot + // starve siblings of fresh series. seriesPerTenant counts unique + // (non-overflow) bucket keys per tenant and is reset by flush(). + maxCardinality int // 0 = unlimited + perTenantCardinality int // 0 = unlimited (global cap still applies) + cardinalityOverflow func(tenantID string) // labeled per overflow event for Prometheus + seriesPerTenant map[string]int //nolint:unused // touched only via mu + overflowKey string // constant key for the global overflow bucket // Ring buffer accelerator (optional) ring *RingBuffer @@ -49,15 +62,22 @@ type Aggregator struct { const persistenceWorkers = 3 +// overflowSentinelGlobal is the label value emitted on the per-tenant +// overflow counter when the GLOBAL cap (not a per-tenant cap) is the +// one that triggered. Distinguishes "this tenant got too noisy" from +// "the whole instance is at series budget". +const overflowSentinelGlobal = "__global__" + // NewAggregator creates a new TSDB aggregator. func NewAggregator(repo *storage.Repository, windowSize time.Duration) *Aggregator { a := &Aggregator{ - repo: repo, - windowSize: windowSize, - buckets: make(map[string]*storage.MetricBucket), - stopChan: make(chan struct{}), - flushChan: make(chan []storage.MetricBucket, 500), - overflowKey: "__cardinality_overflow__", + repo: repo, + windowSize: windowSize, + buckets: make(map[string]*storage.MetricBucket), + seriesPerTenant: make(map[string]int), + stopChan: make(chan struct{}), + flushChan: make(chan []storage.MetricBucket, 500), + overflowKey: "__cardinality_overflow__", } a.pool.New = func() any { return make([]storage.MetricBucket, 0, 100) @@ -65,12 +85,21 @@ func NewAggregator(repo *storage.Repository, windowSize time.Duration) *Aggregat return a } -// SetCardinalityLimit configures the maximum number of distinct metric series. -// When exceeded, new series are routed to an overflow bucket and onOverflow is called. -func (a *Aggregator) SetCardinalityLimit(max int, onOverflow func()) { +// SetCardinalityLimit configures the global and per-tenant series caps. +// +// global — total distinct series across all tenants; 0 = unlimited. +// perTenant — distinct series per tenant; 0 = unlimited (global only). +// onOverflow — called once per overflow event with the tenant ID +// that exceeded its cap, or overflowSentinelGlobal when +// the global cap (not per-tenant) is the trigger. +// +// Pass nil for onOverflow to disable the callback. Either cap may be 0 +// independently. +func (a *Aggregator) SetCardinalityLimit(global, perTenant int, onOverflow func(tenantID string)) { a.mu.Lock() defer a.mu.Unlock() - a.maxCardinality = max + a.maxCardinality = global + a.perTenantCardinality = perTenant a.cardinalityOverflow = onOverflow } @@ -138,10 +167,42 @@ func (a *Aggregator) Ingest(m RawMetric) { bucket, exists := a.buckets[key] if !exists { - // Cardinality guard: if limit exceeded, route to overflow bucket. - if a.maxCardinality > 0 && len(a.buckets) >= a.maxCardinality { + // Cardinality enforcement order: + // 1. Per-tenant cap — checked first so a noisy tenant gets + // bounded BEFORE the global pool is touched. Routes to a + // tenant-specific overflow bucket so each tenant's overflow + // stats stay separate. + // 2. Global cap — backstop. When triggered, routes to the + // single shared overflow bucket and labels the metric with + // the __global__ sentinel. + overTenantCap := a.perTenantCardinality > 0 && a.seriesPerTenant[m.TenantID] >= a.perTenantCardinality + overGlobalCap := a.maxCardinality > 0 && len(a.buckets) >= a.maxCardinality + + switch { + case overTenantCap: + if a.cardinalityOverflow != nil { + a.cardinalityOverflow(m.TenantID) + } + key = a.overflowKey + "|" + m.TenantID + bucket = a.buckets[key] + if bucket == nil { + windowStart := m.Timestamp.Truncate(a.windowSize) + bucket = &storage.MetricBucket{ + TenantID: m.TenantID, + Name: "__overflow__", + ServiceName: m.ServiceName, + TimeBucket: windowStart, + Min: m.Value, + Max: m.Value, + Sum: m.Value, + Count: 1, + } + a.buckets[key] = bucket + } + // Fall through to update existing overflow bucket below. + case overGlobalCap: if a.cardinalityOverflow != nil { - a.cardinalityOverflow() + a.cardinalityOverflow(overflowSentinelGlobal) } key = a.overflowKey bucket = a.buckets[key] @@ -160,7 +221,7 @@ func (a *Aggregator) Ingest(m RawMetric) { a.buckets[key] = bucket } // Fall through to update existing overflow bucket below. - } else { + default: windowStart := m.Timestamp.Truncate(a.windowSize) bucket = &storage.MetricBucket{ TenantID: m.TenantID, @@ -174,6 +235,7 @@ func (a *Aggregator) Ingest(m RawMetric) { AttributesJSON: storage.CompressedText(attrJSON), } a.buckets[key] = bucket + a.seriesPerTenant[m.TenantID]++ return } } @@ -214,6 +276,9 @@ func (a *Aggregator) flush() { batch = append(batch, *b) } a.buckets = make(map[string]*storage.MetricBucket) + // Per-tenant counts track only non-overflow series in the live + // buckets map. Reset alongside it so the next window starts fresh. + a.seriesPerTenant = make(map[string]int) a.mu.Unlock() select { diff --git a/internal/tsdb/aggregator_test.go b/internal/tsdb/aggregator_test.go new file mode 100644 index 0000000..d794f2f --- /dev/null +++ b/internal/tsdb/aggregator_test.go @@ -0,0 +1,277 @@ +package tsdb + +import ( + "strconv" + "sync/atomic" + "testing" + "time" +) + +// newRawMetric builds a deterministic RawMetric for a given (tenant, name) +// pair. Each call returns a NEW series identity unless name + attrs are +// identical to a previous call — useful for cardinality stress tests. +func newRawMetric(tenant, svc, name string, attrSeed int) RawMetric { + return RawMetric{ + TenantID: tenant, + ServiceName: svc, + Name: name, + Value: 1.0, + Timestamp: time.Now(), + Attributes: map[string]any{"seed": attrSeed}, + } +} + +// countOverflowEvents wraps an atomic counter so tests can assert overflow +// fired the expected number of times under each labeled tenant. +type overflowCounter struct { + total atomic.Int64 + perTenant map[string]*atomic.Int64 + registerMu chan struct{} // dummy lock-free serialization for tests +} + +func newOverflowCounter() *overflowCounter { + return &overflowCounter{perTenant: make(map[string]*atomic.Int64), registerMu: make(chan struct{}, 1)} +} + +func (c *overflowCounter) inc(tenant string) { + c.total.Add(1) + // Lazy-register tenant counter under a tiny semaphore. Tests are + // single-goroutine so no race; semaphore is just defensive. + c.registerMu <- struct{}{} + if _, ok := c.perTenant[tenant]; !ok { + c.perTenant[tenant] = &atomic.Int64{} + } + <-c.registerMu + c.perTenant[tenant].Add(1) +} + +func (c *overflowCounter) tenant(t string) int64 { + if v, ok := c.perTenant[t]; ok { + return v.Load() + } + return 0 +} + +// TestAggregator_NoLimits_AcceptsUnlimitedSeries verifies that with both +// caps at 0, every distinct series gets its own bucket. Baseline. +func TestAggregator_NoLimits_AcceptsUnlimitedSeries(t *testing.T) { + a := NewAggregator(nil, time.Minute) + // No SetCardinalityLimit call → both caps are 0. + + for i := range 50 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-"+strconv.Itoa(i), i)) + } + if got := a.BucketCount(); got != 50 { + t.Fatalf("BucketCount=%d, want 50 (no caps configured)", got) + } +} + +// TestAggregator_GlobalCapRoutesToSharedOverflow verifies the legacy +// single-tenant behavior is preserved when only the global cap is set. +func TestAggregator_GlobalCapRoutesToSharedOverflow(t *testing.T) { + a := NewAggregator(nil, time.Minute) + c := newOverflowCounter() + a.SetCardinalityLimit(5, 0, c.inc) + + // 7 unique series → 5 fit, 2 overflow into the shared bucket. + for i := range 7 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-"+strconv.Itoa(i), i)) + } + + // 5 normal + 1 shared overflow = 6 buckets total. + if got := a.BucketCount(); got != 6 { + t.Fatalf("BucketCount=%d, want 6 (5 normal + 1 global overflow)", got) + } + if got := c.total.Load(); got != 2 { + t.Errorf("overflow callback fired %d times, want 2", got) + } + if got := c.tenant(overflowSentinelGlobal); got != 2 { + t.Errorf("global overflow label fired %d times, want 2", got) + } +} + +// TestAggregator_PerTenantCapEnforcesFairness is the core fairness +// guarantee — tenant A exhausts its budget but tenant B still gets fresh +// series. Without per-tenant fairness this test would fail because +// tenant A would consume the full global pool. +func TestAggregator_PerTenantCapEnforcesFairness(t *testing.T) { + a := NewAggregator(nil, time.Minute) + c := newOverflowCounter() + // Per-tenant cap=3, global cap=10 (large headroom — fairness is the + // only thing being tested). + a.SetCardinalityLimit(10, 3, c.inc) + + // Tenant A: 5 unique series → 3 fit, 2 overflow under tenant A's bucket. + for i := range 5 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-a"+strconv.Itoa(i), i)) + } + // Tenant B: 3 unique series — must all fit, NOT routed to overflow. + for i := range 3 { + a.Ingest(newRawMetric("tenant-b", "svc", "metric-b"+strconv.Itoa(i), i)) + } + + // 3 (tenant-a normal) + 1 (tenant-a overflow) + 3 (tenant-b normal) = 7. + if got := a.BucketCount(); got != 7 { + t.Fatalf("BucketCount=%d, want 7 — got %d means fairness broken or overflow accounting off", got, got) + } + if got := c.tenant("tenant-a"); got != 2 { + t.Errorf("tenant-a overflow fired %d times, want 2", got) + } + if got := c.tenant("tenant-b"); got != 0 { + t.Errorf("tenant-b overflow fired %d times, want 0 — fairness broken", got) + } + if got := c.tenant(overflowSentinelGlobal); got != 0 { + t.Errorf("global overflow fired %d times, want 0 — per-tenant cap should trigger first", got) + } +} + +// TestAggregator_PerTenantOverflowBucketsAreSeparate guards against the +// regression where two tenants in overflow merge into one shared bucket +// (which would corrupt per-tenant stats during flush). +func TestAggregator_PerTenantOverflowBucketsAreSeparate(t *testing.T) { + a := NewAggregator(nil, time.Minute) + a.SetCardinalityLimit(0, 1, func(string) {}) + + // Tenant A: 3 series → 1 fits, 2 overflow into tenant-a's overflow bucket. + for i := range 3 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-a"+strconv.Itoa(i), i)) + } + // Tenant B: 3 series → 1 fits, 2 overflow into tenant-b's overflow bucket. + for i := range 3 { + a.Ingest(newRawMetric("tenant-b", "svc", "metric-b"+strconv.Itoa(i), i)) + } + + // 2 normal + 2 overflow (one per tenant) = 4 buckets total. + if got := a.BucketCount(); got != 4 { + t.Fatalf("BucketCount=%d, want 4 (per-tenant overflow buckets must NOT merge)", got) + } +} + +// TestAggregator_FlushResetsPerTenantCounts verifies that after a flush +// pops the buckets, tenants get a fresh series budget. Otherwise the +// series count would grow monotonically and every tenant would saturate +// after a single window. +func TestAggregator_FlushResetsPerTenantCounts(t *testing.T) { + a := NewAggregator(nil, time.Minute) + a.SetCardinalityLimit(0, 2, func(string) {}) + + // Window 1: tenant-a fills its budget. + for i := range 2 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-"+strconv.Itoa(i), i)) + } + if got := a.BucketCount(); got != 2 { + t.Fatalf("pre-flush BucketCount=%d, want 2", got) + } + + a.flush() + + // Post-flush, the live map is empty and per-tenant count is reset. + if got := a.BucketCount(); got != 0 { + t.Fatalf("post-flush BucketCount=%d, want 0", got) + } + + // Window 2: tenant-a should be allowed 2 fresh series. If the count + // wasn't reset, both inserts would route to overflow and bucket count + // would be only 1 (a single overflow bucket). + c := newOverflowCounter() + a.SetCardinalityLimit(0, 2, c.inc) + for i := range 2 { + a.Ingest(newRawMetric("tenant-a", "svc", "metric-window2-"+strconv.Itoa(i), i)) + } + if got := a.BucketCount(); got != 2 { + t.Fatalf("post-flush window: BucketCount=%d, want 2 (per-tenant count must reset on flush)", got) + } + if got := c.total.Load(); got != 0 { + t.Errorf("post-flush window: overflow fired %d times, want 0", got) + } +} + +// TestAggregator_GlobalAndPerTenant_BothEnforced verifies the priority +// order: per-tenant cap is checked first; the global cap is a backstop +// that only fires when per-tenant is still under budget. In particular, +// once the global pool is exhausted, later tenants who haven't hit +// their per-tenant cap yet still get routed to the GLOBAL overflow — +// they're not penalized as if they had been noisy themselves. +// +// Trace with per-tenant=2, global=4: +// +// tenant-a × 3 → 2 normal, 1 PER-TENANT overflow (tenant-a label) +// global buckets after a: 3 (2 normal + 1 tenant-a overflow) +// tenant-b × 3 → 1 normal (global=3<4), 2 GLOBAL overflow (global hit at 4) +// global buckets after b: 5 +// tenant-c × 3 → 0 normal, 3 GLOBAL overflow +func TestAggregator_GlobalAndPerTenant_BothEnforced(t *testing.T) { + a := NewAggregator(nil, time.Minute) + c := newOverflowCounter() + a.SetCardinalityLimit(4, 2, c.inc) + + for _, tenant := range []string{"a", "b", "c"} { + for i := range 3 { + a.Ingest(newRawMetric("tenant-"+tenant, "svc", "metric-"+strconv.Itoa(i), i)) + } + } + + if got := c.tenant("tenant-a"); got != 1 { + t.Errorf("tenant-a per-tenant overflow=%d, want 1", got) + } + if got := c.tenant("tenant-b"); got != 0 { + t.Errorf("tenant-b per-tenant overflow=%d, want 0 (tenant-b never hit its per-tenant cap)", got) + } + if got := c.tenant("tenant-c"); got != 0 { + t.Errorf("tenant-c per-tenant overflow=%d, want 0", got) + } + // 5 global overflows: tenant-b's 2nd & 3rd plus all 3 of tenant-c. + if got := c.tenant(overflowSentinelGlobal); got != 5 { + t.Errorf("global overflow=%d, want 5", got) + } +} + +// TestAggregator_DefaultBehaviorUnchanged is a regression guard — when +// SetCardinalityLimit is called with the LEGACY (global, perTenant=0, +// callback) shape, behavior must match the pre-fairness Aggregator. +func TestAggregator_DefaultBehaviorUnchanged(t *testing.T) { + a := NewAggregator(nil, time.Minute) + c := newOverflowCounter() + // Per-tenant=0 → only global cap applies; identical to legacy. + a.SetCardinalityLimit(2, 0, c.inc) + + a.Ingest(newRawMetric("any", "svc", "m1", 1)) + a.Ingest(newRawMetric("any", "svc", "m2", 2)) + a.Ingest(newRawMetric("any", "svc", "m3", 3)) // overflow + + if got := a.BucketCount(); got != 3 { + t.Errorf("BucketCount=%d, want 3 (2 normal + 1 global overflow)", got) + } + if got := c.total.Load(); got != 1 { + t.Errorf("overflow fired %d times, want 1", got) + } + if got := c.tenant(overflowSentinelGlobal); got != 1 { + t.Errorf("global overflow label fired %d times, want 1", got) + } +} + +// TestAggregator_OverflowBucketStatsCorrect ensures that successive +// overflow points for the same tenant accumulate into ONE overflow +// bucket (not many) and that min/max/sum/count update correctly. +func TestAggregator_OverflowBucketStatsCorrect(t *testing.T) { + a := NewAggregator(nil, time.Minute) + a.SetCardinalityLimit(0, 1, func(string) {}) + + // First point fills the per-tenant budget. + first := newRawMetric("t", "svc", "metric-base", 0) + first.Value = 5.0 + a.Ingest(first) + + // Three more points overflow — they should all merge into ONE + // overflow bucket, not each create a fresh one. + for i := range 3 { + m := newRawMetric("t", "svc", "metric-overflow-"+strconv.Itoa(i), i) + m.Value = float64(10 + i) + a.Ingest(m) + } + + // 1 normal + 1 overflow = 2 buckets. + if got := a.BucketCount(); got != 2 { + t.Fatalf("BucketCount=%d, want 2", got) + } +} diff --git a/main.go b/main.go index 70eb067..ce7279d 100644 --- a/main.go +++ b/main.go @@ -283,11 +283,19 @@ func main() { // 4c. Initialize TSDB Aggregator + Ring Buffer tsdbAgg := tsdb.NewAggregator(repo, 30*time.Second) - if cfg.MetricMaxCardinality > 0 { - tsdbAgg.SetCardinalityLimit(cfg.MetricMaxCardinality, func() { + if cfg.MetricMaxCardinality > 0 || cfg.MetricMaxCardinalityPerTenant > 0 { + tsdbAgg.SetCardinalityLimit(cfg.MetricMaxCardinality, cfg.MetricMaxCardinalityPerTenant, func(tenantID string) { + // Maintain the legacy unlabeled counter for back-compat dashboards + // AND emit the labeled by-tenant counter for fairness diagnostics. metrics.TSDBCardinalityOverflow.Inc() + if metrics.TSDBCardinalityOverflowByTenant != nil { + metrics.TSDBCardinalityOverflowByTenant.WithLabelValues(tenantID).Inc() + } }) - slog.Info("📈 TSDB cardinality limit set", "max", cfg.MetricMaxCardinality) + slog.Info("📈 TSDB cardinality limits configured", + "global_max", cfg.MetricMaxCardinality, + "per_tenant_max", cfg.MetricMaxCardinalityPerTenant, + ) } tsdbAgg.SetMetrics( func() { metrics.TSDBIngestTotal.Inc() },