diff --git a/internal/config/config.go b/internal/config/config.go index f8e57f2..4724b7d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -378,6 +378,19 @@ func (c *Config) Validate() error { return fmt.Errorf("DB_PARTITION_LOOKAHEAD_DAYS must be between 0 and 365, got %d", c.DBPartitionLookaheadDays) } + // MCP robustness knobs. 0 is the documented sentinel for "disable" on + // each axis; negative values are nonsensical (clamping to 0 silently + // would mask typos like MCP_MAX_CONCURRENT=-1). Reject explicitly. + if c.MCPMaxConcurrent < 0 { + return fmt.Errorf("MCP_MAX_CONCURRENT must be >= 0 (0 disables the cap), got %d", c.MCPMaxConcurrent) + } + if c.MCPCallTimeoutMs < 0 { + return fmt.Errorf("MCP_CALL_TIMEOUT_MS must be >= 0 (0 disables the deadline), got %d", c.MCPCallTimeoutMs) + } + if c.MCPCacheTTLMs < 0 { + return fmt.Errorf("MCP_CACHE_TTL_MS must be >= 0 (0 disables the cache), got %d", c.MCPCacheTTLMs) + } + // Numeric ranges. // Upper bound on HOT_RETENTION_DAYS guards against int64 nanosecond overflow in // time.Duration(days) * 24 * time.Hour (overflow above ~106751 days flips the diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 3377ade..098271c 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -121,9 +121,13 @@ func New( } // SetCallLimit configures the maximum number of concurrent tools/call -// invocations. <= 0 disables the cap (legacy behavior). Subsequent calls -// resize the underlying semaphore — be aware that an in-flight call holds -// a slot of the previous size; the new size only governs new acquisitions. +// invocations. <= 0 disables the cap (legacy behavior). +// +// Startup-only: this swaps the underlying channel reference without +// quiescing in-flight callers. An already-running call will release into +// the OLD channel when it completes, leaving the NEW semaphore one slot +// short until process restart. Call exactly once during construction +// (main.go does); never from a request-handling goroutine. func (s *Server) SetCallLimit(maxConcurrent int) { if maxConcurrent <= 0 { s.callSlots = nil diff --git a/internal/storage/partitions.go b/internal/storage/partitions.go index 81b077b..112e48b 100644 --- a/internal/storage/partitions.go +++ b/internal/storage/partitions.go @@ -2,6 +2,8 @@ package storage import ( "context" + "database/sql" + "errors" "fmt" "log/slog" "strings" @@ -245,8 +247,9 @@ func pgLogsRelkind(db *gorm.DB) (string, error) { var relkind string row := db.Raw(`SELECT relkind::text FROM pg_class WHERE relname = 'logs' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())`).Row() if err := row.Scan(&relkind); err != nil { - // sql.ErrNoRows path — the table doesn't exist yet. - if strings.Contains(err.Error(), "no rows") { + // "table doesn't exist yet" path — sql.ErrNoRows is the standard + // signal here, not a string match against the message. + if errors.Is(err, sql.ErrNoRows) { return "", nil } return "", err diff --git a/internal/storage/repository.go b/internal/storage/repository.go index 15a4fcf..d190662 100644 --- a/internal/storage/repository.go +++ b/internal/storage/repository.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/RandomCodeSpace/otelcontext/internal/telemetry" @@ -69,17 +70,23 @@ type Repository struct { // active and the `logs` parent has been provisioned as a partitioned // table. RetentionScheduler reads this to skip the logs DELETE — the // PartitionScheduler does retention via DROP PARTITION instead. - logsPartitioned bool + // + // Stored as atomic.Bool: written once during NewRepository (before any + // goroutine reads it) and read by retention.go from a different + // goroutine. atomic.Bool removes the memory-model fragility of a plain + // bool that "works because the writer ran first" — no test catches a + // torn read on amd64, but the contract is brittle. + logsPartitioned atomic.Bool } // LogsPartitioned reports whether the `logs` table is provisioned as a // declarative partitioned parent. Used by RetentionScheduler to bypass the // row-level DELETE path when partition-level DROP is in charge of retention. -func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned } +func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned.Load() } // MarkLogsPartitioned flips the partitioned flag. Called by the partitioning // setup path (factory.go) once the partitioned schema is in place. -func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned = true } +func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned.Store(true) } // NewRepository initializes the database connection using environment variables and migrates the schema. func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { @@ -140,7 +147,7 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { // half-applied migration. if driver == "postgres" || driver == "postgresql" { if rk, err := pgLogsRelkind(db); err == nil && rk == "p" { - repo.logsPartitioned = true + repo.logsPartitioned.Store(true) slog.Info("📦 Postgres: logs is partitioned — retention will use DROP PARTITION (via PartitionScheduler)") } } diff --git a/internal/tsdb/aggregator.go b/internal/tsdb/aggregator.go index 4fab796..5b11139 100644 --- a/internal/tsdb/aggregator.go +++ b/internal/tsdb/aggregator.go @@ -168,8 +168,14 @@ func (a *Aggregator) Ingest(m RawMetric) { a.onIngest() } + // Capture overflow tenant under the lock; fire the callback AFTER + // unlock so a slow callback can't serialize Ingest. Currently the + // callback is a Prometheus increment (atomic, fast) but holding the + // lock across an external function call is a footgun for future + // changes. + var overflowTenant string + a.mu.Lock() - defer a.mu.Unlock() bucket, exists := a.buckets[key] if !exists { @@ -186,9 +192,7 @@ func (a *Aggregator) Ingest(m RawMetric) { switch { case overTenantCap: - if a.cardinalityOverflow != nil { - a.cardinalityOverflow(m.TenantID) - } + overflowTenant = m.TenantID key = a.overflowKey + "|" + m.TenantID bucket = a.buckets[key] if bucket == nil { @@ -207,9 +211,7 @@ func (a *Aggregator) Ingest(m RawMetric) { } // Fall through to update existing overflow bucket below. case overGlobalCap: - if a.cardinalityOverflow != nil { - a.cardinalityOverflow(overflowSentinelGlobal) - } + overflowTenant = overflowSentinelGlobal key = a.overflowKey bucket = a.buckets[key] if bucket == nil { @@ -242,6 +244,7 @@ func (a *Aggregator) Ingest(m RawMetric) { } a.buckets[key] = bucket a.seriesPerTenant[m.TenantID]++ + a.mu.Unlock() return } } @@ -254,6 +257,11 @@ func (a *Aggregator) Ingest(m RawMetric) { } bucket.Sum += m.Value bucket.Count++ + a.mu.Unlock() + + if overflowTenant != "" && a.cardinalityOverflow != nil { + a.cardinalityOverflow(overflowTenant) + } } // BucketCount returns the current number of in-memory buckets (for metrics/health).