From bae5bf8b3cff9b62bf3d6e415424e0fd4028fe6d Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Fri, 24 Apr 2026 21:19:05 +0000 Subject: [PATCH] feat(storage): tenant-scoped trace_id uniqueness (RAN-21) Replace standalone uniqueIndex on Trace.TraceID with composite idx_traces_tenant_trace_id(tenant_id, trace_id) so two tenants can ingest the same trace_id without either dropping into OnConflict.DoNothing. AutoMigrateModels now idempotently discovers and drops any legacy single-column unique index on traces.trace_id (SQLite/Postgres/MySQL/MSSQL) so upgraded databases stop silently blocking cross-tenant reuse. Co-Authored-By: Paperclip --- internal/storage/factory.go | 9 + internal/storage/migrate_traces.go | 181 ++++++++++++++++++++ internal/storage/models.go | 9 +- internal/storage/trace_repo.go | 10 +- internal/storage/trace_tenant_test.go | 232 ++++++++++++++++++++++++++ 5 files changed, 436 insertions(+), 5 deletions(-) create mode 100644 internal/storage/migrate_traces.go create mode 100644 internal/storage/trace_tenant_test.go diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 6d2aa7c..733e142 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -178,6 +178,15 @@ func AutoMigrateModels(db *gorm.DB, driver string) error { return fmt.Errorf("failed to migrate database: %w", err) } + // RAN-21: retire the pre-composite standalone unique index on traces.trace_id. + // AutoMigrate never drops indexes that no longer appear on struct tags, so on + // pre-existing databases the old uniqueIndex would persist and still block + // cross-tenant trace_id reuse. This is idempotent across drivers and a no-op + // on fresh databases. + if err := dropLegacyTraceIDUniqueIndex(db, driver); err != nil { + log.Printf("⚠️ legacy trace_id unique index drop failed: %v", err) + } + // Drop foreign keys that AutoMigrate may have created (MySQL) if driver == "mysql" { db.Exec("ALTER TABLE spans DROP FOREIGN KEY fk_traces_spans") diff --git a/internal/storage/migrate_traces.go b/internal/storage/migrate_traces.go new file mode 100644 index 0000000..07558d2 --- /dev/null +++ b/internal/storage/migrate_traces.go @@ -0,0 +1,181 @@ +package storage + +import ( + "fmt" + "log" + "strings" + + "gorm.io/gorm" +) + +// dropLegacyTraceIDUniqueIndex removes any pre-RAN-21 standalone UNIQUE index +// that covers only traces.trace_id. From RAN-21 onward uniqueness is the +// composite idx_traces_tenant_trace_id on (tenant_id, trace_id); a surviving +// standalone unique index would silently block cross-tenant trace_id reuse. +// +// Discovery is structure-based (not name-based) because the legacy name varied +// across drivers and GORM versions. The composite index — which lists two +// columns — is never matched and therefore never dropped. +// +// Fresh databases never contain the legacy index, so this is a safe no-op on +// first boot. Invoked once per AutoMigrateModels call and is idempotent. +func dropLegacyTraceIDUniqueIndex(db *gorm.DB, driver string) error { + if db == nil { + return nil + } + driver = strings.ToLower(driver) + names, err := findLegacyTraceIDUniqueIndexes(db, driver) + if err != nil { + return err + } + for _, name := range names { + if name == "" { + continue + } + if err := dropIndexOnTraces(db, driver, name); err != nil { + return fmt.Errorf("drop legacy trace_id unique index %q: %w", name, err) + } + log.Printf("🧹 Dropped legacy single-column unique index on traces.trace_id: %s", name) + } + return nil +} + +// findLegacyTraceIDUniqueIndexes returns every UNIQUE index on the traces table +// whose single indexed column is trace_id. The composite RAN-21 index +// (tenant_id, trace_id) is excluded because it covers two columns. +func findLegacyTraceIDUniqueIndexes(db *gorm.DB, driver string) ([]string, error) { + switch driver { + case "sqlite", "": + // Enumerate every unique index on traces, then inspect its column list + // via PRAGMA index_info. SQLite auto-creates indexes for UNIQUE table + // constraints with names prefixed "sqlite_autoindex_" — those also + // surface here and are handled identically. Aliased to is_unique + // because SQLite treats "unique" as a reserved keyword even as an + // output alias. + type idxRow struct { + Name string `gorm:"column:name"` + IsUnique int `gorm:"column:is_unique"` + } + var idxs []idxRow + if err := db.Raw(`SELECT name, "unique" AS is_unique FROM pragma_index_list('traces')`).Scan(&idxs).Error; err != nil { + return nil, fmt.Errorf("pragma_index_list(traces): %w", err) + } + type colRow struct { + Name string `gorm:"column:name"` + } + var out []string + for _, ix := range idxs { + if ix.IsUnique != 1 { + continue + } + var cols []colRow + if err := db.Raw(fmt.Sprintf("SELECT name FROM pragma_index_info('%s')", ix.Name)).Scan(&cols).Error; err != nil { + return nil, fmt.Errorf("pragma_index_info(%s): %w", ix.Name, err) + } + if len(cols) == 1 && cols[0].Name == "trace_id" { + out = append(out, ix.Name) + } + } + return out, nil + + case "postgres", "postgresql": + // pg_index.indkey is an int2vector of attnums; join against + // pg_attribute to resolve column names. Filter to UNIQUE, non-primary + // indexes on the traces table covering exactly one column = trace_id. + var rows []indexNameRow + const q = ` +SELECT c.relname AS name +FROM pg_index i +JOIN pg_class c ON c.oid = i.indexrelid +JOIN pg_class t ON t.oid = i.indrelid +JOIN pg_namespace n ON n.oid = t.relnamespace +WHERE t.relname = 'traces' + AND n.nspname = ANY (current_schemas(false)) + AND i.indisunique + AND NOT i.indisprimary + AND i.indnatts = 1 + AND ( + SELECT attname FROM pg_attribute + WHERE attrelid = t.oid AND attnum = i.indkey[0] + ) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("pg_index lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "mysql": + // information_schema.STATISTICS has one row per (index, seq_in_index). + // Group by index, count columns, and keep indexes where the sole + // column is trace_id and NON_UNIQUE=0. + var rows []indexNameRow + const q = ` +SELECT INDEX_NAME AS name +FROM information_schema.STATISTICS +WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'traces' + AND NON_UNIQUE = 0 + AND INDEX_NAME <> 'PRIMARY' +GROUP BY INDEX_NAME +HAVING COUNT(*) = 1 + AND MAX(COLUMN_NAME) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("information_schema.STATISTICS lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "sqlserver", "mssql": + // sys.indexes + sys.index_columns: one row per indexed column; keep + // unique, non-primary-key indexes on dbo.traces whose only column is + // trace_id. + var rows []indexNameRow + const q = ` +SELECT i.name AS name +FROM sys.indexes i +JOIN sys.objects t ON t.object_id = i.object_id +WHERE t.name = 'traces' + AND i.is_unique = 1 + AND i.is_primary_key = 0 + AND ( + SELECT COUNT(*) FROM sys.index_columns ic + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id + ) = 1 + AND EXISTS ( + SELECT 1 FROM sys.index_columns ic + JOIN sys.columns c ON c.object_id = ic.object_id AND c.column_id = ic.column_id + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id AND c.name = 'trace_id' + )` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("sys.indexes lookup: %w", err) + } + return flattenIndexNames(rows), nil + } + return nil, nil +} + +// indexNameRow is a single-column scan target shared by the non-SQLite branches. +// Keeping it package-level (rather than redeclared per-branch) lets +// flattenIndexNames project through a single concrete type. +type indexNameRow struct { + Name string `gorm:"column:name"` +} + +func flattenIndexNames(rows []indexNameRow) []string { + out := make([]string, 0, len(rows)) + for _, r := range rows { + out = append(out, r.Name) + } + return out +} + +// dropIndexOnTraces removes an index by name, per-driver. GORM's Migrator +// handles SQLite/Postgres/MySQL cleanly; for SQL Server we must qualify the +// DROP with the table name. +func dropIndexOnTraces(db *gorm.DB, driver, name string) error { + switch driver { + case "sqlserver", "mssql": + // T-SQL: DROP INDEX name ON table + return db.Exec(fmt.Sprintf("DROP INDEX [%s] ON [traces]", name)).Error + default: + return db.Migrator().DropIndex(&Trace{}, name) + } +} diff --git a/internal/storage/models.go b/internal/storage/models.go index 8585c61..b00c22e 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -84,11 +84,14 @@ const DefaultTenantID = "default" // Index strategy: single-column tenant_id is redundant — every tenant-scoped // query joins tenant_id with another filter (timestamp, service_name). The // leftmost column of a composite index satisfies single-column tenant lookups, -// so we only declare composites. +// so we only declare composites. TraceID uniqueness is scoped to (tenant_id, +// trace_id): distinct tenants may legitimately ingest identical trace_ids +// (RAN-21). The old standalone `uniqueIndex` on trace_id is dropped at +// migration time by dropLegacyTraceIDUniqueIndex. type Trace struct { ID uint `gorm:"primaryKey" json:"id"` - TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1" json:"tenant_id"` - TraceID string `gorm:"uniqueIndex;size:32;not null" json:"trace_id"` + TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1;uniqueIndex:idx_traces_tenant_trace_id,priority:1" json:"tenant_id"` + TraceID string `gorm:"size:32;not null;uniqueIndex:idx_traces_tenant_trace_id,priority:2" json:"trace_id"` ServiceName string `gorm:"size:255;index:idx_traces_tenant_service,priority:2" json:"service_name"` Duration int64 `gorm:"index" json:"duration"` // Microseconds DurationMs float64 `gorm:"-" json:"duration_ms"` diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go index 65a541c..05eaf61 100644 --- a/internal/storage/trace_repo.go +++ b/internal/storage/trace_repo.go @@ -56,6 +56,9 @@ func (r *Repository) BatchCreateSpans(spans []Span) error { } // BatchCreateTraces inserts traces, skipping duplicates. +// Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id +// on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored, +// while the same trace_id under a different tenant inserts cleanly. func (r *Repository) BatchCreateTraces(traces []Trace) error { if len(traces) == 0 { return nil @@ -67,6 +70,8 @@ func (r *Repository) BatchCreateTraces(traces []Trace) error { } // CreateTrace inserts a new trace, skipping if it already exists. +// Uniqueness is per idx_traces_tenant_trace_id (tenant_id, trace_id), so the +// same trace_id across tenants is allowed. func (r *Repository) CreateTrace(trace Trace) error { if strings.ToLower(r.driver) == "mysql" { return r.db.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(&trace).Error @@ -75,8 +80,9 @@ func (r *Repository) CreateTrace(trace Trace) error { } // GetTrace returns a trace by ID with its spans and logs, scoped to the tenant on ctx. -// The Preloaded Spans and Logs are additionally filtered so a trace ID collision -// across tenants cannot leak cross-tenant children. +// Trace uniqueness is composite (tenant_id, trace_id), so the same trace_id can +// legitimately exist in multiple tenants; the Preloaded Spans and Logs are +// filtered by tenant_id as defense-in-depth against cross-tenant child leakage. func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error) { tenant := TenantFromContext(ctx) var trace Trace diff --git a/internal/storage/trace_tenant_test.go b/internal/storage/trace_tenant_test.go new file mode 100644 index 0000000..103603a --- /dev/null +++ b/internal/storage/trace_tenant_test.go @@ -0,0 +1,232 @@ +package storage + +import ( + "sort" + "testing" + "time" +) + +// TestCreateTrace_SameTraceIDAcrossTenants_Succeeds proves that identical +// trace_ids under distinct tenants coexist after RAN-21. Before the fix the +// second insert silently collapsed into a no-op via OnConflict.DoNothing +// because the old standalone uniqueIndex on trace_id matched regardless of +// tenant. +func TestCreateTrace_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "shared-trace-id-0001" + + acme := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + beta := Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now} + + if err := repo.CreateTrace(acme); err != nil { + t.Fatalf("CreateTrace(acme): %v", err) + } + if err := repo.CreateTrace(beta); err != nil { + t.Fatalf("CreateTrace(beta): %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d trace rows for shared trace_id, got %d", want, got) + } + seenTenants := map[string]string{} + for _, r := range rows { + seenTenants[r.TenantID] = r.ServiceName + } + if seenTenants["acme"] != "svc-a" || seenTenants["beta"] != "svc-b" { + t.Fatalf("tenant→service mismatch: %+v", seenTenants) + } + + acmeCtx := WithTenantContext(t.Context(), "acme") + betaCtx := WithTenantContext(t.Context(), "beta") + got, err := repo.GetTrace(acmeCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(acme): %v", err) + } + if got.ServiceName != "svc-a" || got.TenantID != "acme" { + t.Fatalf("GetTrace(acme) returned wrong row: %+v", got) + } + got, err = repo.GetTrace(betaCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(beta): %v", err) + } + if got.ServiceName != "svc-b" || got.TenantID != "beta" { + t.Fatalf("GetTrace(beta) returned wrong row: %+v", got) + } +} + +// TestCreateTrace_SameTraceIDSameTenant_IsIgnored proves the intra-tenant +// dedupe behaviour survives the composite-uniqueness change. Ingestion retries +// under the same tenant must still be absorbed by OnConflict.DoNothing. +func TestCreateTrace_SameTraceIDSameTenant_IsIgnored(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "same-tenant-trace-0001" + + first := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + dup := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a-renamed", Status: "OK", Timestamp: now.Add(time.Second)} + + if err := repo.CreateTrace(first); err != nil { + t.Fatalf("CreateTrace first: %v", err) + } + if err := repo.CreateTrace(dup); err != nil { + t.Fatalf("CreateTrace dup: %v", err) + } + + var rows []Trace + if err := repo.db.Where("tenant_id = ? AND trace_id = ?", "acme", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 1; got != want { + t.Fatalf("expected intra-tenant dedupe to keep %d row, got %d", want, got) + } + if rows[0].ServiceName != "svc-a" { + t.Fatalf("expected first-write-wins semantics, got %q", rows[0].ServiceName) + } +} + +// TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds exercises the batch +// path, which uses the same OnConflict.DoNothing / INSERT IGNORE plumbing. +func TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "batch-shared-0001" + + batch := []Trace{ + {TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}, + {TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}, + } + if err := repo.BatchCreateTraces(batch); err != nil { + t.Fatalf("BatchCreateTraces: %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Order("tenant_id ASC").Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d rows, got %d", want, got) + } + if rows[0].TenantID != "acme" || rows[1].TenantID != "beta" { + t.Fatalf("unexpected tenant ordering: %v", []string{rows[0].TenantID, rows[1].TenantID}) + } +} + +// TestGetTrace_IsolatesChildrenAcrossTenants confirms that when the same +// trace_id exists in two tenants, GetTrace returns each tenant's own spans +// and logs — no cross-tenant children leak through the Preload filter. +func TestGetTrace_IsolatesChildrenAcrossTenants(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "preload-shared-0001" + + // Both tenants share trace_id but have distinct child payloads. + if err := repo.db.Create(&Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme trace: %v", err) + } + if err := repo.db.Create(&Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta trace: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", OperationName: "op-a", ServiceName: "svc-a", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create acme span: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", OperationName: "op-b", ServiceName: "svc-b", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create beta span: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", Severity: "INFO", Body: "acme log", ServiceName: "svc-a", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme log: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", Severity: "INFO", Body: "beta log", ServiceName: "svc-b", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta log: %v", err) + } + + cases := []struct { + tenant string + wantSvc string + wantOp string + wantLog string + }{ + {"acme", "svc-a", "op-a", "acme log"}, + {"beta", "svc-b", "op-b", "beta log"}, + } + for _, tc := range cases { + ctx := WithTenantContext(t.Context(), tc.tenant) + tr, err := repo.GetTrace(ctx, traceID) + if err != nil { + t.Fatalf("GetTrace(%s): %v", tc.tenant, err) + } + if tr.ServiceName != tc.wantSvc { + t.Fatalf("%s: wrong trace row: svc=%q", tc.tenant, tr.ServiceName) + } + if len(tr.Spans) != 1 || tr.Spans[0].OperationName != tc.wantOp { + names := []string{} + for _, s := range tr.Spans { + names = append(names, s.OperationName) + } + sort.Strings(names) + t.Fatalf("%s: span leak — got ops %v, want [%s]", tc.tenant, names, tc.wantOp) + } + if len(tr.Logs) != 1 || tr.Logs[0].Body != tc.wantLog { + bodies := []string{} + for _, l := range tr.Logs { + bodies = append(bodies, l.Body) + } + sort.Strings(bodies) + t.Fatalf("%s: log leak — got bodies %v, want [%s]", tc.tenant, bodies, tc.wantLog) + } + } +} + +// TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex proves the idempotent +// migration hook retires a pre-RAN-21 standalone unique index on +// traces.trace_id. Simulates an upgrade by creating the legacy index +// out-of-band and rerunning AutoMigrateModels. +func TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + + // Install a legacy-shaped standalone unique index on traces.trace_id. + const legacyIdx = "idx_legacy_traces_trace_id" + if err := repo.db.Exec("CREATE UNIQUE INDEX " + legacyIdx + " ON traces(trace_id)").Error; err != nil { + t.Fatalf("create legacy index: %v", err) + } + if !repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("precondition: legacy index should exist") + } + + // Rerun migration — the hook must drop the legacy index. + if err := AutoMigrateModels(repo.db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("legacy standalone unique index on traces.trace_id was not dropped") + } + + // Cross-tenant reuse must now succeed end-to-end. + now := time.Now().UTC() + if err := repo.CreateTrace(Trace{TenantID: "acme", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace acme: %v", err) + } + if err := repo.CreateTrace(Trace{TenantID: "beta", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace beta: %v", err) + } + var n int64 + if err := repo.db.Model(&Trace{}).Where("trace_id = ?", "after-drop").Count(&n).Error; err != nil { + t.Fatalf("count: %v", err) + } + if n != 2 { + t.Fatalf("expected 2 rows across tenants, got %d", n) + } +} + +// TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex asserts the new +// composite index landed with the expected name after a fresh migration. +func TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + if !repo.db.Migrator().HasIndex(&Trace{}, "idx_traces_tenant_trace_id") { + t.Fatal("expected composite uniqueIndex idx_traces_tenant_trace_id on traces") + } +}