Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,19 @@ func (c *Config) Validate() error {
return fmt.Errorf("DB_PARTITION_LOOKAHEAD_DAYS must be between 0 and 365, got %d", c.DBPartitionLookaheadDays)
}

// MCP robustness knobs. 0 is the documented sentinel for "disable" on
// each axis; negative values are nonsensical (clamping to 0 silently
// would mask typos like MCP_MAX_CONCURRENT=-1). Reject explicitly.
if c.MCPMaxConcurrent < 0 {
return fmt.Errorf("MCP_MAX_CONCURRENT must be >= 0 (0 disables the cap), got %d", c.MCPMaxConcurrent)
}
if c.MCPCallTimeoutMs < 0 {
return fmt.Errorf("MCP_CALL_TIMEOUT_MS must be >= 0 (0 disables the deadline), got %d", c.MCPCallTimeoutMs)
}
if c.MCPCacheTTLMs < 0 {
return fmt.Errorf("MCP_CACHE_TTL_MS must be >= 0 (0 disables the cache), got %d", c.MCPCacheTTLMs)
}

// Numeric ranges.
// Upper bound on HOT_RETENTION_DAYS guards against int64 nanosecond overflow in
// time.Duration(days) * 24 * time.Hour (overflow above ~106751 days flips the
Expand Down
10 changes: 7 additions & 3 deletions internal/mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,13 @@ func New(
}

// SetCallLimit configures the maximum number of concurrent tools/call
// invocations. <= 0 disables the cap (legacy behavior). Subsequent calls
// resize the underlying semaphore — be aware that an in-flight call holds
// a slot of the previous size; the new size only governs new acquisitions.
// invocations. <= 0 disables the cap (legacy behavior).
//
// Startup-only: this swaps the underlying channel reference without
// quiescing in-flight callers. An already-running call will release into
// the OLD channel when it completes, leaving the NEW semaphore one slot
// short until process restart. Call exactly once during construction
// (main.go does); never from a request-handling goroutine.
func (s *Server) SetCallLimit(maxConcurrent int) {
if maxConcurrent <= 0 {
s.callSlots = nil
Expand Down
7 changes: 5 additions & 2 deletions internal/storage/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package storage

import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strings"
Expand Down Expand Up @@ -245,8 +247,9 @@ func pgLogsRelkind(db *gorm.DB) (string, error) {
var relkind string
row := db.Raw(`SELECT relkind::text FROM pg_class WHERE relname = 'logs' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())`).Row()
if err := row.Scan(&relkind); err != nil {
// sql.ErrNoRows path — the table doesn't exist yet.
if strings.Contains(err.Error(), "no rows") {
// "table doesn't exist yet" path — sql.ErrNoRows is the standard
// signal here, not a string match against the message.
if errors.Is(err, sql.ErrNoRows) {
return "", nil
}
return "", err
Expand Down
15 changes: 11 additions & 4 deletions internal/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/telemetry"
Expand Down Expand Up @@ -69,17 +70,23 @@ type Repository struct {
// active and the `logs` parent has been provisioned as a partitioned
// table. RetentionScheduler reads this to skip the logs DELETE — the
// PartitionScheduler does retention via DROP PARTITION instead.
logsPartitioned bool
//
// Stored as atomic.Bool: it may be set in NewRepository and again later
// through MarkLogsPartitioned (the partitioning setup path), and it is
// read by retention.go from a different goroutine. atomic.Bool removes the
// memory-model fragility of a plain bool that "works because the writer
// ran first" — a one-byte bool cannot tear, so the hazard is an
// unsynchronized read (a data race) that happens to behave on amd64, and
// that contract is brittle.
logsPartitioned atomic.Bool
}

// LogsPartitioned reports whether the `logs` table is provisioned as a
// declarative partitioned parent. Used by RetentionScheduler to bypass the
// row-level DELETE path when partition-level DROP is in charge of retention.
func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned }
func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned.Load() }

// MarkLogsPartitioned flips the partitioned flag. Called by the partitioning
// setup path (factory.go) once the partitioned schema is in place.
func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned = true }
func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned.Store(true) }

// NewRepository initializes the database connection using environment variables and migrates the schema.
func NewRepository(metrics *telemetry.Metrics) (*Repository, error) {
Expand Down Expand Up @@ -140,7 +147,7 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) {
// half-applied migration.
if driver == "postgres" || driver == "postgresql" {
if rk, err := pgLogsRelkind(db); err == nil && rk == "p" {
repo.logsPartitioned = true
repo.logsPartitioned.Store(true)
slog.Info("📦 Postgres: logs is partitioned — retention will use DROP PARTITION (via PartitionScheduler)")
}
}
Expand Down
22 changes: 15 additions & 7 deletions internal/tsdb/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,14 @@ func (a *Aggregator) Ingest(m RawMetric) {
a.onIngest()
}

// Capture overflow tenant under the lock; fire the callback AFTER
// unlock so a slow callback can't serialize Ingest. Currently the
// callback is a Prometheus increment (atomic, fast) but holding the
// lock across an external function call is a footgun for future
// changes.
var overflowTenant string

a.mu.Lock()
defer a.mu.Unlock()

bucket, exists := a.buckets[key]
if !exists {
Expand All @@ -186,9 +192,7 @@ func (a *Aggregator) Ingest(m RawMetric) {

switch {
case overTenantCap:
if a.cardinalityOverflow != nil {
a.cardinalityOverflow(m.TenantID)
}
overflowTenant = m.TenantID
key = a.overflowKey + "|" + m.TenantID
bucket = a.buckets[key]
if bucket == nil {
Expand All @@ -207,9 +211,7 @@ func (a *Aggregator) Ingest(m RawMetric) {
}
// Fall through to update existing overflow bucket below.
case overGlobalCap:
if a.cardinalityOverflow != nil {
a.cardinalityOverflow(overflowSentinelGlobal)
}
overflowTenant = overflowSentinelGlobal
key = a.overflowKey
bucket = a.buckets[key]
if bucket == nil {
Expand Down Expand Up @@ -242,6 +244,7 @@ func (a *Aggregator) Ingest(m RawMetric) {
}
a.buckets[key] = bucket
a.seriesPerTenant[m.TenantID]++
a.mu.Unlock()
return
}
}
Expand All @@ -254,6 +257,11 @@ func (a *Aggregator) Ingest(m RawMetric) {
}
bucket.Sum += m.Value
bucket.Count++
a.mu.Unlock()

if overflowTenant != "" && a.cardinalityOverflow != nil {
a.cardinalityOverflow(overflowTenant)
}
}

// BucketCount returns the current number of in-memory buckets (for metrics/health).
Expand Down
Loading