From d99de5ed0b16d2b05bfa4f3d134efc24df774bcc Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Tue, 28 Apr 2026 00:24:46 +0000 Subject: [PATCH] fix(post-review): M/L cleanup from deep code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five small follow-ups from the second-pass review of PRs #49–#55: - tsdb: fire cardinality-overflow callback AFTER releasing the Aggregator mutex. The callback is currently a Prometheus increment (atomic) but holding mu across an external function call is a footgun for any future hook. Capture the tenant under lock; invoke after Unlock. - storage: use errors.Is(err, sql.ErrNoRows) in pgLogsRelkind instead of strings.Contains(err.Error(), "no rows"). Robust against driver wrapping. - storage: convert Repository.logsPartitioned from plain bool to atomic.Bool. Removes the memory-model fragility of "the writer ran first" — read by retention.go from a separate goroutine. - config: reject negative MCP_MAX_CONCURRENT / MCP_CALL_TIMEOUT_MS / MCP_CACHE_TTL_MS at Validate(). 0 stays the documented "disable" sentinel; negatives are typos that should fail loud. - mcp: upgrade SetCallLimit doc to flag it startup-only — runtime resize leaks a slot in the old channel. Skipped (with rationale, not silently dropped): - M1 Submit TOCTOU on closed pipeline — cosmetic only, current ordering is documented. - M2 ring/onIngest setter races — would require API change to fix properly; benign during normal startup-only usage. - M4 FTS5 trigger throughput — needs a bulk-rebuild path, not a one-line tweak. - M5 isQueueFull scope — hypothetical concern with no observed symptom; revisit only if metrics show drift. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/config/config.go | 13 +++++++++++++ internal/mcp/server.go | 10 +++++++--- internal/storage/partitions.go | 7 +++++-- internal/storage/repository.go | 15 +++++++++++---- internal/tsdb/aggregator.go | 22 +++++++++++++++------- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index f8e57f2..4724b7d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -378,6 +378,19 @@ func (c *Config) Validate() error { return fmt.Errorf("DB_PARTITION_LOOKAHEAD_DAYS must be between 0 and 365, got %d", c.DBPartitionLookaheadDays) } + // MCP robustness knobs. 0 is the documented sentinel for "disable" on + // each axis; negative values are nonsensical (clamping to 0 silently + // would mask typos like MCP_MAX_CONCURRENT=-1). Reject explicitly. + if c.MCPMaxConcurrent < 0 { + return fmt.Errorf("MCP_MAX_CONCURRENT must be >= 0 (0 disables the cap), got %d", c.MCPMaxConcurrent) + } + if c.MCPCallTimeoutMs < 0 { + return fmt.Errorf("MCP_CALL_TIMEOUT_MS must be >= 0 (0 disables the deadline), got %d", c.MCPCallTimeoutMs) + } + if c.MCPCacheTTLMs < 0 { + return fmt.Errorf("MCP_CACHE_TTL_MS must be >= 0 (0 disables the cache), got %d", c.MCPCacheTTLMs) + } + // Numeric ranges. // Upper bound on HOT_RETENTION_DAYS guards against int64 nanosecond overflow in // time.Duration(days) * 24 * time.Hour (overflow above ~106751 days flips the diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 3377ade..098271c 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -121,9 +121,13 @@ func New( } // SetCallLimit configures the maximum number of concurrent tools/call -// invocations. <= 0 disables the cap (legacy behavior). Subsequent calls -// resize the underlying semaphore — be aware that an in-flight call holds -// a slot of the previous size; the new size only governs new acquisitions. +// invocations. <= 0 disables the cap (legacy behavior). +// +// Startup-only: this swaps the underlying channel reference without +// quiescing in-flight callers. An already-running call will release into +// the OLD channel when it completes, leaving the NEW semaphore one slot +// short until process restart. Call exactly once during construction +// (main.go does); never from a request-handling goroutine. func (s *Server) SetCallLimit(maxConcurrent int) { if maxConcurrent <= 0 { s.callSlots = nil diff --git a/internal/storage/partitions.go b/internal/storage/partitions.go index 81b077b..112e48b 100644 --- a/internal/storage/partitions.go +++ b/internal/storage/partitions.go @@ -2,6 +2,8 @@ package storage import ( "context" + "database/sql" + "errors" "fmt" "log/slog" "strings" @@ -245,8 +247,9 @@ func pgLogsRelkind(db *gorm.DB) (string, error) { var relkind string row := db.Raw(`SELECT relkind::text FROM pg_class WHERE relname = 'logs' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())`).Row() if err := row.Scan(&relkind); err != nil { - // sql.ErrNoRows path — the table doesn't exist yet. - if strings.Contains(err.Error(), "no rows") { + // "table doesn't exist yet" path — sql.ErrNoRows is the standard + // signal here, not a string match against the message. + if errors.Is(err, sql.ErrNoRows) { return "", nil } return "", err diff --git a/internal/storage/repository.go b/internal/storage/repository.go index 15a4fcf..d190662 100644 --- a/internal/storage/repository.go +++ b/internal/storage/repository.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/RandomCodeSpace/otelcontext/internal/telemetry" @@ -69,17 +70,23 @@ type Repository struct { // active and the `logs` parent has been provisioned as a partitioned // table. RetentionScheduler reads this to skip the logs DELETE — the // PartitionScheduler does retention via DROP PARTITION instead. - logsPartitioned bool + // + // Stored as atomic.Bool: written once during NewRepository (before any + // goroutine reads it) and read by retention.go from a different + // goroutine. atomic.Bool removes the memory-model fragility of a plain + // bool that "works because the writer ran first" — no test catches a + // torn read on amd64, but the contract is brittle. + logsPartitioned atomic.Bool } // LogsPartitioned reports whether the `logs` table is provisioned as a // declarative partitioned parent. Used by RetentionScheduler to bypass the // row-level DELETE path when partition-level DROP is in charge of retention. -func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned } +func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned.Load() } // MarkLogsPartitioned flips the partitioned flag. Called by the partitioning // setup path (factory.go) once the partitioned schema is in place. -func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned = true } +func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned.Store(true) } // NewRepository initializes the database connection using environment variables and migrates the schema. func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { @@ -140,7 +147,7 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { // half-applied migration. if driver == "postgres" || driver == "postgresql" { if rk, err := pgLogsRelkind(db); err == nil && rk == "p" { - repo.logsPartitioned = true + repo.logsPartitioned.Store(true) slog.Info("📦 Postgres: logs is partitioned — retention will use DROP PARTITION (via PartitionScheduler)") } } diff --git a/internal/tsdb/aggregator.go b/internal/tsdb/aggregator.go index 4fab796..5b11139 100644 --- a/internal/tsdb/aggregator.go +++ b/internal/tsdb/aggregator.go @@ -168,8 +168,14 @@ func (a *Aggregator) Ingest(m RawMetric) { a.onIngest() } + // Capture overflow tenant under the lock; fire the callback AFTER + // unlock so a slow callback can't serialize Ingest. Currently the + // callback is a Prometheus increment (atomic, fast) but holding the + // lock across an external function call is a footgun for future + // changes. + var overflowTenant string + a.mu.Lock() - defer a.mu.Unlock() bucket, exists := a.buckets[key] if !exists { @@ -186,9 +192,7 @@ func (a *Aggregator) Ingest(m RawMetric) { switch { case overTenantCap: - if a.cardinalityOverflow != nil { - a.cardinalityOverflow(m.TenantID) - } + overflowTenant = m.TenantID key = a.overflowKey + "|" + m.TenantID bucket = a.buckets[key] if bucket == nil { @@ -207,9 +211,7 @@ func (a *Aggregator) Ingest(m RawMetric) { } // Fall through to update existing overflow bucket below. case overGlobalCap: - if a.cardinalityOverflow != nil { - a.cardinalityOverflow(overflowSentinelGlobal) - } + overflowTenant = overflowSentinelGlobal key = a.overflowKey bucket = a.buckets[key] if bucket == nil { @@ -242,6 +244,7 @@ func (a *Aggregator) Ingest(m RawMetric) { } a.buckets[key] = bucket a.seriesPerTenant[m.TenantID]++ + a.mu.Unlock() return } } @@ -254,6 +257,11 @@ func (a *Aggregator) Ingest(m RawMetric) { } bucket.Sum += m.Value bucket.Count++ + a.mu.Unlock() + + if overflowTenant != "" && a.cardinalityOverflow != nil { + a.cardinalityOverflow(overflowTenant) + } } // BucketCount returns the current number of in-memory buckets (for metrics/health).