diff --git a/internal/storage/factory.go b/internal/storage/factory.go
index 3de6838..e14f78d 100644
--- a/internal/storage/factory.go
+++ b/internal/storage/factory.go
@@ -238,6 +238,15 @@ func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOption
 		logsPartitioned = true
 	}
 
+	// Dedupe spans BEFORE AutoMigrate adds the composite uniqueIndex
+	// idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id).
+	// Pre-RAN-65 deployments may have duplicates from DLQ replays; the
+	// unique index would fail to create against violating rows. No-op on
+	// fresh databases or when the unique index already exists.
+	if err := dedupeSpansForUniqueIndex(db, driver); err != nil {
+		log.Printf("⚠️ span dedupe before unique index failed: %v", err)
+	}
+
 	migrateModels := []any{&Trace{}, &Span{}, &MetricBucket{}}
 	if !logsPartitioned {
 		migrateModels = append(migrateModels, &Log{})
diff --git a/internal/storage/migrate_spans.go b/internal/storage/migrate_spans.go
new file mode 100644
index 0000000..43e28b1
--- /dev/null
+++ b/internal/storage/migrate_spans.go
@@ -0,0 +1,101 @@
+package storage
+
+import (
+	"fmt"
+	"log"
+	"strings"
+
+	"gorm.io/gorm"
+)
+
+// dedupeSpansForUniqueIndex collapses any pre-existing duplicate spans on
+// (tenant_id, trace_id, span_id) before AutoMigrate adds the composite
+// uniqueIndex. Without this, CREATE UNIQUE INDEX would fail on databases
+// that accumulated duplicates from earlier DLQ replays (or any pre-RAN-65
+// non-idempotent ingest path), aborting startup.
+//
+// Strategy across drivers: keep the lowest primary-key row per
+// (tenant_id, trace_id, span_id), delete the rest. Every driver issues one
+// self-contained DELETE statement, so no state has to survive between
+// calls on gorm's pooled connections. Idempotent — a fresh database (or
+// one that never collected duplicates) is a no-op.
+//
+// Returns nil without touching any rows when db is nil, the spans table is
+// missing, the unique index already exists, or driver is not recognized;
+// otherwise returns the wrapped error of the failed DELETE, if any.
+//
+// Must run BEFORE db.AutoMigrate(...) in AutoMigrateModels; once the
+// unique constraint is in place, the dedupe is unnecessary because new
+// inserts collapse via OnConflict.DoNothing.
+func dedupeSpansForUniqueIndex(db *gorm.DB, driver string) error {
+	if db == nil {
+		return nil
+	}
+
+	// Skip if the spans table doesn't exist yet — fresh databases have
+	// nothing to dedupe and the upcoming AutoMigrate will create the
+	// table with the uniqueIndex already in place.
+	if !db.Migrator().HasTable("spans") {
+		return nil
+	}
+
+	// Skip if the unique index already exists (idempotent re-runs).
+	if db.Migrator().HasIndex("spans", "idx_spans_tenant_trace_span") {
+		return nil
+	}
+
+	// Pick the dialect-specific statement; dialect only labels the error.
+	var dialect, stmt string
+	switch strings.ToLower(driver) {
+	case "sqlite", "":
+		// SQLite supports DELETE with subquery on same table.
+		dialect = "sqlite"
+		stmt = `DELETE FROM spans WHERE id NOT IN (
+			SELECT MIN(id) FROM spans GROUP BY tenant_id, trace_id, span_id
+		)`
+
+	case "postgres", "postgresql":
+		// USING self-join keeps the lowest id per (tenant, trace, span).
+		dialect = "postgres"
+		stmt = `DELETE FROM spans a USING spans b
+			WHERE a.id > b.id
+			  AND a.tenant_id = b.tenant_id
+			  AND a.trace_id = b.trace_id
+			  AND a.span_id = b.span_id`
+
+	case "mysql":
+		// MySQL forbids selecting from the DELETE target inside a subquery
+		// but accepts a multi-table DELETE that self-joins it. A TEMPORARY
+		// table is deliberately avoided: it is visible only to the session
+		// that created it, and consecutive db.Exec calls may be served by
+		// different pooled connections.
+		dialect = "mysql"
+		stmt = `DELETE a FROM spans a
+			INNER JOIN spans b
+			   ON a.tenant_id = b.tenant_id
+			  AND a.trace_id = b.trace_id
+			  AND a.span_id = b.span_id
+			WHERE a.id > b.id`
+
+	case "sqlserver", "mssql":
+		// T-SQL: ROW_NUMBER() over the dedupe key, then delete duplicates.
+		dialect = "mssql"
+		stmt = `WITH dups AS (
+			SELECT id, ROW_NUMBER() OVER (
+				PARTITION BY tenant_id, trace_id, span_id ORDER BY id
+			) AS rn FROM spans
+		) DELETE FROM spans WHERE id IN (SELECT id FROM dups WHERE rn > 1)`
+
+	default:
+		return nil
+	}
+
+	res := db.Exec(stmt)
+	if res.Error != nil {
+		return fmt.Errorf("dedupe spans (%s): %w", dialect, res.Error)
+	}
+	if res.RowsAffected > 0 {
+		log.Printf("🧹 Deduplicated %d duplicate span row(s) before adding uniqueIndex idx_spans_tenant_trace_span", res.RowsAffected)
+	}
+	return nil
+}
diff --git a/internal/storage/models.go b/internal/storage/models.go
index b00c22e..4993a6c 100644
--- a/internal/storage/models.go
+++ b/internal/storage/models.go
@@ -110,11 +110,19 @@ type Trace struct {
 }
 
 // Span represents a single operation within a trace.
+//
+// Idempotency: the composite uniqueIndex idx_spans_tenant_trace_span on
+// (tenant_id, trace_id, span_id) ensures a span is written at most once
+// per tenant. DLQ replay (or any duplicate ingest) collapses cleanly via
+// OnConflict.DoNothing in BatchCreateAll/BatchCreateSpans rather than
+// double-counting in downstream metrics or GraphRAG. The composite covers
+// the legacy idx_spans_tenant_trace as a left-prefix; the legacy index
+// is retained for query-plan stability across upgrades.
 type Span struct {
 	ID uint `gorm:"primaryKey" json:"id"`
-	TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1" json:"tenant_id"`
-	TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2" json:"trace_id"`
-	SpanID string `gorm:"size:16;not null" json:"span_id"`
+	TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1;uniqueIndex:idx_spans_tenant_trace_span,priority:1" json:"tenant_id"`
+	TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2;uniqueIndex:idx_spans_tenant_trace_span,priority:2" json:"trace_id"`
+	SpanID string `gorm:"size:16;not null;uniqueIndex:idx_spans_tenant_trace_span,priority:3" json:"span_id"`
 	ParentSpanID string `gorm:"size:16" json:"parent_span_id"`
 	OperationName string `gorm:"size:255;index" json:"operation_name"`
 	StartTime time.Time `gorm:"index:idx_spans_tenant_service_start,priority:3" json:"start_time"`
diff --git a/internal/storage/span_idempotency_test.go b/internal/storage/span_idempotency_test.go
new file mode 100644
index 0000000..d0a8850
--- /dev/null
+++ b/internal/storage/span_idempotency_test.go
@@ -0,0 +1,272 @@
+package storage
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+// TestBatchCreateSpans_DuplicateInsertNoOp verifies that re-submitting a span
+// with the same (tenant, trace, span_id) is silently absorbed — the second
+// insert MUST NOT create a duplicate row, must not return an error, and must
+// not overwrite the original. This is the contract DLQ replay relies on.
+func TestBatchCreateSpans_DuplicateInsertNoOp(t *testing.T) {
+	repo := newTestRepo(t)
+	now := time.Now().UTC().Truncate(time.Second)
+
+	first := Span{
+		TenantID:      "acme",
+		TraceID:       "trace-1",
+		SpanID:        "span-a",
+		OperationName: "first",
+		StartTime:     now,
+		EndTime:       now.Add(time.Millisecond),
+		Duration:      1000,
+		ServiceName:   "svc",
+	}
+	if err := repo.BatchCreateSpans([]Span{first}); err != nil {
+		t.Fatalf("first insert: %v", err)
+	}
+
+	// Replay with the same dedupe key but a different OperationName proves
+	// OnConflict.DoNothing semantics (NOT DoUpdate) — the original row wins.
+	replay := first
+	replay.OperationName = "second-attempt"
+	if err := repo.BatchCreateSpans([]Span{replay}); err != nil {
+		t.Fatalf("replay: %v", err)
+	}
+
+	if got := mustCount(t, repo.db, &Span{}); got != 1 {
+		t.Fatalf("expected 1 span after replay, got %d", got)
+	}
+
+	var stored Span
+	if err := repo.db.Where("tenant_id = ? AND trace_id = ? AND span_id = ?", "acme", "trace-1", "span-a").First(&stored).Error; err != nil {
+		t.Fatalf("read back: %v", err)
+	}
+	if stored.OperationName != "first" {
+		t.Fatalf("DoNothing violated: stored op=%q want %q", stored.OperationName, "first")
+	}
+}
+
+// TestBatchCreateSpans_CrossTenantSameKeyAllowed verifies tenant scope of the
+// uniqueIndex — the same (trace_id, span_id) under a different tenant inserts
+// cleanly. Without this, multi-tenant correlation by span_id would conflate
+// rows across tenants on first ingest.
+func TestBatchCreateSpans_CrossTenantSameKeyAllowed(t *testing.T) {
+	repo := newTestRepo(t)
+	now := time.Now().UTC()
+
+	mk := func(tenant string) Span {
+		return Span{
+			TenantID:      tenant,
+			TraceID:       "shared-trace",
+			SpanID:        "shared-span",
+			OperationName: "op-" + tenant,
+			StartTime:     now,
+			EndTime:       now.Add(time.Millisecond),
+			ServiceName:   "svc-" + tenant,
+		}
+	}
+	if err := repo.BatchCreateSpans([]Span{mk("acme"), mk("beta")}); err != nil {
+		t.Fatalf("cross-tenant insert: %v", err)
+	}
+
+	if got := mustCount(t, repo.db, &Span{}); got != 2 {
+		t.Fatalf("expected 2 spans (one per tenant), got %d", got)
+	}
+}
+
+// TestBatchCreateAll_SpanReplayIdempotent covers the same DLQ replay scenario
+// through the transactional BatchCreateAll path used by the async ingest
+// pipeline. Submitting an entire (traces, spans, logs) batch twice must not
+// inflate trace or span counts.
+func TestBatchCreateAll_SpanReplayIdempotent(t *testing.T) {
+	repo := newTestRepo(t)
+	ctx := WithTenantContext(context.Background(), "acme")
+	now := time.Now().UTC()
+
+	traces := []Trace{{TenantID: "acme", TraceID: "tr-1", ServiceName: "svc", Duration: 100, Status: "OK", Timestamp: now}}
+	spans := []Span{
+		{TenantID: "acme", TraceID: "tr-1", SpanID: "sp-1", OperationName: "op", StartTime: now, EndTime: now, ServiceName: "svc"},
+		{TenantID: "acme", TraceID: "tr-1", SpanID: "sp-2", OperationName: "op", StartTime: now, EndTime: now, ServiceName: "svc"},
+	}
+	logs := []Log{{TenantID: "acme", TraceID: "tr-1", SpanID: "sp-1", Severity: "INFO", Body: "hi", ServiceName: "svc", Timestamp: now}}
+
+	if err := repo.BatchCreateAll(traces, spans, logs); err != nil {
+		t.Fatalf("first batch: %v", err)
+	}
+	// Mimic DLQ replay: rows come from JSON deserialization without
+	// auto-assigned primary keys. Without this reset GORM would try to
+	// re-insert rows with explicit IDs and trip the PK constraint —
+	// distinct from the (tenant, trace, span_id) idempotency we're testing.
+	traces2 := append([]Trace(nil), traces...)
+	spans2 := append([]Span(nil), spans...)
+	logs2 := append([]Log(nil), logs...)
+	for i := range traces2 {
+		traces2[i].ID = 0
+	}
+	for i := range spans2 {
+		spans2[i].ID = 0
+	}
+	for i := range logs2 {
+		logs2[i].ID = 0
+	}
+	if err := repo.BatchCreateAll(traces2, spans2, logs2); err != nil {
+		t.Fatalf("replay batch: %v", err)
+	}
+
+	tr, err := repo.GetTrace(ctx, "tr-1")
+	if err != nil {
+		t.Fatalf("GetTrace: %v", err)
+	}
+	if got := mustCount(t, repo.db, &Trace{}); got != 1 {
+		t.Fatalf("traces inflated by replay: got %d, want 1", got)
+	}
+	if got := mustCount(t, repo.db, &Span{}); got != 2 {
+		t.Fatalf("spans inflated by replay: got %d, want 2", got)
+	}
+	if len(tr.Spans) != 2 {
+		t.Fatalf("preloaded span count: got %d, want 2", len(tr.Spans))
+	}
+}
+
+// TestDedupeSpansForUniqueIndex_RemovesPreExistingDuplicates simulates an
+// upgrade from a pre-RAN-65 deployment that accumulated duplicate spans
+// from DLQ replays. The dedupe migration must collapse them BEFORE the
+// uniqueIndex creation so AutoMigrate succeeds.
+func TestDedupeSpansForUniqueIndex_RemovesPreExistingDuplicates(t *testing.T) {
+	// Build an unmigrated DB, create the spans table WITHOUT the unique
+	// index (mirrors the legacy schema), seed duplicates, then run the
+	// dedupe helper directly.
+	db, err := NewDatabase("sqlite", ":memory:")
+	if err != nil {
+		t.Fatalf("NewDatabase: %v", err)
+	}
+	t.Cleanup(func() { sqlDB, _ := db.DB(); _ = sqlDB.Close() })
+
+	if err := db.Exec(`CREATE TABLE spans (
+		id INTEGER PRIMARY KEY AUTOINCREMENT,
+		tenant_id TEXT NOT NULL DEFAULT 'default',
+		trace_id TEXT NOT NULL,
+		span_id TEXT NOT NULL,
+		parent_span_id TEXT,
+		operation_name TEXT,
+		start_time DATETIME,
+		end_time DATETIME,
+		duration INTEGER,
+		service_name TEXT,
+		status TEXT DEFAULT 'STATUS_CODE_UNSET',
+		attributes_json BLOB
+	)`).Error; err != nil {
+		t.Fatalf("create legacy spans table: %v", err)
+	}
+
+	insert := func(tenant, trace, span, op string) {
+		if err := db.Exec(
+			`INSERT INTO spans (tenant_id, trace_id, span_id, operation_name) VALUES (?, ?, ?, ?)`,
+			tenant, trace, span, op,
+		).Error; err != nil {
+			t.Fatalf("insert: %v", err)
+		}
+	}
+	insert("acme", "tr-1", "sp-1", "first")
+	insert("acme", "tr-1", "sp-1", "dup-second") // dup
+	insert("acme", "tr-1", "sp-1", "dup-third")  // dup
+	insert("acme", "tr-1", "sp-2", "unique")
+	insert("beta", "tr-1", "sp-1", "cross-tenant-keep")
+
+	if err := dedupeSpansForUniqueIndex(db, "sqlite"); err != nil {
+		t.Fatalf("dedupeSpansForUniqueIndex: %v", err)
+	}
+
+	type row struct {
+		ID            int
+		OperationName string
+	}
+	var rows []row
+	if err := db.Raw(`SELECT id, operation_name FROM spans ORDER BY id`).Scan(&rows).Error; err != nil {
+		t.Fatalf("read back: %v", err)
+	}
+	// Three rows survive: first acme/tr-1/sp-1 (lowest id), acme/tr-1/sp-2,
+	// beta/tr-1/sp-1. The two duplicate rows for acme/tr-1/sp-1 are gone.
+	if len(rows) != 3 {
+		t.Fatalf("post-dedupe row count: got %d, want 3 (rows=%+v)", len(rows), rows)
+	}
+	for _, r := range rows {
+		if r.OperationName == "dup-second" || r.OperationName == "dup-third" {
+			t.Fatalf("duplicate row survived: %+v", r)
+		}
+	}
+}
+
+// TestDedupeSpansForUniqueIndex_NoOpOnFreshDB verifies the dedupe is a safe
+// no-op when the spans table doesn't exist yet (greenfield startup) — the
+// helper must NOT fail in that case.
+func TestDedupeSpansForUniqueIndex_NoOpOnFreshDB(t *testing.T) {
+	db, err := NewDatabase("sqlite", ":memory:")
+	if err != nil {
+		t.Fatalf("NewDatabase: %v", err)
+	}
+	t.Cleanup(func() { sqlDB, _ := db.DB(); _ = sqlDB.Close() })
+
+	if err := dedupeSpansForUniqueIndex(db, "sqlite"); err != nil {
+		t.Fatalf("dedupe on fresh DB: %v", err)
+	}
+}
+
+// TestDedupeSpansForUniqueIndex_NoOpOnceIndexExists guards against re-running
+// the (potentially expensive) dedupe scan on every restart of an already-
+// migrated database.
+func TestDedupeSpansForUniqueIndex_NoOpOnceIndexExists(t *testing.T) {
+	repo := newTestRepo(t)
+	if !repo.db.Migrator().HasIndex("spans", "idx_spans_tenant_trace_span") {
+		t.Fatalf("test prerequisite: uniqueIndex idx_spans_tenant_trace_span should be present after AutoMigrate")
+	}
+	// A stronger check would plant a duplicate row and confirm the helper
+	// leaves it alone, but the unique index rejects such an insert. This
+	// test therefore only confirms that re-running the helper on an
+	// already-migrated database returns no error.
+	if err := dedupeSpansForUniqueIndex(repo.db, "sqlite"); err != nil {
+		t.Fatalf("re-run on migrated DB: %v", err)
+	}
+	// And verify HasIndex is still true (sanity).
+	if !repo.db.Migrator().HasIndex("spans", "idx_spans_tenant_trace_span") {
+		t.Fatalf("uniqueIndex disappeared after dedupe call")
+	}
+}
+
+// TestAutoMigrate_AddsSpanUniqueIndex covers the full AutoMigrate path —
+// after the migration runs against a fresh DB, the composite uniqueIndex
+// must be present. Belt-and-braces against a future refactor that
+// accidentally drops the gorm tag.
+func TestAutoMigrate_AddsSpanUniqueIndex(t *testing.T) {
+	db, err := NewDatabase("sqlite", ":memory:")
+	if err != nil {
+		t.Fatalf("NewDatabase: %v", err)
+	}
+	t.Cleanup(func() { sqlDB, _ := db.DB(); _ = sqlDB.Close() })
+
+	if err := AutoMigrateModels(db, "sqlite"); err != nil {
+		t.Fatalf("AutoMigrateModels: %v", err)
+	}
+	if !db.Migrator().HasIndex("spans", "idx_spans_tenant_trace_span") {
+		t.Fatalf("expected uniqueIndex idx_spans_tenant_trace_span after AutoMigrate")
+	}
+	// Verify it's actually unique (not just a regular index) by attempting
+	// a violating insert.
+	now := time.Now().UTC()
+	mk := Span{TenantID: "t1", TraceID: "tr", SpanID: "sp", OperationName: "op", StartTime: now, EndTime: now, ServiceName: "svc"}
+	if err := db.Create(&mk).Error; err != nil {
+		t.Fatalf("first insert: %v", err)
+	}
+	dup := mk
+	dup.ID = 0 // let GORM assign
+	dup.OperationName = "dup"
+	err = db.Create(&dup).Error
+	if err == nil {
+		t.Fatalf("expected unique-constraint violation on duplicate span insert, got nil")
+	}
+	// Any non-nil error proves the unique index is enforcing — we don't pin
+	// to a specific error string because driver wording varies.
+}
diff --git a/internal/storage/testhelpers_test.go b/internal/storage/testhelpers_test.go
index 472c79a..8a04103 100644
--- a/internal/storage/testhelpers_test.go
+++ b/internal/storage/testhelpers_test.go
@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -27,7 +28,7 @@ func newTestRepo(t *testing.T) *Repository {
 func seedLogs(t *testing.T, db *gorm.DB, n int, ts time.Time, service string) {
 	t.Helper()
 	logs := make([]Log, n)
-	for i := 0; i < n; i++ {
+	for i := range n {
 		logs[i] = Log{
 			TraceID: "trace-xxxx",
 			SpanID: "span-yyyy",
@@ -59,8 +60,11 @@ func seedTrace(t *testing.T, db *gorm.DB, traceID string, traceTS time.Time, spa
 	spans := make([]Span, len(spanStartTimes))
 	for i, st := range spanStartTimes {
 		spans[i] = Span{
-			TraceID:       traceID,
-			SpanID:        traceID + "-span",
+			TraceID: traceID,
+			// Each span needs a distinct SpanID — the composite uniqueIndex
+			// idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id)
+			// would otherwise collapse all spans for this trace into one row.
+			SpanID:        fmt.Sprintf("%s-span-%d", traceID, i),
 			OperationName: "op",
 			StartTime:     st,
 			EndTime:       st.Add(time.Millisecond),
diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go
index 23452ad..7c119a8 100644
--- a/internal/storage/trace_repo.go
+++ b/internal/storage/trace_repo.go
@@ -44,17 +44,32 @@ type ServiceMapMetrics struct {
 	Edges []ServiceMapEdge `json:"edges"`
 }
 
-// BatchCreateSpans inserts multiple spans in batches.
+// BatchCreateSpans inserts multiple spans, skipping duplicates.
+// Duplicate is defined per the composite uniqueIndex idx_spans_tenant_trace_span
+// on (tenant_id, trace_id, span_id): a (tenant, trace, span) clash is silently
+// absorbed so DLQ replays (or any duplicate ingest) collapse to a no-op rather
+// than double-inserting.
 func (r *Repository) BatchCreateSpans(spans []Span) error {
 	if len(spans) == 0 {
 		return nil
 	}
-	if err := r.db.CreateInBatches(spans, 500).Error; err != nil {
+	if err := createSpansIdempotent(r.db, r.driver, spans); err != nil {
 		return fmt.Errorf("failed to batch create spans: %w", err)
 	}
 	return nil
 }
 
+// createSpansIdempotent runs the conflict-tolerant span insert against an
+// arbitrary *gorm.DB so BatchCreateAll can reuse it inside its transaction.
+// MySQL gets INSERT IGNORE; every other driver gets clause.OnConflict with
+// DoNothing (NOTE(review): confirm the SQL Server dialect honors it).
+func createSpansIdempotent(db *gorm.DB, driver string, spans []Span) error {
+	if strings.ToLower(driver) == "mysql" {
+		return db.Clauses(clause.Insert{Modifier: "IGNORE"}).CreateInBatches(spans, 500).Error
+	}
+	return db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(spans, 500).Error
+}
+
 // BatchCreateTraces inserts traces, skipping duplicates.
 // Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id
 // on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored,
@@ -82,12 +97,14 @@ func createTracesIdempotent(db *gorm.DB, driver string, traces []Trace) error {
 // rolls back any partial commit, preventing orphan FK rows from a worker that
 // crashed between BatchCreateTraces and BatchCreateSpans.
 //
-// Trace inserts inherit BatchCreateTraces' idempotency: a trace_id clash
-// within the same tenant is silently skipped. Spans and logs have no unique
-// constraint, so a replay can still produce duplicate rows — that is a
-// separate idempotency concern (requires schema migration to fix) and is
-// out of scope for this method, whose contract is solely atomicity of the
-// batch.
+// Idempotency: traces and spans both collapse duplicates silently —
+//   - traces via idx_traces_tenant_trace_id on (tenant_id, trace_id)
+//   - spans via idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id)
+//
+// so a DLQ replay of an already-persisted batch is a safe no-op for those
+// signals. Logs do not yet have a unique key (OTLP logs lack a stable
+// identifier) and a replay can still produce duplicate log rows; that is a
+// separate idempotency concern out of scope for this method.
 func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) error {
 	if len(traces) == 0 && len(spans) == 0 && len(logs) == 0 {
 		return nil
@@ -99,7 +116,7 @@ func (r *Repository) BatchCreateAll(traces []Trace, spans []Span, logs []Log) er
 			}
 		}
 		if len(spans) > 0 {
-			if err := tx.CreateInBatches(spans, 500).Error; err != nil {
+			if err := createSpansIdempotent(tx, r.driver, spans); err != nil {
 				return fmt.Errorf("BatchCreateAll: spans: %w", err)
 			}
 		}