Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOption
logsPartitioned = true
}

// Dedupe spans BEFORE AutoMigrate adds the composite uniqueIndex
// idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id).
// Pre-RAN-65 deployments may have duplicates from DLQ replays; the
// unique index would fail to create against violating rows. No-op on
// fresh databases or when the unique index already exists.
if err := dedupeSpansForUniqueIndex(db, driver); err != nil {
log.Printf("⚠️ span dedupe before unique index failed: %v", err)
}

migrateModels := []any{&Trace{}, &Span{}, &MetricBucket{}}
if !logsPartitioned {
migrateModels = append(migrateModels, &Log{})
Expand Down
102 changes: 102 additions & 0 deletions internal/storage/migrate_spans.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package storage

import (
"fmt"
"log"
"strings"

"gorm.io/gorm"
)

// dedupeSpansForUniqueIndex collapses any pre-existing duplicate spans on
// (tenant_id, trace_id, span_id) before AutoMigrate adds the composite
// uniqueIndex idx_spans_tenant_trace_span. Without this, CREATE UNIQUE INDEX
// would fail on databases that accumulated duplicates from earlier DLQ
// replays (or any pre-RAN-65 non-idempotent ingest path), aborting startup.
//
// Strategy across drivers: keep the lowest primary-key row per
// (tenant_id, trace_id, span_id) and delete the rest. Every driver uses a
// single self-contained statement so the operation does not depend on
// session-scoped state surviving across pooled connections.
//
// The function is a no-op (returns nil) when db is nil, when the spans
// table does not exist yet, when the unique index is already present, or
// when driver is not one of the recognised names (matched
// case-insensitively; the empty string is treated as SQLite).
//
// Must run BEFORE db.AutoMigrate(...) in AutoMigrateModelsWithOptions; once
// the unique constraint is in place, the dedupe is unnecessary because new
// inserts collapse via OnConflict.DoNothing.
//
// It returns a wrapped error if the DELETE statement fails.
func dedupeSpansForUniqueIndex(db *gorm.DB, driver string) error {
	if db == nil {
		return nil
	}

	migrator := db.Migrator()

	// Skip if the spans table doesn't exist yet — fresh databases have
	// nothing to dedupe and the upcoming AutoMigrate will create the
	// table with the uniqueIndex already in place.
	if !migrator.HasTable("spans") {
		return nil
	}

	// Skip if the unique index already exists (idempotent re-runs).
	if migrator.HasIndex("spans", "idx_spans_tenant_trace_span") {
		return nil
	}

	// Pick the dialect-specific DELETE. label is only used in the error
	// message so operators can tell which dialect path failed.
	var label, stmt string
	switch strings.ToLower(driver) {
	case "sqlite", "":
		// SQLite supports DELETE with subquery on same table.
		label = "sqlite"
		stmt = `DELETE FROM spans WHERE id NOT IN (
SELECT MIN(id) FROM spans GROUP BY tenant_id, trace_id, span_id
)`

	case "postgres", "postgresql":
		// USING self-join keeps the lowest id per (tenant, trace, span).
		label = "postgres"
		stmt = `DELETE FROM spans a USING spans b
WHERE a.id > b.id
AND a.tenant_id = b.tenant_id
AND a.trace_id = b.trace_id
AND a.span_id = b.span_id`

	case "mysql":
		// MySQL forbids referencing the DELETE target directly inside a
		// subquery, so the aggregate is wrapped in a derived table, which
		// MySQL materialises before the DELETE runs. A TEMPORARY TABLE is
		// deliberately avoided: it is visible only to the session that
		// created it, and separate Exec calls on a pooled *gorm.DB are not
		// guaranteed to share a connection.
		label = "mysql"
		stmt = `DELETE FROM spans WHERE id NOT IN (
SELECT id FROM (
SELECT MIN(id) AS id FROM spans GROUP BY tenant_id, trace_id, span_id
) AS spans_dedupe_keep
)`

	case "sqlserver", "mssql":
		// T-SQL: ROW_NUMBER() over the dedupe key, then delete duplicates.
		label = "mssql"
		stmt = `WITH dups AS (
SELECT id, ROW_NUMBER() OVER (
PARTITION BY tenant_id, trace_id, span_id ORDER BY id
) AS rn FROM spans
) DELETE FROM spans WHERE id IN (SELECT id FROM dups WHERE rn > 1)`

	default:
		// Unknown driver: leave the data untouched rather than guess at
		// a dialect.
		return nil
	}

	res := db.Exec(stmt)
	if res.Error != nil {
		return fmt.Errorf("dedupe spans (%s): %w", label, res.Error)
	}

	if res.RowsAffected > 0 {
		log.Printf("🧹 Deduplicated %d duplicate span row(s) before adding uniqueIndex idx_spans_tenant_trace_span", res.RowsAffected)
	}
	return nil
}
14 changes: 11 additions & 3 deletions internal/storage/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,19 @@ type Trace struct {
}

// Span represents a single operation within a trace.
//
// Idempotency: the composite uniqueIndex idx_spans_tenant_trace_span on
// (tenant_id, trace_id, span_id) ensures a span is written at most once
// per tenant. DLQ replay (or any duplicate ingest) collapses cleanly via
// OnConflict.DoNothing in BatchCreateAll/BatchCreateSpans rather than
// double-counting in downstream metrics or GraphRAG. The composite covers
// the legacy idx_spans_tenant_trace as a left-prefix; the legacy index
// is retained for query-plan stability across upgrades.
type Span struct {
ID uint `gorm:"primaryKey" json:"id"`
TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1" json:"tenant_id"`
TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2" json:"trace_id"`
SpanID string `gorm:"size:16;not null" json:"span_id"`
TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1;uniqueIndex:idx_spans_tenant_trace_span,priority:1" json:"tenant_id"`
TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2;uniqueIndex:idx_spans_tenant_trace_span,priority:2" json:"trace_id"`
SpanID string `gorm:"size:16;not null;uniqueIndex:idx_spans_tenant_trace_span,priority:3" json:"span_id"`
ParentSpanID string `gorm:"size:16" json:"parent_span_id"`
OperationName string `gorm:"size:255;index" json:"operation_name"`
StartTime time.Time `gorm:"index:idx_spans_tenant_service_start,priority:3" json:"start_time"`
Expand Down
Loading
Loading