diff --git a/CLAUDE.md b/CLAUDE.md index 48ca9e4..066d2d4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -218,6 +218,7 @@ Key settings in `internal/config/config.go`: - `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`. - `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000 - `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs +- `DB_POSTGRES_PARTITIONING` (`""`), `DB_PARTITION_LOOKAHEAD_DAYS` (3) — opt-in Postgres declarative range partitioning of the `logs` table by day. When `daily`, `logs` is provisioned as a partitioned parent (greenfield only — refuses to start if `logs` already exists unpartitioned), the `PartitionScheduler` maintains lookahead partitions and drops expired ones via `DROP TABLE`, and `RetentionScheduler` skips the row-level DELETE for `logs`. Watch `otelcontext_partitions_dropped_total` and `otelcontext_partitions_active`. - `APP_ENV` (`"development"`), `OTELCONTEXT_ALLOW_SQLITE_PROD` (false) — SQLite is refused when `APP_ENV=production` unless the allow flag is set ### Authentication diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 1aaf352..4a8ec67 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -115,6 +115,27 @@ SQLite is rejected at startup when `APP_ENV=production` unless you explicitly op **Multi-tenancy.** Every row carries a `tenant_id` column. The write path reads `X-Tenant-ID` (HTTP) or `x-tenant-id` (gRPC metadata) and populates the column. 
The read path attaches the tenant from the request context to every repository query (`Where("tenant_id = ?", ...)`). +### Postgres declarative partitioning (opt-in) + +| Setting | Default | When to enable | +|---|---|---| +| `DB_POSTGRES_PARTITIONING` | `""` (off) | High-volume Postgres deployments where row-level retention DELETE on `logs` becomes the bottleneck | +| `DB_PARTITION_LOOKAHEAD_DAYS` | `3` | Future daily partitions to keep staged. Raise if your DB is far from the app or your retention is short | + +When `DB_POSTGRES_PARTITIONING=daily`: + +- `logs` is provisioned as a `PARTITION BY RANGE (timestamp)` parent with the composite PK `(id, timestamp)`. The `Log` model is left out of GORM AutoMigrate in this mode, so AutoMigrate never tries to alter the partitioned table. +- Initial partitions cover yesterday + today + `lookahead` future days. Yesterday absorbs late-arriving events at the day-boundary rollover. +- The PartitionScheduler runs hourly, idempotently ensures upcoming partitions exist, and drops any partition whose entire range predates the retention cutoff. Drop is `DROP TABLE IF EXISTS` on the expired partition — orders of magnitude faster than the row-by-row DELETE. +- RetentionScheduler skips its `logs` DELETE branch when partitioning is active. `traces` and `metric_buckets` continue to use the existing batched DELETE path. +- `pg_trgm` GIN indexes on `logs.body` and `logs.service_name` are declared on the parent when the `pg_trgm` extension is installed, and propagate automatically to current and future partitions. + +**Greenfield only.** Startup refuses to enable partitioning if `logs` already exists as a non-partitioned table — migrating an unpartitioned `logs` to a partitioned one requires copying rows into a swapped table and is out of scope for the current phase. Drop the table or migrate manually before flipping the flag. + +Telemetry: +- `otelcontext_partitions_dropped_total` — increments by `n` when the scheduler drops `n` partitions on a tick +- `otelcontext_partitions_active` — current partition count attached to the parent.
Steady-state ≈ `HOT_RETENTION_DAYS + DB_PARTITION_LOOKAHEAD_DAYS + 1`. Alert when this gauge climbs unbounded (drop loop stuck) or falls toward zero (over-aggressive drop) + ### Log search index | Driver | Index | Ranking | diff --git a/internal/config/config.go b/internal/config/config.go index 469848c..4c7888d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,6 +30,19 @@ type Config struct { DBMaxIdleConns int DBConnMaxLifetime string // e.g. "1h", "30m" + // Postgres-only opt-in: declarative range partitioning of the logs table by + // day. When set to "daily", AutoMigrate provisions logs as a partitioned + // table and the PartitionScheduler creates lookahead partitions and drops + // expired ones (DROP PARTITION beats DELETE for retention by orders of + // magnitude). Greenfield only — startup refuses if `logs` already exists + // as a non-partitioned table. Empty / "none" = legacy unpartitioned schema. + DBPostgresPartitioning string + + // Number of future daily partitions to maintain ahead of "today" when + // DBPostgresPartitioning=daily. Defaults to 3. Tune up if your retention + // policy is short and ingest spikes around a daily boundary. + DBPartitionLookaheadDays int + // Retention HotRetentionDays int @@ -186,6 +199,10 @@ func Load(customPath string) (*Config, error) { DBMaxIdleConns: getEnvInt("DB_MAX_IDLE_CONNS", 10), DBConnMaxLifetime: getEnv("DB_CONN_MAX_LIFETIME", "1h"), + // Postgres partitioning (opt-in). Default empty = legacy unpartitioned. 
+ DBPostgresPartitioning: strings.ToLower(strings.TrimSpace(getEnv("DB_POSTGRES_PARTITIONING", ""))), + DBPartitionLookaheadDays: getEnvInt("DB_PARTITION_LOOKAHEAD_DAYS", 3), + // Retention HotRetentionDays: getEnvInt("HOT_RETENTION_DAYS", 7), RetentionBatchSize: getEnvInt("RETENTION_BATCH_SIZE", 50000), @@ -324,6 +341,27 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid DB_DRIVER %q: must be one of sqlite, postgres, mysql, mssql", c.DBDriver) } + // Partitioning is Postgres-only. Reject mismatched configs at startup so + // the operator finds out immediately rather than silently running in + // unpartitioned mode. + switch c.DBPostgresPartitioning { + case "", "none", "daily": + // ok + default: + return fmt.Errorf("invalid DB_POSTGRES_PARTITIONING %q: must be one of \"\", \"none\", \"daily\"", c.DBPostgresPartitioning) + } + if c.DBPostgresPartitioning == "daily" { + drv := strings.ToLower(c.DBDriver) + if drv != "postgres" && drv != "postgresql" { + return fmt.Errorf("DB_POSTGRES_PARTITIONING=daily requires DB_DRIVER=postgres, got %q", c.DBDriver) + } + } + // 0 == "use default at the storage layer" so direct struct construction + // (tests, embedded callers) doesn't have to set it. + if c.DBPartitionLookaheadDays < 0 || c.DBPartitionLookaheadDays > 365 { + return fmt.Errorf("DB_PARTITION_LOOKAHEAD_DAYS must be between 0 and 365, got %d", c.DBPartitionLookaheadDays) + } + // Numeric ranges. // Upper bound on HOT_RETENTION_DAYS guards against int64 nanosecond overflow in // time.Duration(days) * 24 * time.Hour (overflow above ~106751 days flips the diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 56109ec..3de6838 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -184,7 +184,33 @@ func getEnvPoolDuration(key string, fallback time.Duration) time.Duration { } // AutoMigrateModels runs GORM auto-migration for all OtelContext models. 
+// +// When DB_POSTGRES_PARTITIONING=daily, the `logs` table is provisioned as a +// declarative range-partitioned parent BEFORE GORM AutoMigrate runs, so +// AutoMigrate sees an existing table and skips it (GORM's IF NOT EXISTS +// behaviour). This is greenfield-only — see setupPostgresPartitionedLogs for +// the safety check. func AutoMigrateModels(db *gorm.DB, driver string) error { + return AutoMigrateModelsWithOptions(db, driver, MigrateOptions{}) +} + +// MigrateOptions tunes AutoMigrateModelsWithOptions. Empty zero value +// preserves the legacy AutoMigrateModels behaviour. +type MigrateOptions struct { + // PostgresPartitioning, when "daily", provisions `logs` as a partitioned + // table. Any other value (including the empty string) keeps the legacy + // unpartitioned schema. + PostgresPartitioning string + // PartitionLookaheadDays is the number of future daily partitions to + // pre-create at boot. Defaults to 3 when zero. + PartitionLookaheadDays int +} + +// AutoMigrateModelsWithOptions is the option-driven variant of +// AutoMigrateModels. Existing callers should continue to use AutoMigrateModels +// — the options entry point is for new wiring (currently main.go) that needs +// to plumb the partitioning flag. +func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOptions) error { driver = strings.ToLower(driver) // Disable FK checks during migration for MySQL. @@ -197,7 +223,26 @@ func AutoMigrateModels(db *gorm.DB, driver string) error { log.Println("🔓 Disabled foreign key checks for migration") } - if err := db.AutoMigrate(&Trace{}, &Span{}, &Log{}, &MetricBucket{}); err != nil { + // Postgres partitioning: provision the partitioned `logs` parent + initial + // daily partitions BEFORE GORM AutoMigrate runs, and skip Log from + // AutoMigrate's slice. 
AutoMigrate would otherwise try to ALTER the + // timestamp column (because the model tag doesn't carry an explicit + // `not null` and the partitioned PK forces NOT NULL on the column), + // which Postgres rejects because the column is part of the partition key. + logsPartitioned := false + if (driver == "postgres" || driver == "postgresql") && opts.PostgresPartitioning == PartitioningModeDaily { + if err := setupPostgresPartitionedLogs(db, opts.PartitionLookaheadDays); err != nil { + return fmt.Errorf("setup partitioned logs: %w", err) + } + log.Printf("📦 Postgres: declarative partitioning enabled (daily, lookahead=%d days)", opts.PartitionLookaheadDays) + logsPartitioned = true + } + + migrateModels := []any{&Trace{}, &Span{}, &MetricBucket{}} + if !logsPartitioned { + migrateModels = append(migrateModels, &Log{}) + } + if err := db.AutoMigrate(migrateModels...); err != nil { return fmt.Errorf("failed to migrate database: %w", err) } diff --git a/internal/storage/partitions.go b/internal/storage/partitions.go new file mode 100644 index 0000000..81b077b --- /dev/null +++ b/internal/storage/partitions.go @@ -0,0 +1,302 @@ +package storage + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "gorm.io/gorm" +) + +// Postgres declarative partitioning for high-volume tables. +// +// Phase 3b restricts partitioning to the `logs` table — the only table whose +// retention purge dominates DB time at 7+ days × 100–200 services. Future +// phases can extend the same pattern to `traces` and `metric_buckets` if/when +// their retention costs justify the extra schema complexity. +// +// Design choices: +// +// - Daily range partitions on `timestamp`. Daily granularity gives us +// DROP PARTITION as a near-instant retention path (vs. row-by-row +// DELETE) while keeping partition count bounded at ~retention + a few +// days of lookahead — well below Postgres' soft ceiling for +// query-planning overhead. +// +// - Composite primary key (id, timestamp). 
Postgres requires the +// partition key to be in every unique/PK constraint of a partitioned +// table. `id` alone would error at DDL time. +// +// - Indexes are created on the partitioned PARENT and propagate to all +// current + future partitions automatically (Postgres ≥ 11). pg_trgm +// GIN indexes also propagate this way. +// +// - Greenfield only: if `logs` already exists as a non-partitioned table, +// setupPostgresPartitionedLogs refuses to start. Migrating an existing +// unpartitioned `logs` to partitioned requires moving data into a +// swapped table — out of scope for this phase per the board ruling. + +// PartitioningModeDaily is the canonical opt-in value for daily partitioning. +const PartitioningModeDaily = "daily" + +// dailyPartitionPrefix is the table-name prefix used for partition children +// (e.g. logs_2026_04_27). Kept package-private to discourage callers from +// constructing names by hand — use partitionNameForDay. +const dailyPartitionPrefix = "logs_" + +// partitionNameForDay returns the deterministic partition table name for the +// given UTC day. Format: `logs_YYYY_MM_DD`. Always normalized to UTC so two +// nodes with different local TZs converge on the same name. +func partitionNameForDay(day time.Time) string { + d := day.UTC() + return fmt.Sprintf("%s%04d_%02d_%02d", dailyPartitionPrefix, d.Year(), int(d.Month()), d.Day()) +} + +// setupPostgresPartitionedLogs provisions the partitioned `logs` parent table +// and an initial set of daily partitions covering [today - 1 day, today + +// lookaheadDays]. The trailing -1 day cushion absorbs clock skew on +// out-of-order ingest at the start-of-day rollover. +// +// Idempotent: if `logs` already exists and is partitioned, it is left +// untouched and we just top up partitions and indexes. If `logs` exists and +// is NOT partitioned, the function returns an error so the operator can +// migrate manually. 
+func setupPostgresPartitionedLogs(db *gorm.DB, lookaheadDays int) error {
+	// Values below 1 fall back to the default of three lookahead days.
+	if lookaheadDays < 1 {
+		lookaheadDays = 3
+	}
+
+	// relkind decides which path to take: "" means no `logs` relation yet,
+	// "p" means an existing partitioned parent, anything else is refused.
+	relkind, err := pgLogsRelkind(db)
+	if err != nil {
+		return fmt.Errorf("inspect logs relkind: %w", err)
+	}
+	switch relkind {
+	case "":
+		// Fresh DB — create the partitioned parent table with the same
+		// columns GORM would have created, plus a composite PK that
+		// includes the partition key.
+		if err := db.Exec(`
+			CREATE TABLE logs (
+				id BIGSERIAL,
+				tenant_id VARCHAR(64) NOT NULL DEFAULT 'default',
+				trace_id VARCHAR(32),
+				span_id VARCHAR(16),
+				severity VARCHAR(50),
+				body TEXT,
+				service_name VARCHAR(255),
+				attributes_json BYTEA,
+				ai_insight BYTEA,
+				timestamp TIMESTAMPTZ NOT NULL,
+				PRIMARY KEY (id, timestamp)
+			) PARTITION BY RANGE (timestamp)`).Error; err != nil {
+			return fmt.Errorf("create partitioned logs: %w", err)
+		}
+		slog.Info("📦 Postgres: created partitioned logs table (RANGE on timestamp, daily)")
+	case "p":
+		// Already partitioned — accept and continue.
+	case "r", "v", "m", "f", "t", "I":
+		// Some other kind of relation already owns the name `logs`; this
+		// is the greenfield guard.
+		return fmt.Errorf("logs table already exists as a non-partitioned object (relkind=%q); DB_POSTGRES_PARTITIONING=daily is greenfield-only — drop the table or migrate before retrying", relkind)
+	default:
+		return fmt.Errorf("logs table has unexpected relkind=%q", relkind)
+	}
+
+	// Indexes on the parent — auto-cascade to children. IF NOT EXISTS keeps
+	// this step safe to repeat on every boot.
+	parentIndexes := []string{
+		`CREATE INDEX IF NOT EXISTS idx_logs_tenant_ts ON logs (tenant_id, timestamp DESC)`,
+		`CREATE INDEX IF NOT EXISTS idx_logs_tenant_service ON logs (tenant_id, service_name)`,
+		`CREATE INDEX IF NOT EXISTS idx_logs_tenant_severity ON logs (tenant_id, severity)`,
+		`CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs (trace_id)`,
+		`CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs (timestamp)`,
+	}
+	for _, ddl := range parentIndexes {
+		if err := db.Exec(ddl).Error; err != nil {
+			return fmt.Errorf("create parent index: %w", err)
+		}
+	}
+
+	// pg_trgm indexes — propagate from parent to partitions only if the
+	// extension is present, so we mirror factory.go's verify-then-create
+	// pattern. Failing this is non-fatal (logs still ingest), so a missing
+	// extension downgrades log search to seq scan rather than blocking boot.
+	// A failed catalog lookup is treated the same as "extension absent".
+	var trgmPresent int
+	if catalogErr := db.Raw("SELECT 1 FROM pg_extension WHERE extname = 'pg_trgm'").Row().Scan(&trgmPresent); catalogErr == nil && trgmPresent == 1 {
+		if err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_logs_body_trgm ON logs USING GIN (body gin_trgm_ops)`).Error; err != nil {
+			slog.Warn("partitioned logs: pg_trgm GIN on body failed", "err", err)
+		}
+		if err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_logs_service_trgm ON logs USING GIN (service_name gin_trgm_ops)`).Error; err != nil {
+			slog.Warn("partitioned logs: pg_trgm GIN on service_name failed", "err", err)
+		}
+	}
+
+	// Pre-create initial partitions:
+	//   - yesterday (absorbs late-arriving events at the day rollover)
+	//   - today
+	//   - today+1 ... today+lookaheadDays
+	// Total = lookaheadDays + 2 partitions.
+	now := time.Now().UTC()
+	for i := -1; i <= lookaheadDays; i++ {
+		day := now.Add(time.Duration(i) * 24 * time.Hour)
+		if err := EnsureLogsPartitionForDay(db, day); err != nil {
+			return fmt.Errorf("ensure partition for %s: %w", day.Format("2006-01-02"), err)
+		}
+	}
+
+	return nil
+}
+
+// EnsureLogsPartitionForDay creates the daily partition that covers `day`.
+// The instant is converted to UTC and truncated to midnight, so the partition
+// spans [00:00 UTC of that day, 00:00 UTC of the next day). The DDL uses
+// CREATE TABLE IF NOT EXISTS ... PARTITION OF, which makes repeated calls for
+// the same day no-ops.
+//
+// NOTE(review): the original comment claimed concurrent boots / scheduler
+// ticks "never collide"; confirm that two truly simultaneous creators cannot
+// still surface a duplicate-object error from Postgres.
+func EnsureLogsPartitionForDay(db *gorm.DB, day time.Time) error {
+	d := day.UTC().Truncate(24 * time.Hour)
+	upper := d.Add(24 * time.Hour)
+	name := partitionNameForDay(d)
+	// d and upper are UTC, so the literal "+00" suffix in the layout states
+	// the offset explicitly and keeps the bounds independent of session TZ.
+	ddl := fmt.Sprintf(
+		`CREATE TABLE IF NOT EXISTS %s PARTITION OF logs FOR VALUES FROM ('%s') TO ('%s')`,
+		quoteIdent(name),
+		d.Format("2006-01-02 15:04:05+00"),
+		upper.Format("2006-01-02 15:04:05+00"),
+	)
+	if err := db.Exec(ddl).Error; err != nil {
+		return fmt.Errorf("create partition %s: %w", name, err)
+	}
+	return nil
+}
+
+// EnsureLogsLookahead ensures partitions exist for today (UTC) and for each
+// of the following `lookaheadDays` days; values below 1 are treated as 1.
+//
+// The returned count is the number of days processed without error
+// (lookaheadDays+1 on success). It is NOT the number of partitions newly
+// created: IF NOT EXISTS hides whether a given statement created anything.
+// On failure the count of days handled so far is returned with the error.
+func EnsureLogsLookahead(db *gorm.DB, lookaheadDays int) (int, error) {
+	if lookaheadDays < 1 {
+		lookaheadDays = 1
+	}
+	now := time.Now().UTC()
+	created := 0
+	for i := 0; i <= lookaheadDays; i++ {
+		day := now.Add(time.Duration(i) * 24 * time.Hour)
+		// IF NOT EXISTS makes this idempotent; we don't try to detect
+		// "did it actually create" because the DDL is cheap and the
+		// observability value is low.
+		if err := EnsureLogsPartitionForDay(db, day); err != nil {
+			return created, err
+		}
+		created++
+	}
+	return created, nil
+}
+
+// DropExpiredLogsPartitions drops every daily logs partition whose entire
+// upper bound is older than `cutoff`. Returns the number of partitions
+// dropped. Safe to call repeatedly (no-op when nothing matches).
+//
+// Children are listed from the catalog (pg_class joined through pg_inherits)
+// instead of guessing names, so partitions created by earlier code paths or
+// operator scripts are also covered. The parent lookup is scoped to
+// current_schema(), matching pgLogsRelkind: the DROP below uses an
+// unqualified name, so listing children of a `logs` parent that lives in a
+// different schema could otherwise target the wrong table.
+//
+// A partition whose bound cannot be parsed is skipped, never dropped. When a
+// DROP fails, the number of partitions already dropped is returned together
+// with the error.
+func DropExpiredLogsPartitions(ctx context.Context, db *gorm.DB, cutoff time.Time) (int, error) {
+	cutoffUTC := cutoff.UTC()
+
+	type row struct {
+		Name  string
+		Bound string // "FOR VALUES FROM ('...') TO ('...')"
+	}
+
+	// pg_get_expr unfolds the partition bound expression to text. We parse
+	// the upper bound out of it and compare to cutoff.
+	var rows []row
+	if err := db.WithContext(ctx).Raw(`
+		SELECT c.relname AS name,
+		       pg_get_expr(c.relpartbound, c.oid) AS bound
+		FROM pg_class p
+		JOIN pg_inherits i ON i.inhparent = p.oid
+		JOIN pg_class c ON c.oid = i.inhrelid
+		WHERE p.relname = 'logs'
+		  AND p.relkind = 'p'
+		  AND p.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())
+	`).Scan(&rows).Error; err != nil {
+		return 0, fmt.Errorf("list partitions: %w", err)
+	}
+
+	dropped := 0
+	for _, r := range rows {
+		upper, ok := parsePartitionUpper(r.Bound)
+		if !ok {
+			slog.Debug("partition: unable to parse bound", "name", r.Name, "bound", r.Bound)
+			continue
+		}
+		if upper.After(cutoffUTC) {
+			// Part of this partition's range is still inside retention.
+			continue
+		}
+		// Entire partition range ends at or before the cutoff — safe to
+		// drop. Use IF EXISTS so a concurrent drop from another scheduler
+		// instance doesn't error.
+		if err := db.WithContext(ctx).Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %s`, quoteIdent(r.Name))).Error; err != nil {
+			return dropped, fmt.Errorf("drop partition %s: %w", r.Name, err)
+		}
+		slog.Info("🗑️ dropped expired logs partition", "name", r.Name, "upper", upper.Format(time.RFC3339))
+		dropped++
+	}
+	return dropped, nil
+}
+
+// pgLogsRelkind returns the relkind of the `logs` relation, or "" if it does
+// not exist. Used to gate the greenfield enforcement and to recognize an
+// already-partitioned parent on subsequent boots.
+func pgLogsRelkind(db *gorm.DB) (string, error) {
+	// The scalar subquery yields NULL when no `logs` relation exists in the
+	// current schema and COALESCE folds that into "". The statement thus
+	// always returns exactly one row, so any Scan error is a genuine failure
+	// and there is no need to recognise "no rows" by matching error text.
+	var relkind string
+	row := db.Raw(`SELECT COALESCE((SELECT relkind::text FROM pg_class WHERE relname = 'logs' AND relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())), '')`).Row()
+	if err := row.Scan(&relkind); err != nil {
+		return "", err
+	}
+	return relkind, nil
+}
+
+// parsePartitionUpper extracts the upper bound timestamp from a partition
+// bound expression of the form
+// `FOR VALUES FROM ('YYYY-MM-DD HH:MM:SS+TZ') TO ('YYYY-MM-DD HH:MM:SS+TZ')`.
+// The result is normalized to UTC. Returns (zero, false) if parsing fails —
+// caller logs and skips.
+func parsePartitionUpper(boundExpr string) (time.Time, bool) {
+	// Only the upper bound matters for retention: take what follows " TO (".
+	_, rest, ok := strings.Cut(boundExpr, " TO (")
+	if !ok {
+		return time.Time{}, false
+	}
+	// The timestamp is the text between the first pair of single quotes; a
+	// bound without a quoted literal is rejected here.
+	_, afterOpen, ok := strings.Cut(rest, "'")
+	if !ok {
+		return time.Time{}, false
+	}
+	tsStr, _, ok := strings.Cut(afterOpen, "'")
+	if !ok {
+		return time.Time{}, false
+	}
+	// Postgres prints the bound in the session TZ, so the offset suffix
+	// varies. Accepted shapes, tried in order:
+	layouts := []string{
+		"2006-01-02 15:04:05-07",    // whole-hour offset such as +00 or -08
+		"2006-01-02 15:04:05Z07:00", // hour:minute offset such as +05:30
+		"2006-01-02 15:04:05",       // no offset; time.Parse assumes UTC
+	}
+	for _, l := range layouts {
+		if t, err := time.Parse(l, tsStr); err == nil {
+			return t.UTC(), true
+		}
+	}
+	return time.Time{}, false
+}
+
+// quoteIdent returns a Postgres-safe quoted identifier. We deliberately keep
+// this minimal — partition names are derived from a fixed prefix + UTC date,
+// so the only quoting required is wrapping in double quotes and doubling any
+// embedded `"`.
Using fmt.Sprintf with a raw identifier would expose us to
+// SQL injection if a future caller passes user-controlled input.
+func quoteIdent(name string) string {
+	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
+}
diff --git a/internal/storage/partitions_scheduler.go b/internal/storage/partitions_scheduler.go
new file mode 100644
index 0000000..f8cbade
--- /dev/null
+++ b/internal/storage/partitions_scheduler.go
@@ -0,0 +1,163 @@
+package storage
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"gorm.io/gorm"
+)
+
+// PartitionScheduler maintains daily logs partitions on Postgres when
+// DB_POSTGRES_PARTITIONING=daily is enabled. It runs two passes, each on its
+// own ticker: one ensures the next `lookaheadDays` partitions exist, the
+// other drops partitions whose upper bound predates the retention cutoff.
+// NewPartitionScheduler sets both intervals to one hour. Both passes are
+// idempotent so a stalled tick (or a parallel scheduler from a different
+// replica) is safe.
+//
+// The scheduler is independent of RetentionScheduler so the legacy DELETE
+// path (used for SQLite/MySQL/MSSQL or non-partitioned Postgres) keeps
+// running on its own loop. When partitioning is enabled, RetentionScheduler
+// SHOULD skip logs — wire that up at construction time, not here.
+type PartitionScheduler struct {
+	repo            *Repository   // supplies the DB handle the passes run against
+	retentionDays   int           // age, in days, used to compute the drop cutoff
+	lookaheadDays   int           // future daily partitions to keep staged
+	ensureInterval  time.Duration // period of the ensure ticker
+	dropInterval    time.Duration // period of the drop ticker
+	onPartitionDrop func(int) // metric callback: count of partitions dropped
+	onPartitionKeep func(int) // metric callback: count of partitions still active
+
+	started atomic.Bool        // set by Start; makes a second Start a no-op
+	mu      sync.Mutex         // held by SetMetrics, Start and Stop around the fields they touch
+	cancel  context.CancelFunc // cancels the loop context; assigned in Start
+	done    chan struct{}      // closed by loop on exit; Stop waits on it
+}
+
+// NewPartitionScheduler constructs a scheduler. retentionDays must match the
+// HOT_RETENTION_DAYS setting so DROP PARTITION is the moral equivalent of the
+// hourly DELETE-by-age the RetentionScheduler runs for non-partitioned tables.
+func NewPartitionScheduler(repo *Repository, retentionDays, lookaheadDays int) *PartitionScheduler {
+	// Start from the defaults (7 days retention, 3 days lookahead, hourly
+	// ticks) and override only with caller values that are at least 1.
+	const tick = 1 * time.Hour
+	sched := &PartitionScheduler{
+		repo:           repo,
+		retentionDays:  7,
+		lookaheadDays:  3,
+		ensureInterval: tick,
+		dropInterval:   tick,
+		done:           make(chan struct{}),
+	}
+	if retentionDays >= 1 {
+		sched.retentionDays = retentionDays
+	}
+	if lookaheadDays >= 1 {
+		sched.lookaheadDays = lookaheadDays
+	}
+	return sched
+}
+
+// SetMetrics installs the telemetry callbacks under the scheduler mutex.
+// Either callback may be nil, which disables that metric.
+func (s *PartitionScheduler) SetMetrics(onDrop, onKeep func(int)) {
+	s.mu.Lock()
+	s.onPartitionDrop, s.onPartitionKeep = onDrop, onKeep
+	s.mu.Unlock()
+}
+
+// Start launches the background loop. Before the goroutine is spawned it runs
+// one ensure pass and one drop pass on the calling goroutine, so the
+// partition layout is in place by the time Start returns. Calling Start on a
+// scheduler that has already been started does nothing.
+func (s *PartitionScheduler) Start(parent context.Context) {
+	var ctx context.Context
+	// Claim the "started" slot and publish the cancel func in one critical
+	// section; a caller that loses the race leaves without side effects.
+	claimed := func() bool {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		if s.started.Load() {
+			return false
+		}
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithCancel(parent)
+		s.cancel = cancel
+		s.started.Store(true)
+		return true
+	}()
+	if !claimed {
+		return
+	}
+
+	// Synchronous first pass, then hand over to the ticker-driven loop.
+	s.runEnsure(ctx)
+	s.runDrop(ctx)
+
+	go s.loop(ctx)
+}
+
+// Stop cancels the loop and waits for it to exit. Safe to call multiple times.
+func (s *PartitionScheduler) Stop() {
+	// Never started: no loop goroutine exists to close s.done, so waiting on
+	// it would block forever.
+	if !s.started.Load() {
+		return
+	}
+	// Snapshot under the mutex; Start assigns s.cancel while holding it.
+	s.mu.Lock()
+	cancel := s.cancel
+	done := s.done
+	s.mu.Unlock()
+	if cancel != nil {
+		cancel()
+	}
+	if done != nil {
+		// Receiving from the closed channel returns immediately, which is
+		// what makes repeated Stop calls safe.
+		<-done
+	}
+}
+
+// loop runs until ctx is cancelled, dispatching the ensure and drop passes
+// from two independent tickers. Closing s.done on exit unblocks Stop.
+func (s *PartitionScheduler) loop(ctx context.Context) {
+	defer close(s.done)
+	ensureTick := time.NewTicker(s.ensureInterval)
+	dropTick := time.NewTicker(s.dropInterval)
+	defer ensureTick.Stop()
+	defer dropTick.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ensureTick.C:
+			s.runEnsure(ctx)
+		case <-dropTick.C:
+			s.runDrop(ctx)
+		}
+	}
+}
+
+// runEnsure tops up the lookahead partitions. A failure is logged rather than
+// returned, so one bad pass does not stop the scheduler; the next tick
+// simply tries again.
+func (s *PartitionScheduler) runEnsure(ctx context.Context) {
+	if ctx.Err() != nil {
+		return
+	}
+	if _, err := EnsureLogsLookahead(s.repo.db.WithContext(ctx), s.lookaheadDays); err != nil {
+		slog.Error("partition scheduler: ensure failed", "err", err)
+	}
+}
+
+// runDrop drops partitions that fall entirely outside the retention window
+// and then reports metrics through the callbacks installed by SetMetrics.
+// Errors are logged, never returned.
+func (s *PartitionScheduler) runDrop(ctx context.Context) {
+	if ctx.Err() != nil {
+		return
+	}
+	// SetMetrics assigns the callbacks under s.mu; read them under the same
+	// lock so a late SetMetrics call cannot race with this pass. The lock is
+	// released before any callback runs.
+	s.mu.Lock()
+	onDrop, onKeep := s.onPartitionDrop, s.onPartitionKeep
+	s.mu.Unlock()
+
+	cutoff := time.Now().UTC().Add(-time.Duration(s.retentionDays) * 24 * time.Hour)
+	dropped, err := DropExpiredLogsPartitions(ctx, s.repo.db, cutoff)
+	// Report before checking err: a pass that fails partway still returns the
+	// partitions it managed to drop, and those belong in the counter.
+	if dropped > 0 && onDrop != nil {
+		onDrop(dropped)
+	}
+	if err != nil {
+		slog.Error("partition scheduler: drop failed", "err", err)
+		return
+	}
+	if onKeep == nil {
+		return
+	}
+	count, err := countLogsPartitions(ctx, s.repo.db)
+	if err != nil {
+		// Publishing the zero count here would look like every partition
+		// had just vanished; keep the previous gauge value instead.
+		slog.Warn("partition scheduler: count partitions failed", "err", err)
+		return
+	}
+	onKeep(count)
+}
+
+// countLogsPartitions returns the current number of partitions attached to
+// the `logs` parent. Used for the gauge so operators can spot a stuck loop
+// (count keeps growing) or an over-aggressive drop (count keeps shrinking).
+func countLogsPartitions(ctx context.Context, db *gorm.DB) (int, error) {
+	// Scope the parent lookup to current_schema(), the same way
+	// pgLogsRelkind does, so a partitioned `logs` table in another schema
+	// does not inflate the count. COUNT(*) always yields one row, so a Scan
+	// error here is a genuine query failure.
+	var n int
+	err := db.WithContext(ctx).Raw(`
+		SELECT COUNT(*)
+		FROM pg_class p
+		JOIN pg_inherits i ON i.inhparent = p.oid
+		WHERE p.relname = 'logs' AND p.relkind = 'p'
+		  AND p.relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = current_schema())
+	`).Row().Scan(&n)
+	return n, err
+}
diff --git a/internal/storage/partitions_test.go b/internal/storage/partitions_test.go
new file mode 100644
index 0000000..f453b1a
--- /dev/null
+++ b/internal/storage/partitions_test.go
@@ -0,0 +1,86 @@
+package storage
+
+import (
+	"testing"
+	"time"
+)
+
+func TestPartitionNameForDay_Format(t *testing.T) {
+	d := time.Date(2026, 4, 27, 14, 30, 0, 0, time.UTC)
+	got := partitionNameForDay(d)
+	want := "logs_2026_04_27"
+	if got != want {
+		t.Fatalf("partitionNameForDay(%s) = %q, want %q", d, got, want)
+	}
+}
+
+// Two callers in different timezones must converge on the same partition
+// name for the same instant — partitionNameForDay normalizes to UTC.
+func TestPartitionNameForDay_NormalizesToUTC(t *testing.T) {
+	loc, err := time.LoadLocation("America/Los_Angeles")
+	if err != nil {
+		t.Skip("LA tzdata not available")
+	}
+	utc := time.Date(2026, 4, 27, 6, 0, 0, 0, time.UTC)
+	la := utc.In(loc) // same instant, different wall clock
+	if partitionNameForDay(utc) != partitionNameForDay(la) {
+		t.Fatalf("expected same partition name across TZs: utc=%q la=%q",
+			partitionNameForDay(utc), partitionNameForDay(la))
+	}
+}
+
+func TestQuoteIdent_EscapesEmbeddedQuotes(t *testing.T) {
+	cases := []struct {
+		in, want string
+	}{
+		{"logs_2026_04_27", `"logs_2026_04_27"`},
+		{`bad"name`, `"bad""name"`},
+		{`logs"; DROP TABLE x; --`, `"logs""; DROP TABLE x; --"`},
+	}
+	for _, c := range cases {
+		if got := quoteIdent(c.in); got != c.want {
+			t.Fatalf("quoteIdent(%q) = %q, want %q", c.in, got, c.want)
+		}
+	}
+}
+
+func TestParsePartitionUpper_ValidLayouts(t *testing.T) {
+	cases := []struct {
+		bound string
+		want  string // expected upper, ISO
+	}{
+		{
+			bound: "FOR VALUES FROM ('2026-04-26 00:00:00+00') TO ('2026-04-27 00:00:00+00')",
+			want:  "2026-04-27T00:00:00Z",
+		},
+		{
+			bound: "FOR VALUES FROM ('2026-04-26 00:00:00') TO ('2026-04-27 00:00:00')",
+			want:  "2026-04-27T00:00:00Z",
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.bound, func(t *testing.T) {
+			got, ok := parsePartitionUpper(c.bound)
+			if !ok {
+				t.Fatalf("parsePartitionUpper(%q) failed", c.bound)
+			}
+			if got.UTC().Format(time.RFC3339) != c.want {
+				t.Fatalf("parsePartitionUpper(%q) = %v, want %s", c.bound, got, c.want)
+			}
+		})
+	}
+}
+
+func TestParsePartitionUpper_Malformed(t *testing.T) {
+	cases := []string{
+		"",
+		"FOR VALUES FROM ('2026-04-26 00:00:00+00')", // no TO
+		"FOR VALUES FROM ('xxx') TO ('not a timestamp')",
+		"random garbage",
+	}
+	for _, c := range cases {
+		if _, ok := parsePartitionUpper(c); ok {
+			t.Fatalf("parsePartitionUpper(%q) should have failed", c)
+		}
+	}
+}
diff --git a/internal/storage/pg_partitions_test.go b/internal/storage/pg_partitions_test.go
new file mode 100644
index 0000000..f92d0c9
--- /dev/null
+++ b/internal/storage/pg_partitions_test.go
@@ -0,0 +1,283 @@
+//go:build integration
+// +build integration
+
+// Postgres-backed integration tests for declarative daily partitioning.
+//
+// Run with:
+//
+//	go test -race -tags=integration ./internal/storage/...
+//
+// Auto-skips if Docker is unavailable (matches pg_integration_test.go).
+
+package storage
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/testcontainers/testcontainers-go/modules/postgres"
+)
+
+// setupPGContainerPartitioned boots a Postgres 16 container, runs
+// AutoMigrateModelsWithOptions with daily partitioning enabled, and returns a
+// repository + teardown closure. Skipped (not failed) when Docker is missing.
+func setupPGContainerPartitioned(t *testing.T, lookahead int) (*Repository, func()) { + t.Helper() + ctx := context.Background() + + pgContainer, err := postgres.Run(ctx, "postgres:16-alpine", + postgres.WithDatabase("otel_test"), + postgres.WithUsername("otel"), + postgres.WithPassword("otel"), + postgres.BasicWaitStrategies(), + ) + if err != nil { + t.Skipf("docker unavailable, skipping pg partition tests: %v", err) + } + + dsn, err := pgContainer.ConnectionString(ctx, "sslmode=disable") + if err != nil { + _ = pgContainer.Terminate(ctx) + t.Fatalf("ConnectionString: %v", err) + } + + db, err := NewDatabase("postgres", dsn) + if err != nil { + _ = pgContainer.Terminate(ctx) + t.Fatalf("NewDatabase(postgres): %v", err) + } + if err := AutoMigrateModelsWithOptions(db, "postgres", MigrateOptions{ + PostgresPartitioning: PartitioningModeDaily, + PartitionLookaheadDays: lookahead, + }); err != nil { + _ = pgContainer.Terminate(ctx) + t.Fatalf("AutoMigrateModelsWithOptions: %v", err) + } + repo := NewRepositoryFromDB(db, "postgres") + teardown := func() { + _ = repo.Close() + _ = pgContainer.Terminate(ctx) + } + return repo, teardown +} + +// TestPGPartition_LogsTableIsPartitioned verifies that with the option +// enabled, `logs` is a partitioned (relkind=p) parent and the initial +// partitions are attached. 
+func TestPGPartition_LogsTableIsPartitioned(t *testing.T) { + repo, teardown := setupPGContainerPartitioned(t, 3) + defer teardown() + + rk, err := pgLogsRelkind(repo.db) + if err != nil { + t.Fatalf("pgLogsRelkind: %v", err) + } + if rk != "p" { + t.Fatalf("logs should be partitioned (relkind=p), got %q", rk) + } + + count, err := countLogsPartitions(context.Background(), repo.db) + if err != nil { + t.Fatalf("countLogsPartitions: %v", err) + } + // yesterday + today + 3 future = 5 + if count < 5 { + t.Fatalf("want >=5 initial partitions; got %d", count) + } +} + +// TestPGPartition_InsertRoutesToCorrectChild verifies that an INSERT goes +// into the correct daily child partition. +func TestPGPartition_InsertRoutesToCorrectChild(t *testing.T) { + repo, teardown := setupPGContainerPartitioned(t, 1) + defer teardown() + + now := time.Now().UTC() + if err := repo.db.Create(&Log{ + Severity: "INFO", + Body: "routed to today", + ServiceName: "api", + Timestamp: now, + }).Error; err != nil { + t.Fatalf("create log: %v", err) + } + + expected := partitionNameForDay(now) + var found int + row := repo.db.Raw(fmt.Sprintf(`SELECT COUNT(*) FROM %s`, quoteIdent(expected))).Row() + if err := row.Scan(&found); err != nil { + t.Fatalf("count partition rows: %v", err) + } + if found != 1 { + t.Fatalf("expected 1 row in partition %s, got %d", expected, found) + } +} + +// TestPGPartition_DropExpired confirms that a partition whose upper bound +// predates the cutoff is dropped, and that other partitions are kept. +func TestPGPartition_DropExpired(t *testing.T) { + repo, teardown := setupPGContainerPartitioned(t, 2) + defer teardown() + + // Stage a partition for 30 days ago — well outside any reasonable retention. 
+ old := time.Now().UTC().Add(-30 * 24 * time.Hour) + if err := EnsureLogsPartitionForDay(repo.db, old); err != nil { + t.Fatalf("ensure old partition: %v", err) + } + beforeName := partitionNameForDay(old) + beforeCount, err := countLogsPartitions(context.Background(), repo.db) + if err != nil { + t.Fatalf("count: %v", err) + } + + cutoff := time.Now().UTC().Add(-7 * 24 * time.Hour) // 7-day retention + dropped, err := DropExpiredLogsPartitions(context.Background(), repo.db, cutoff) + if err != nil { + t.Fatalf("DropExpiredLogsPartitions: %v", err) + } + if dropped < 1 { + t.Fatalf("expected at least 1 dropped partition (the 30-day-old one), got %d", dropped) + } + + // The dropped partition should no longer exist. + var present int + if err := repo.db.Raw(`SELECT COUNT(*) FROM pg_class WHERE relname = ?`, beforeName).Row().Scan(&present); err != nil { + t.Fatalf("check class: %v", err) + } + if present != 0 { + t.Fatalf("partition %s should have been dropped", beforeName) + } + + // Today's partition should NOT have been dropped. + todayName := partitionNameForDay(time.Now().UTC()) + if err := repo.db.Raw(`SELECT COUNT(*) FROM pg_class WHERE relname = ?`, todayName).Row().Scan(&present); err != nil { + t.Fatalf("check today class: %v", err) + } + if present != 1 { + t.Fatalf("today's partition %s should still exist", todayName) + } + + afterCount, err := countLogsPartitions(context.Background(), repo.db) + if err != nil { + t.Fatalf("count after: %v", err) + } + if afterCount >= beforeCount { + t.Fatalf("partition count should have decreased: before=%d after=%d", beforeCount, afterCount) + } +} + +// TestPGPartition_GreenfieldGuard verifies that running partitioning setup +// against a DB with an existing non-partitioned `logs` table refuses to +// proceed. We reach this by running AutoMigrateModels (no partition opts) +// first, then trying the partitioning path on the same DB. 
+func TestPGPartition_GreenfieldGuard(t *testing.T) { + ctx := context.Background() + pgContainer, err := postgres.Run(ctx, "postgres:16-alpine", + postgres.WithDatabase("otel_test"), + postgres.WithUsername("otel"), + postgres.WithPassword("otel"), + postgres.BasicWaitStrategies(), + ) + if err != nil { + t.Skipf("docker unavailable, skipping: %v", err) + } + defer func() { _ = pgContainer.Terminate(ctx) }() + + dsn, err := pgContainer.ConnectionString(ctx, "sslmode=disable") + if err != nil { + t.Fatalf("ConnectionString: %v", err) + } + db, err := NewDatabase("postgres", dsn) + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + defer func() { + if sqlDB, _ := db.DB(); sqlDB != nil { + _ = sqlDB.Close() + } + }() + + // First migrate without partitioning — creates a regular `logs` table. + if err := AutoMigrateModels(db, "postgres"); err != nil { + t.Fatalf("first migrate: %v", err) + } + + // Now attempt to enable partitioning — should refuse. + err = AutoMigrateModelsWithOptions(db, "postgres", MigrateOptions{ + PostgresPartitioning: PartitioningModeDaily, + PartitionLookaheadDays: 3, + }) + if err == nil { + t.Fatal("expected error when enabling partitioning on existing non-partitioned logs table") + } + if !strings.Contains(err.Error(), "greenfield-only") { + t.Fatalf("expected greenfield-only error, got: %v", err) + } +} + +// TestPGPartition_SchedulerDropsExpiredAndCreatesLookahead exercises the +// full PartitionScheduler lifecycle. +func TestPGPartition_SchedulerDropsExpiredAndCreatesLookahead(t *testing.T) { + repo, teardown := setupPGContainerPartitioned(t, 1) + defer teardown() + + // Stage an old partition outside retention. 
+ old := time.Now().UTC().Add(-30 * 24 * time.Hour) + if err := EnsureLogsPartitionForDay(repo.db, old); err != nil { + t.Fatalf("ensure old: %v", err) + } + + sched := NewPartitionScheduler(repo, 7, 5) // retention=7d, lookahead=5d + // Tighten intervals so the test isn't slow; we still rely on the + // synchronous initial pass in Start() rather than the loop. + sched.ensureInterval = time.Hour + sched.dropInterval = time.Hour + + dropped := 0 + active := 0 + sched.SetMetrics(func(n int) { dropped += n }, func(n int) { active = n }) + + ctx, cancel := context.WithCancel(context.Background()) + sched.Start(ctx) + defer func() { cancel(); sched.Stop() }() + + if dropped < 1 { + t.Fatalf("scheduler initial pass should have dropped >=1 expired partition; got %d", dropped) + } + // active should be at least lookahead (5) + today + yesterday = 7. + if active < 7 { + t.Fatalf("active partitions after initial ensure should be >=7; got %d", active) + } + + // Idempotency: another tick is a no-op. + sched.runEnsure(ctx) + sched.runDrop(ctx) +} + +// TestPGPartition_PgTrgmIndexesPropagateToChildren verifies that the GIN +// trigram indexes declared on the parent are inherited by daily children +// (Postgres ≥ 11 propagates partitioned indexes automatically). +func TestPGPartition_PgTrgmIndexesPropagateToChildren(t *testing.T) { + repo, teardown := setupPGContainerPartitioned(t, 1) + defer teardown() + + // Pick today's partition name and confirm that an idx_logs_body_trgm + // inherited index exists on it. + todayName := partitionNameForDay(time.Now().UTC()) + var ginCount int + if err := repo.db.Raw(` + SELECT COUNT(*) + FROM pg_indexes + WHERE schemaname = current_schema() + AND tablename = ? 
+ AND indexdef ILIKE '%USING gin%' + `, todayName).Row().Scan(&ginCount); err != nil { + t.Fatalf("inspect indexes: %v", err) + } + if ginCount < 1 { + t.Skipf("pg_trgm GIN inheritance index not present on %s — extension may be missing", todayName) + } +} diff --git a/internal/storage/repository.go b/internal/storage/repository.go index de8ba77..80b1c38 100644 --- a/internal/storage/repository.go +++ b/internal/storage/repository.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os" + "strconv" "strings" "time" @@ -12,6 +13,22 @@ import ( "gorm.io/gorm" ) +// partitionLookaheadFromEnv reads DB_PARTITION_LOOKAHEAD_DAYS, defaulting to +// 3 when unset, malformed, or out of the validation range. Validation also +// happens at the config layer; this fallback keeps the storage package +// usable when wired without a *config.Config (tests, embedded callers). +func partitionLookaheadFromEnv() int { + v := strings.TrimSpace(os.Getenv("DB_PARTITION_LOOKAHEAD_DAYS")) + if v == "" { + return 3 + } + n, err := strconv.Atoi(v) + if err != nil || n < 1 || n > 365 { + return 3 + } + return n +} + // likeOpFor returns the case-insensitive LIKE operator for the given driver. // Postgres LIKE is case-sensitive; SQLite/MySQL LIKE is case-insensitive by default. // Callers should embed the returned token directly into SQL fragments. @@ -47,8 +64,23 @@ type Repository struct { db *gorm.DB driver string metrics *telemetry.Metrics + + // logsPartitioned is set to true when DB_POSTGRES_PARTITIONING=daily is + // active and the `logs` parent has been provisioned as a partitioned + // table. RetentionScheduler reads this to skip the logs DELETE — the + // PartitionScheduler does retention via DROP PARTITION instead. + logsPartitioned bool } +// LogsPartitioned reports whether the `logs` table is provisioned as a +// declarative partitioned parent. Used by RetentionScheduler to bypass the +// row-level DELETE path when partition-level DROP is in charge of retention. 
+func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned } + +// MarkLogsPartitioned flips the partitioned flag. Called by the partitioning +// setup path (factory.go) once the partitioned schema is in place. +func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned = true } + // NewRepository initializes the database connection using environment variables and migrates the schema. func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { driver := os.Getenv("DB_DRIVER") @@ -67,7 +99,11 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { // Auto-migration is enabled by default. Disable via DB_AUTOMIGRATE=false when // using versioned migrations in production (Postgres table locks, no rollback). if autoMigrateEnabled() { - if err := AutoMigrateModels(db, driver); err != nil { + opts := MigrateOptions{ + PostgresPartitioning: strings.ToLower(strings.TrimSpace(os.Getenv("DB_POSTGRES_PARTITIONING"))), + PartitionLookaheadDays: partitionLookaheadFromEnv(), + } + if err := AutoMigrateModelsWithOptions(db, driver, opts); err != nil { return nil, err } } else { @@ -96,7 +132,19 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { }) } - return &Repository{db: db, driver: driver, metrics: metrics}, nil + repo := &Repository{db: db, driver: driver, metrics: metrics} + // Detect partitioned-logs mode from the live schema so the + // RetentionScheduler can skip the row-level DELETE path. We do this from + // the DB rather than passing the config flag through several layers, + // which keeps the storage package config-agnostic and resilient to a + // half-applied migration. 
+ if driver == "postgres" || driver == "postgresql" { + if rk, err := pgLogsRelkind(db); err == nil && rk == "p" { + repo.logsPartitioned = true + slog.Info("📦 Postgres: logs is partitioned — retention will use DROP PARTITION (via PartitionScheduler)") + } + } + return repo, nil } // Stats aggregation and DB management diff --git a/internal/storage/retention.go b/internal/storage/retention.go index 2d69a52..dc3f54f 100644 --- a/internal/storage/retention.go +++ b/internal/storage/retention.go @@ -169,9 +169,18 @@ func (r *RetentionScheduler) runPurge(ctx context.Context) { results <- result{kind, n, err} }() } - runGuarded("logs", func() (int64, error) { - return r.repo.PurgeLogsBatched(ctx, cutoff, r.purgeBatchSize, r.purgeBatchSleep) - }) + // When DB_POSTGRES_PARTITIONING=daily is active, retention for `logs` is + // handled by PartitionScheduler via DROP PARTITION (orders of magnitude + // faster than DELETE). Skip the logs DELETE here so we don't pay for two + // retention paths against the same table. 
+ logsHandledByPartition := r.repo.LogsPartitioned() + logsExpected := 0 + if !logsHandledByPartition { + logsExpected = 1 + runGuarded("logs", func() (int64, error) { + return r.repo.PurgeLogsBatched(ctx, cutoff, r.purgeBatchSize, r.purgeBatchSleep) + }) + } runGuarded("traces", func() (int64, error) { return r.repo.PurgeTracesBatched(ctx, cutoff, r.purgeBatchSize, r.purgeBatchSleep) }) @@ -181,7 +190,8 @@ func (r *RetentionScheduler) runPurge(ctx context.Context) { purgeFailed := false totals := map[string]int64{} - for i := 0; i < 3; i++ { + totalRuns := 2 + logsExpected + for range totalRuns { res := <-results if res.err != nil { slog.Error("retention: purge failed", "kind", res.kind, "error", res.err) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 290f30d..2637c7a 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -67,6 +67,15 @@ type Metrics struct { RetentionVacuumDurationSeconds *prometheus.HistogramVec RetentionRowsBehindGauge *prometheus.GaugeVec + // --- Postgres partitioning (DB_POSTGRES_PARTITIONING=daily) --- + // PartitionsDropped counts daily logs partitions dropped during the + // retention pass. Each drop is a near-instant DDL — alert when this + // counter is flat for >1.5 retention periods (indicates a stuck loop). + PartitionsDropped prometheus.Counter + // PartitionsActive gauges the live partitions attached to logs. + // Healthy steady-state ~ HOT_RETENTION_DAYS + DB_PARTITION_LOOKAHEAD_DAYS + 1. + PartitionsActive prometheus.Gauge + // --- Runtime --- GoGoroutines prometheus.Gauge GoHeapAllocBytes prometheus.Gauge @@ -242,6 +251,16 @@ func New() *Metrics { Help: "Rows older than retention cutoff that have not yet been purged. 
Climbing means purge cannot keep pace with ingest.", }, []string{"table", "driver"}), + // Postgres partitioning + PartitionsDropped: promauto.NewCounter(prometheus.CounterOpts{ + Name: "otelcontext_partitions_dropped_total", + Help: "Total daily logs partitions dropped by the partition scheduler. Increments by `n` when n partitions are dropped on a single tick.", + }), + PartitionsActive: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "otelcontext_partitions_active", + Help: "Live partitions attached to the logs parent. Steady-state ≈ HOT_RETENTION_DAYS + DB_PARTITION_LOOKAHEAD_DAYS + 1.", + }), + // Runtime GoGoroutines: promauto.NewGauge(prometheus.GaugeOpts{ Name: "OtelContext_go_goroutines", diff --git a/main.go b/main.go index ce7279d..8ef1c39 100644 --- a/main.go +++ b/main.go @@ -198,6 +198,33 @@ func main() { retention.Start(ctxRetention) slog.Info("🧹 Retention scheduler started", "retention_days", cfg.HotRetentionDays) + // 2b. Partition scheduler: only when DB_POSTGRES_PARTITIONING=daily. + // Maintains lookahead daily partitions and drops expired ones — DROP + // PARTITION is orders of magnitude faster than DELETE for retention. 
+ var partitionScheduler *storage.PartitionScheduler + var cancelPartitions context.CancelFunc = func() {} + if cfg.DBPostgresPartitioning == storage.PartitioningModeDaily { + ctxPart, cancelPart := context.WithCancel(context.Background()) + partitionScheduler = storage.NewPartitionScheduler(repo, cfg.HotRetentionDays, cfg.DBPartitionLookaheadDays) + if metrics != nil { + partitionScheduler.SetMetrics( + func(n int) { + if metrics.PartitionsDropped != nil { + metrics.PartitionsDropped.Add(float64(n)) + } + }, + func(n int) { + if metrics.PartitionsActive != nil { + metrics.PartitionsActive.Set(float64(n)) + } + }, + ) + } + partitionScheduler.Start(ctxPart) + cancelPartitions = cancelPart + slog.Info("📦 Partition scheduler started", "lookahead_days", cfg.DBPartitionLookaheadDays, "retention_days", cfg.HotRetentionDays) + } + // 3. Initialize DLQ (Dead Letter Queue) replayInterval, err := time.ParseDuration(cfg.DLQReplayInterval) if err != nil { @@ -788,9 +815,13 @@ func main() { // 4. Stop DLQ (may still be replaying) dlq.Stop() - // 4a. Stop retention scheduler before closing DB (it issues queries). + // 4a. Stop retention + partition schedulers before closing DB (both issue queries). cancelRetention() retention.Stop() + cancelPartitions() + if partitionScheduler != nil { + partitionScheduler.Stop() + } // 4b. Shutdown the OTel tracer provider (flushes pending spans). if shutdownTracer != nil {