Merged
1 change: 1 addition & 0 deletions CLAUDE.md
@@ -218,6 +218,7 @@ Key settings in `internal/config/config.go`:
- `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`.
- `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000
- `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs
- `DB_POSTGRES_PARTITIONING` (`""`), `DB_PARTITION_LOOKAHEAD_DAYS` (3) — opt-in Postgres declarative range partitioning of the `logs` table by day. When `daily`, `logs` is provisioned as a partitioned parent (greenfield only — refuses to start if `logs` already exists unpartitioned), the `PartitionScheduler` maintains lookahead partitions and drops expired ones via `DROP TABLE`, and `RetentionScheduler` skips the row-level DELETE for `logs`. Watch `otelcontext_partitions_dropped_total` and `otelcontext_partitions_active`.
- `APP_ENV` (`"development"`), `OTELCONTEXT_ALLOW_SQLITE_PROD` (false) — SQLite is refused when `APP_ENV=production` unless the allow flag is set
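
The hybrid backpressure rule listed for the async ingest pipeline can be sketched as a pure admission function. This is a minimal illustration, not the code in `internal/ingest/pipeline.go`: the names `admit`, `Decision`, and the `priority` flag are hypothetical, and the handling of the exact 90% boundary is an assumption.

```go
package main

import "fmt"

// Decision is the outcome of the admission check for one batch.
// All names in this sketch are hypothetical.
type Decision int

const (
	Accept Decision = iota // enqueue the batch
	Drop                   // discard a healthy batch and count it as dropped
	Reject                 // caller would return gRPC RESOURCE_EXHAUSTED
)

func (d Decision) String() string {
	return [...]string{"accept", "drop", "reject"}[d]
}

// admit applies the three-band rule. queued and capacity are the current and
// maximum queue depth; priority marks batches that carry errors or slow spans.
func admit(queued, capacity int, priority bool) Decision {
	switch {
	case queued >= capacity:
		// Queue is full: nothing can be accepted, not even priority batches.
		return Reject
	case queued*10 >= capacity*9 && !priority:
		// 90% or more (assumed inclusive): shed healthy batches only.
		return Drop
	default:
		return Accept
	}
}

func main() {
	const capacity = 50000 // INGEST_PIPELINE_QUEUE_SIZE default
	fmt.Println(admit(1000, capacity, false))
	fmt.Println(admit(47000, capacity, false))
	fmt.Println(admit(47000, capacity, true))
	fmt.Println(admit(capacity, capacity, true))
}
```

Integer arithmetic (`queued*10 >= capacity*9`) avoids floating-point comparisons at the threshold.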

### Authentication
21 changes: 21 additions & 0 deletions docs/OPERATIONS.md
@@ -115,6 +115,27 @@ SQLite is rejected at startup when `APP_ENV=production` unless you explicitly op

**Multi-tenancy.** Every row carries a `tenant_id` column. The write path reads `X-Tenant-ID` (HTTP) or `x-tenant-id` (gRPC metadata) and populates the column. The read path attaches the tenant from the request context to every repository query (`Where("tenant_id = ?", ...)`).

### Postgres declarative partitioning (opt-in)

| Setting | Default | When to enable |
|---|---|---|
| `DB_POSTGRES_PARTITIONING` | `""` (off) | High-volume Postgres deployments where row-level retention DELETE on `logs` becomes the bottleneck |
| `DB_PARTITION_LOOKAHEAD_DAYS` | `3` | Future daily partitions to keep staged. Raise if your DB is far from the app or your retention is short |

When `DB_POSTGRES_PARTITIONING=daily`:

- `logs` is provisioned as a `PARTITION BY RANGE (timestamp)` parent with the composite PK `(id, timestamp)`. The `Log` model is left out of the AutoMigrate call, so GORM never tries to alter the partitioned table.
- Initial partitions cover yesterday + today + `lookahead` future days. Yesterday absorbs late-arriving events at the day-boundary rollover.
- The PartitionScheduler runs hourly, idempotently ensures upcoming partitions exist, and drops any partition whose entire range predates the retention cutoff. Drop is `DROP TABLE IF EXISTS <child>` — orders of magnitude faster than the row-by-row DELETE.
- RetentionScheduler skips its `logs` DELETE branch when partitioning is active. `traces` and `metric_buckets` continue to use the existing batched DELETE path.
- `pg_trgm` GIN indexes on `logs.body` and `logs.service_name` are declared on the parent and propagate automatically to current and future partitions.

**Greenfield only.** Startup refuses to enable partitioning if `logs` already exists as a non-partitioned table — migrating an unpartitioned `logs` to a partitioned one requires copying rows into a swapped table and is out of scope for the current phase. Drop the table or migrate manually before flipping the flag.

Telemetry:
- `otelcontext_partitions_dropped_total` — increments by `n` when the scheduler drops `n` partitions on a tick
- `otelcontext_partitions_active` — current partition count attached to the parent. Steady-state ≈ `HOT_RETENTION_DAYS + DB_PARTITION_LOOKAHEAD_DAYS + 1`. Alert when this gauge climbs unbounded (drop loop stuck) or falls toward zero (over-aggressive drop)
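
The steady-state formula lends itself to a quick sanity check on the gauge value. The sketch below is illustrative only: `expectedPartitions`, `partitionGaugeStatus`, the `slack` tolerance, and the low-water mark of `lookahead + 1` (today plus the staged future days) are assumptions, not part of the project.

```go
package main

import "fmt"

// expectedPartitions is the steady-state count quoted above:
// HOT_RETENTION_DAYS + DB_PARTITION_LOOKAHEAD_DAYS + 1.
func expectedPartitions(hotRetentionDays, lookaheadDays int) int {
	return hotRetentionDays + lookaheadDays + 1
}

// partitionGaugeStatus classifies a reading of otelcontext_partitions_active.
// slack is a hypothetical tolerance above steady state. The low-water mark is
// an assumed lookahead+1, since a fresh deployment legitimately starts below
// the steady-state count.
func partitionGaugeStatus(active, hotRetentionDays, lookaheadDays, slack int) string {
	switch {
	case active > expectedPartitions(hotRetentionDays, lookaheadDays)+slack:
		return "too_many" // drop loop may be stuck
	case active < lookaheadDays+1:
		return "too_few" // partitions may be dropped too aggressively
	default:
		return "ok"
	}
}

func main() {
	// Defaults from the config: HOT_RETENTION_DAYS=7, DB_PARTITION_LOOKAHEAD_DAYS=3.
	fmt.Println(expectedPartitions(7, 3))
	fmt.Println(partitionGaugeStatus(11, 7, 3, 2))
	fmt.Println(partitionGaugeStatus(20, 7, 3, 2))
}
```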

### Log search index

| Driver | Index | Ranking |
38 changes: 38 additions & 0 deletions internal/config/config.go
@@ -30,6 +30,19 @@ type Config struct {
DBMaxIdleConns int
DBConnMaxLifetime string // e.g. "1h", "30m"

// Postgres-only opt-in: declarative range partitioning of the logs table by
// day. When set to "daily", AutoMigrate provisions logs as a partitioned
// table and the PartitionScheduler creates lookahead partitions and drops
// expired ones (dropping a child table with DROP TABLE beats row-level DELETE
// for retention by orders of magnitude). Greenfield only: startup refuses if
// `logs` already exists as a non-partitioned table. Empty / "none" = legacy
// unpartitioned schema.
DBPostgresPartitioning string

// Number of future daily partitions to maintain ahead of "today" when
// DBPostgresPartitioning=daily. Defaults to 3. Tune up if your retention
// policy is short and ingest spikes around a daily boundary.
DBPartitionLookaheadDays int

// Retention
HotRetentionDays int

@@ -186,6 +199,10 @@ func Load(customPath string) (*Config, error) {
DBMaxIdleConns: getEnvInt("DB_MAX_IDLE_CONNS", 10),
DBConnMaxLifetime: getEnv("DB_CONN_MAX_LIFETIME", "1h"),

// Postgres partitioning (opt-in). Default empty = legacy unpartitioned.
DBPostgresPartitioning: strings.ToLower(strings.TrimSpace(getEnv("DB_POSTGRES_PARTITIONING", ""))),
DBPartitionLookaheadDays: getEnvInt("DB_PARTITION_LOOKAHEAD_DAYS", 3),

// Retention
HotRetentionDays: getEnvInt("HOT_RETENTION_DAYS", 7),
RetentionBatchSize: getEnvInt("RETENTION_BATCH_SIZE", 50000),
@@ -324,6 +341,27 @@ func (c *Config) Validate() error {
return fmt.Errorf("invalid DB_DRIVER %q: must be one of sqlite, postgres, mysql, mssql", c.DBDriver)
}

// Partitioning is Postgres-only. Reject mismatched configs at startup so
// the operator finds out immediately rather than silently running in
// unpartitioned mode.
switch c.DBPostgresPartitioning {
case "", "none", "daily":
// ok
default:
return fmt.Errorf("invalid DB_POSTGRES_PARTITIONING %q: must be one of \"\", \"none\", \"daily\"", c.DBPostgresPartitioning)
}
if c.DBPostgresPartitioning == "daily" {
drv := strings.ToLower(c.DBDriver)
if drv != "postgres" && drv != "postgresql" {
return fmt.Errorf("DB_POSTGRES_PARTITIONING=daily requires DB_DRIVER=postgres, got %q", c.DBDriver)
}
}
// 0 == "use default at the storage layer" so direct struct construction
// (tests, embedded callers) doesn't have to set it.
if c.DBPartitionLookaheadDays < 0 || c.DBPartitionLookaheadDays > 365 {
return fmt.Errorf("DB_PARTITION_LOOKAHEAD_DAYS must be between 0 and 365, got %d", c.DBPartitionLookaheadDays)
}

// Numeric ranges.
// Upper bound on HOT_RETENTION_DAYS guards against int64 nanosecond overflow in
// time.Duration(days) * 24 * time.Hour (overflow above ~106751 days flips the
47 changes: 46 additions & 1 deletion internal/storage/factory.go
@@ -184,7 +184,33 @@
}

// AutoMigrateModels runs GORM auto-migration for all OtelContext models.
//
// When DB_POSTGRES_PARTITIONING=daily, the `logs` table is provisioned as a
// declarative range-partitioned parent BEFORE GORM AutoMigrate runs, and Log is
// left out of the AutoMigrate model list so GORM never tries to ALTER the
// partition-key column. This is greenfield-only; see
// setupPostgresPartitionedLogs for the safety check.
func AutoMigrateModels(db *gorm.DB, driver string) error {
return AutoMigrateModelsWithOptions(db, driver, MigrateOptions{})
}

// MigrateOptions tunes AutoMigrateModelsWithOptions. Empty zero value
// preserves the legacy AutoMigrateModels behaviour.
type MigrateOptions struct {
// PostgresPartitioning, when "daily", provisions `logs` as a partitioned
// table. Any other value (including the empty string) keeps the legacy
// unpartitioned schema.
PostgresPartitioning string
// PartitionLookaheadDays is the number of future daily partitions to
// pre-create at boot. Defaults to 3 when zero.
PartitionLookaheadDays int
}

// AutoMigrateModelsWithOptions is the option-driven variant of
// AutoMigrateModels. Existing callers should continue to use AutoMigrateModels
// — the options entry point is for new wiring (currently main.go) that needs
// to plumb the partitioning flag.
func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOptions) error {

Check failure on line 213 in internal/storage/factory.go (SonarQubeCloud / SonarCloud Code Analysis): Refactor this method to reduce its Cognitive Complexity from 32 to the 15 allowed. See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3P1-wgSOt6ravzY3n5&open=AZ3P1-wgSOt6ravzY3n5&pullRequest=53

driver = strings.ToLower(driver)

// Disable FK checks during migration for MySQL.
@@ -197,7 +223,26 @@
log.Println("🔓 Disabled foreign key checks for migration")
}

if err := db.AutoMigrate(&Trace{}, &Span{}, &Log{}, &MetricBucket{}); err != nil {
// Postgres partitioning: provision the partitioned `logs` parent + initial
// daily partitions BEFORE GORM AutoMigrate runs, and skip Log from
// AutoMigrate's slice. AutoMigrate would otherwise try to ALTER the
// timestamp column (because the model tag doesn't carry an explicit
// `not null` and the partitioned PK forces NOT NULL on the column),
// which Postgres rejects because the column is part of the partition key.
logsPartitioned := false
if (driver == "postgres" || driver == "postgresql") && opts.PostgresPartitioning == PartitioningModeDaily {
if err := setupPostgresPartitionedLogs(db, opts.PartitionLookaheadDays); err != nil {
return fmt.Errorf("setup partitioned logs: %w", err)
}
log.Printf("📦 Postgres: declarative partitioning enabled (daily, lookahead=%d days)", opts.PartitionLookaheadDays)
logsPartitioned = true
}

migrateModels := []any{&Trace{}, &Span{}, &MetricBucket{}}
if !logsPartitioned {
migrateModels = append(migrateModels, &Log{})
}
if err := db.AutoMigrate(migrateModels...); err != nil {
return fmt.Errorf("failed to migrate database: %w", err)
}
