diff --git a/internal/graphrag/builder.go b/internal/graphrag/builder.go index 143842b..5653463 100644 --- a/internal/graphrag/builder.go +++ b/internal/graphrag/builder.go @@ -238,8 +238,13 @@ func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggr // Restore persisted Drain templates so log clustering survives restarts. // A missing table (fresh install) or transient DB error is non-fatal — // ingestion will rebuild templates from scratch. + // + // The Drain miner is currently a single shared instance, so we treat its + // learned templates as belonging to DefaultTenantID. The persistence layer + // is already keyed by (tenant_id, id) so a future per-tenant Drain miner + // can load each tenant's slice without colliding cluster IDs. if repo != nil && repo.DB() != nil { - if tpls, err := LoadDrainTemplates(repo.DB()); err != nil { + if tpls, err := LoadDrainTemplates(repo.DB(), storage.DefaultTenantID); err != nil { slog.Info("GraphRAG: drain template restore skipped", "reason", err) } else if len(tpls) > 0 { g.drain.LoadTemplates(tpls) @@ -294,7 +299,7 @@ func (g *GraphRAG) Stop() { // Best-effort final Drain template persistence — losing the most recent // updates on an unclean shutdown would force rebuilding from scratch. if g.repo != nil && g.repo.DB() != nil && g.drain != nil { - if err := SaveDrainTemplates(g.repo.DB(), g.drain.Templates()); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, g.drain.Templates()); err != nil { slog.Warn("GraphRAG: final drain template save failed", "error", err) } } diff --git a/internal/graphrag/drain.go b/internal/graphrag/drain.go index c357ee3..90ad1ac 100644 --- a/internal/graphrag/drain.go +++ b/internal/graphrag/drain.go @@ -33,6 +33,7 @@ import ( "sync" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -504,13 +505,20 @@ func (d *Drain) Len() int { // --- Persistence --- // SaveDrainTemplates upserts the given templates into the drain_templates -// table. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which -// is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY -// UPDATE for MySQL). -func SaveDrainTemplates(db *gorm.DB, templates []Template) error { +// table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's +// clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/ +// PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL). +// +// The conflict target is the composite (tenant_id, id) primary key so the +// same Drain template hash can coexist across tenants — a future per-tenant +// Drain miner can rely on this to keep cluster IDs stable per tenant. +func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error { if db == nil || len(templates) == 0 { return nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } rows := make([]DrainTemplateRow, 0, len(templates)) for _, t := range templates { tokensJSON, err := json.Marshal(t.Tokens) @@ -518,6 +526,7 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { return fmt.Errorf("marshal drain tokens id=%d: %w", t.ID, err) } rows = append(rows, DrainTemplateRow{ + TenantID: tenant, ID: int64(t.ID), //nolint:gosec // intentional bit-reinterpret of FNV-64 hash for DB portability Tokens: string(tokensJSON), Count: t.Count, @@ -527,22 +536,25 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { }) } return db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "id"}}, + Columns: []clause.Column{{Name: "tenant_id"}, {Name: "id"}}, DoUpdates: clause.AssignmentColumns([]string{ "tokens", "count", "last_seen", "sample", }), }).CreateInBatches(&rows, 500).Error } -// LoadDrainTemplates reads all persisted Drain templates from the DB and -// returns them in a format ready to pass to Drain.LoadTemplates. Returns an -// empty slice (and nil error) if the table is empty. -func LoadDrainTemplates(db *gorm.DB) ([]Template, error) { +// LoadDrainTemplates reads persisted Drain templates for the supplied tenant +// and returns them in a format ready to pass to Drain.LoadTemplates. Returns +// an empty slice (and nil error) if no rows match. +func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error) { if db == nil { return nil, nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } var rows []DrainTemplateRow - if err := db.Find(&rows).Error; err != nil { + if err := db.Where("tenant_id = ?", tenant).Find(&rows).Error; err != nil { return nil, err } out := make([]Template, 0, len(rows)) diff --git a/internal/graphrag/drain_persist_test.go b/internal/graphrag/drain_persist_test.go index dd573c5..49a8633 100644 --- a/internal/graphrag/drain_persist_test.go +++ b/internal/graphrag/drain_persist_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "github.com/glebarez/sqlite" "gorm.io/gorm" ) @@ -50,12 +51,12 @@ func TestDrainPersistence_RoundTrip(t *testing.T) { } // Persist. - if err := SaveDrainTemplates(db, d1.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d1.Templates()); err != nil { t.Fatalf("SaveDrainTemplates: %v", err) } // Reload. - loaded, err := LoadDrainTemplates(db) + loaded, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates: %v", err) } @@ -91,7 +92,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t0) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("first save: %v", err) } @@ -108,7 +109,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t1) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("second save: %v", err) } @@ -139,7 +140,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { // returns an empty slice and nil error (fresh-install path). func TestDrainPersistence_EmptyDB(t *testing.T) { db := newTestDrainDB(t) - tpls, err := LoadDrainTemplates(db) + tpls, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates on empty table: %v", err) } @@ -148,7 +149,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Calling with nil DB is also safe. - tpls, err = LoadDrainTemplates(nil) + tpls, err = LoadDrainTemplates(nil, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates(nil): %v", err) } @@ -157,7 +158,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Saving an empty slice is a no-op (no error, no rows). - if err := SaveDrainTemplates(db, nil); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, nil); err != nil { t.Fatalf("SaveDrainTemplates(nil): %v", err) } var cnt int64 diff --git a/internal/graphrag/investigation.go b/internal/graphrag/investigation.go index e3dbd31..7aa45d8 100644 --- a/internal/graphrag/investigation.go +++ b/internal/graphrag/investigation.go @@ -10,7 +10,6 @@ import ( "time" "github.com/RandomCodeSpace/otelcontext/internal/storage" - "gorm.io/gorm" ) // investigationCooldown suppresses repeated PersistInvestigation calls with @@ -43,13 +42,14 @@ func (c *investigationCooldown) allow(key string, now time.Time) bool { } // cooldownKey builds a case- and whitespace-insensitive key from the tuple -// (trigger_service, root_service, root_operation). Service names emitted -// from different instrumentations occasionally differ in casing or have -// trailing whitespace; canonicalizing here prevents those variants from -// bypassing the cooldown guard. -func cooldownKey(triggerService, rootService, rootOperation string) string { +// (tenant, trigger_service, root_service, root_operation). Tenant scopes the +// guard so an error in tenant-A doesn't suppress the same error pattern in +// tenant-B. Service names emitted from different instrumentations occasionally +// differ in casing or have trailing whitespace; canonicalizing here prevents +// those variants from bypassing the cooldown guard. +func cooldownKey(tenant, triggerService, rootService, rootOperation string) string { norm := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) } - return norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) + return norm(tenant) + "|" + norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) } // prune drops entries older than cutoff to bound map size. Called from @@ -65,9 +65,14 @@ func (c *investigationCooldown) prune(cutoff time.Time) { } // Investigation is a persisted record of an automated error investigation. +// +// TenantID scopes the row to its originating tenant. The composite +// (tenant_id, created_at) index supports the recency-ordered "investigations +// for tenant X" query that GetInvestigations runs on every read. type Investigation struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"` Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved Severity string `gorm:"size:20" json:"severity"` // critical, warning, info TriggerService string `gorm:"size:255;index" json:"trigger_service"` @@ -88,20 +93,17 @@ func (Investigation) TableName() string { return "investigations" } -// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models. -func AutoMigrateGraphRAG(db *gorm.DB) error { - return db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}) -} - // PersistInvestigation saves an investigation record from an error chain // analysis. Tenant is accepted explicitly so the caller (the per-tenant -// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice. The -// `investigations` table itself does not yet carry a tenant_id column — -// Subtask B (RAN-38) owns that migration. +// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so +// the persisted row carries its originating tenant_id. func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode) { if len(chains) == 0 { return } + if tenant == "" { + tenant = storage.DefaultTenantID + } firstChain := chains[0] if firstChain.RootCause == nil { @@ -111,10 +113,12 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] now := time.Now() // Cooldown: suppress repeat investigations for the same - // (trigger_service, root_service, root_operation) inside a sliding window. - // Keys are canonicalized (lower + trim) so "Orders" and "orders " share a - // bucket — otherwise trivial casing differences would bypass the guard. - key := cooldownKey(triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) + // (tenant, trigger_service, root_service, root_operation) inside a sliding + // window. Keys are canonicalized (lower + trim) so "Orders" and "orders " + // share a bucket — otherwise trivial casing differences would bypass the + // guard. Tenant scoping prevents an error in one tenant from suppressing + // the same pattern in another. + key := cooldownKey(tenant, triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) if g.invCooldown != nil && !g.invCooldown.allow(key, now) { return } @@ -180,6 +184,7 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] } inv := Investigation{ + TenantID: tenant, ID: id, CreatedAt: now, Status: "detected", @@ -202,15 +207,17 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] return } if err := g.repo.DB().Create(&inv).Error; err != nil { - slog.Error("Failed to persist investigation", "error", err) + slog.Error("Failed to persist investigation", "tenant", tenant, "error", err) return } - slog.Info("Investigation persisted", "id", id, "service", triggerService, "severity", severity) + slog.Info("Investigation persisted", "id", id, "tenant", tenant, "service", triggerService, "severity", severity) } -// GetInvestigations queries persisted investigations. -func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error) { +// GetInvestigations queries persisted investigations scoped to the tenant +// carried by ctx. The composite (tenant_id, created_at) index supports the +// recency-ordered scan. +func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error) { if limit <= 0 { limit = 20 } @@ -218,7 +225,12 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int limit = 100 } - db := g.repo.DB().Model(&Investigation{}).Order("created_at DESC").Limit(limit) + tenant := storage.TenantFromContext(ctx) + db := g.repo.DB(). + Model(&Investigation{}). + Where("tenant_id = ?", tenant). + Order("created_at DESC"). + Limit(limit) if service != "" { db = db.Where("trigger_service = ? OR root_service = ?", service, service) } @@ -236,10 +248,13 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int return investigations, nil } -// GetInvestigation retrieves a single investigation by ID. -func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error) { +// GetInvestigation retrieves a single investigation by ID, scoped to the +// tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups +// prevents id-guessing from leaking another tenant's row. +func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error) { + tenant := storage.TenantFromContext(ctx) var inv Investigation - if err := g.repo.DB().Where("id = ?", id).First(&inv).Error; err != nil { + if err := g.repo.DB().Where("tenant_id = ? AND id = ?", tenant, id).First(&inv).Error; err != nil { return nil, err } return &inv, nil diff --git a/internal/graphrag/investigation_cooldown_test.go b/internal/graphrag/investigation_cooldown_test.go index 1e36eeb..fdfce6f 100644 --- a/internal/graphrag/investigation_cooldown_test.go +++ b/internal/graphrag/investigation_cooldown_test.go @@ -46,18 +46,30 @@ func TestPersistInvestigation_Cooldown(t *testing.T) { } // TestCooldownKey_Canonical verifies the key normalizes case and trims -// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket. +// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket +// within a tenant. func TestCooldownKey_Canonical(t *testing.T) { - cases := [][3]string{ - {"orders", "orders", "op"}, - {"Orders", "ORDERS", "op"}, - {" orders ", "orders", " op "}, - {"ORDERS", "Orders ", "OP"}, + cases := [][4]string{ + {"acme", "orders", "orders", "op"}, + {"Acme", "Orders", "ORDERS", "op"}, + {" acme ", " orders ", "orders", " op "}, + {"ACME", "ORDERS", "Orders ", "OP"}, } - want := cooldownKey(cases[0][0], cases[0][1], cases[0][2]) + want := cooldownKey(cases[0][0], cases[0][1], cases[0][2], cases[0][3]) for _, c := range cases[1:] { - if got := cooldownKey(c[0], c[1], c[2]); got != want { + if got := cooldownKey(c[0], c[1], c[2], c[3]); got != want { t.Errorf("cooldownKey%v = %q, want %q", c, got, want) } } } + +// TestCooldownKey_TenantIsolated asserts that two tenants emitting the same +// (trigger, root, op) tuple produce distinct cooldown keys, so an error in +// tenant-A doesn't suppress the same pattern in tenant-B. +func TestCooldownKey_TenantIsolated(t *testing.T) { + a := cooldownKey("tenant-a", "orders", "orders", "op") + b := cooldownKey("tenant-b", "orders", "orders", "op") + if a == b { + t.Fatalf("tenant scoping missing: tenant-a and tenant-b share key %q", a) + } +} diff --git a/internal/graphrag/migrate.go b/internal/graphrag/migrate.go new file mode 100644 index 0000000..133593c --- /dev/null +++ b/internal/graphrag/migrate.go @@ -0,0 +1,213 @@ +package graphrag + +// GraphRAG persistence migrations. +// +// AutoMigrateGraphRAG runs the standard GORM AutoMigrate over the three +// persisted GraphRAG models, then performs two idempotent post-migration +// passes that prepare older databases for tenant-scoped reads: +// +// 1. backfillTenantIDs — sets tenant_id = DefaultTenantID on rows that pre-date +// the column being added (or that somehow ended up empty). New rows always +// receive the column default at insert time. +// +// 2. ensureDrainTemplatesCompositePK — promotes drain_templates' single-column +// primary key (id) to the composite (tenant_id, id) when an older schema +// is detected. The composite PK matters because the same Drain template +// hash can legitimately recur across tenants once the in-memory miner is +// partitioned per tenant; without it the second tenant's row would collide +// on insert. +// +// Both passes are safe to call repeatedly. SQLite and PostgreSQL are +// supported explicitly; other dialects skip the PK promotion and log so an +// operator can apply the equivalent DDL by hand. + +import ( + "errors" + "fmt" + "log/slog" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "gorm.io/gorm" +) + +// graphRAGTables are the three persisted tables that carry tenant_id after +// RAN-38. Order matches AutoMigrate order so log lines line up. +var graphRAGTables = []string{"investigations", "graph_snapshots", "drain_templates"} + +// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models and +// applies tenant backfill + drain_templates composite-PK promotion. Safe to +// call repeatedly. +func AutoMigrateGraphRAG(db *gorm.DB) error { + if db == nil { + return nil + } + if err := db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}); err != nil { + return fmt.Errorf("graphrag automigrate: %w", err) + } + if err := backfillTenantIDs(db); err != nil { + return fmt.Errorf("graphrag tenant backfill: %w", err) + } + if err := ensureDrainTemplatesCompositePK(db); err != nil { + // Non-fatal: new writes still carry tenant_id; only existing rows + // retain the legacy single-column PK. Surfaced as a warn so an + // operator can intervene on dialects we don't auto-migrate. + slog.Warn("graphrag: drain_templates composite PK promotion skipped", "error", err) + } + return nil +} + +// backfillTenantIDs sets tenant_id = DefaultTenantID for any row in the three +// GraphRAG tables that ended up with NULL or empty tenant_id. AutoMigrate +// already supplies a column default for new inserts; this pass covers the +// "added the column to a populated table" path on dialects that don't +// retroactively backfill via DEFAULT (and serves as belt-and-braces on those +// that do). +func backfillTenantIDs(db *gorm.DB) error { + for _, tbl := range graphRAGTables { + // Only touch tables that exist — fresh installs may race with a + // half-migrated DB on first boot. HasTable is dialect-aware and + // returns false rather than erroring on missing tables. + if !db.Migrator().HasTable(tbl) { + continue + } + sql := fmt.Sprintf(`UPDATE %s SET tenant_id = ? WHERE tenant_id IS NULL OR tenant_id = ''`, tbl) //nolint:gosec // table name is from a fixed allow-list + if err := db.Exec(sql, storage.DefaultTenantID).Error; err != nil { + return fmt.Errorf("backfill %s: %w", tbl, err) + } + } + return nil +} + +// ensureDrainTemplatesCompositePK promotes drain_templates' primary key from +// (id) to (tenant_id, id) on existing databases. Fresh installs already get +// the composite PK from AutoMigrate. On dialects this function does not +// understand it returns nil so AutoMigrateGraphRAG can move on. +func ensureDrainTemplatesCompositePK(db *gorm.DB) error { + if db == nil || !db.Migrator().HasTable("drain_templates") { + return nil + } + switch db.Dialector.Name() { + case "sqlite": + return ensureDrainPKSQLite(db) + case "postgres": + return ensureDrainPKPostgres(db) + default: + // MySQL / MSSQL aren't covered by this ticket. Log so an operator + // running those dialects can apply the equivalent DDL by hand. + slog.Info("graphrag: drain_templates PK promotion skipped — unsupported dialect", + "dialect", db.Dialector.Name()) + return nil + } +} + +// ensureDrainPKSQLite uses PRAGMA table_info to detect whether tenant_id is +// already part of the primary key. If not, it rebuilds the table via the +// canonical SQLite recipe (rename → create → copy → drop) inside a +// transaction. CreateTable here mirrors whatever GORM would build for a fresh +// install, so we don't have to hand-maintain a parallel CREATE TABLE. +func ensureDrainPKSQLite(db *gorm.DB) error { + type pragmaCol struct { + Cid int `gorm:"column:cid"` + Name string `gorm:"column:name"` + Type string `gorm:"column:type"` + Notnull int `gorm:"column:notnull"` + DfltValue any `gorm:"column:dflt_value"` + Pk int `gorm:"column:pk"` + } + var cols []pragmaCol + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + return fmt.Errorf("pragma table_info: %w", err) + } + if len(cols) == 0 { + // Table doesn't exist; AutoMigrate would have created it with the + // composite PK already. Nothing to do. + return nil + } + pkCols := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pkCols[c.Name] = true + } + } + if pkCols["tenant_id"] && pkCols["id"] { + return nil // already composite + } + + return db.Transaction(func(tx *gorm.DB) error { + // Drop any leftover scratch from an interrupted earlier attempt. + if err := tx.Exec(`DROP TABLE IF EXISTS drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop scratch: %w", err) + } + if err := tx.Exec(`ALTER TABLE drain_templates RENAME TO drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("rename legacy: %w", err) + } + // Recreate with the current GORM schema (composite PK + indexes). + if err := tx.Migrator().CreateTable(&DrainTemplateRow{}); err != nil { + return fmt.Errorf("create new table: %w", err) + } + // Copy rows over, defaulting tenant_id where the legacy table + // didn't carry one (the column may have been added by AutoMigrate + // before this pass ran). + insert := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT COALESCE(NULLIF(tenant_id, ''), ?), id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err := tx.Exec(insert, storage.DefaultTenantID).Error; err != nil { + // Older schemas may not have the tenant_id column at all — fall + // back to a tenant-less SELECT, defaulting every row. + insertFallback := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT ?, id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err2 := tx.Exec(insertFallback, storage.DefaultTenantID).Error; err2 != nil { + return fmt.Errorf("copy rows: %w (fallback: %w)", err, err2) + } + } + if err := tx.Exec(`DROP TABLE drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop legacy: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on SQLite") + return nil + }) +} + +// ensureDrainPKPostgres queries pg_index for the current PK column set and, +// when it doesn't already include tenant_id, drops the implicit +// drain_templates_pkey constraint and recreates it as a composite PK on +// (tenant_id, id). DROP/ADD runs inside one statement so the table is never +// without a PK between steps. +func ensureDrainPKPostgres(db *gorm.DB) error { + var pkCols []string + err := db.Raw(` + SELECT a.attname::text + FROM pg_index i + JOIN pg_attribute a + ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = 'drain_templates'::regclass + AND i.indisprimary + `).Scan(&pkCols).Error + if err != nil { + // Most likely the table doesn't exist yet — nothing to migrate. + return nil + } + hasTenant, hasID := false, false + for _, c := range pkCols { + switch c { + case "tenant_id": + hasTenant = true + case "id": + hasID = true + } + } + if hasTenant && hasID { + return nil + } + if !hasID { + // Defensive: an alien schema without `id` in the PK is not something + // this migration should silently overwrite. + return errors.New("drain_templates primary key has unexpected shape; manual migration required") + } + if err := db.Exec(`ALTER TABLE drain_templates DROP CONSTRAINT IF EXISTS drain_templates_pkey, ADD PRIMARY KEY (tenant_id, id)`).Error; err != nil { + return fmt.Errorf("alter pk: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on PostgreSQL") + return nil +} diff --git a/internal/graphrag/migrate_test.go b/internal/graphrag/migrate_test.go new file mode 100644 index 0000000..2cc5df8 --- /dev/null +++ b/internal/graphrag/migrate_test.go @@ -0,0 +1,297 @@ +package graphrag + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/glebarez/sqlite" + "gorm.io/gorm" +) + +// newTestGraphRAGDB stands up an in-memory SQLite DB ready for the GraphRAG +// migrations. Local helper so the migrate tests don't depend on storage's +// _test-only fixtures. +func newTestGraphRAGDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + return db +} + +// newTestGraphRAGWithDB returns a GraphRAG wired to a real (in-memory SQLite) +// repo so tenant-scoped read tests can exercise the actual GORM query path. +// The returned *gorm.DB is the same handle the GraphRAG uses, suitable for +// seeding rows directly. +func newTestGraphRAGWithDB(t *testing.T) (*GraphRAG, *gorm.DB) { + t.Helper() + db := newTestGraphRAGDB(t) + repo := storage.NewRepositoryFromDB(db, "sqlite") + g := New(repo, nil, nil, nil, DefaultConfig()) + t.Cleanup(func() { g.Stop() }) + return g, db +} + +// TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes asserts that the +// composite indexes declared on the three persisted GraphRAG models are +// actually materialised on SQLite. Mirrors +// storage.TestAutoMigrate_CreatesTenantCompositeIndexes for the GraphRAG side. +func TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + expected := []struct { + table string + index string + }{ + {"investigations", "idx_investigations_tenant_created"}, + {"graph_snapshots", "idx_graph_snapshots_tenant_created"}, + } + for _, tc := range expected { + var count int + if err := db.Raw( + "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND tbl_name=? AND name=?", + tc.table, tc.index, + ).Scan(&count).Error; err != nil { + t.Fatalf("sqlite_master query for %s.%s: %v", tc.table, tc.index, err) + } + if count != 1 { + t.Errorf("expected composite index %s on table %s (count=%d)", tc.index, tc.table, count) + } + } +} + +// TestAutoMigrateGraphRAG_DrainTemplatesCompositePK asserts the PK on +// drain_templates is the composite (tenant_id, id) on a fresh install. +// Without this the same Drain template hash from two tenants would collide +// once a per-tenant Drain miner ships. +func TestAutoMigrateGraphRAG_DrainTemplatesCompositePK(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + type col struct { + Name string `gorm:"column:name"` + Pk int `gorm:"column:pk"` + } + var cols []col + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + t.Fatalf("pragma table_info: %v", err) + } + pk := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pk[c.Name] = true + } + } + if !pk["tenant_id"] || !pk["id"] { + t.Fatalf("drain_templates PK should be composite (tenant_id, id); got %+v", pk) + } +} + +// TestAutoMigrateGraphRAG_IsIdempotent calls AutoMigrateGraphRAG repeatedly +// to assert the backfill UPDATEs and PK-promotion path are safe to re-run. +func TestAutoMigrateGraphRAG_IsIdempotent(t *testing.T) { + db := newTestGraphRAGDB(t) + for i := 0; i < 3; i++ { + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG pass %d: %v", i, err) + } + } +} + +// TestAutoMigrateGraphRAG_BackfillsLegacyRows pre-populates the three +// GraphRAG tables with rows that have empty tenant_id and asserts the backfill +// pass fills DefaultTenantID on every one. Mimics the upgrade path for an +// existing install whose first boot brings in the tenant_id column. +func TestAutoMigrateGraphRAG_BackfillsLegacyRows(t *testing.T) { + db := newTestGraphRAGDB(t) + // Run once so the tables exist with the new schema. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("first migrate: %v", err) + } + // Insert rows with empty tenant_id directly via raw SQL — Investigation, + // GraphSnapshot and DrainTemplateRow's GORM defaults would otherwise fill + // the column on insert. + now := time.Now().UTC() + if err := db.Exec(`INSERT INTO investigations (tenant_id, id, created_at, status, severity, trigger_service, trigger_operation, error_message, root_service, root_operation, causal_chain, trace_ids, error_logs, anomalous_metrics, affected_services, span_chain) VALUES ('', 'inv_legacy', ?, 'detected', 'warning', 'svc', 'op', 'boom', 'svc', 'op', '[]', '[]', '[]', '[]', '[]', '[]')`, now).Error; err != nil { + t.Fatalf("seed legacy investigation: %v", err) + } + if err := db.Exec(`INSERT INTO graph_snapshots (tenant_id, id, created_at, nodes, edges, service_count, total_calls, avg_health_score) VALUES ('', 'snap_legacy', ?, '[]', '[]', 0, 0, 0)`, now).Error; err != nil { + t.Fatalf("seed legacy snapshot: %v", err) + } + // Drain rows: tenant_id is part of the PK so we must give it *something* + // — empty string is allowed by SQLite. The backfill is expected to fix it. + if err := db.Exec(`INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) VALUES ('', 1, '["a","b"]', 1, ?, ?, 'sample')`, now, now).Error; err != nil { + t.Fatalf("seed legacy drain row: %v", err) + } + + // Re-run migration; backfill should populate every empty tenant_id. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("second migrate: %v", err) + } + + for _, tbl := range graphRAGTables { + var stragglers int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id IS NULL OR tenant_id = ''`).Scan(&stragglers).Error; err != nil { + t.Fatalf("count empty tenant in %s: %v", tbl, err) + } + if stragglers != 0 { + t.Errorf("%s: %d rows still have empty tenant_id after backfill", tbl, stragglers) + } + var defaults int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id = ?`, storage.DefaultTenantID).Scan(&defaults).Error; err != nil { + t.Fatalf("count default tenant in %s: %v", tbl, err) + } + if defaults < 1 { + t.Errorf("%s: backfill produced no DefaultTenantID rows", tbl) + } + } +} + +// TestSaveLoadDrainTemplates_TenantIsolation asserts that Save with one tenant +// and Load with another returns nothing — the per-tenant scoping is enforced +// at the DB layer, not just at the in-memory miner. +func TestSaveLoadDrainTemplates_TenantIsolation(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + tplsA := []Template{{ID: 0xA1, Tokens: []string{"a", "b"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + tplsB := []Template{{ID: 0xB2, Tokens: []string{"c", "d"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + // Identical hash across tenants is the interesting case — confirms the + // composite PK lets the same template ID coexist in two tenants. + tplsBSameID := []Template{{ID: 0xA1, Tokens: []string{"x", "y"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + + if err := SaveDrainTemplates(db, "tenant-a", tplsA); err != nil { + t.Fatalf("save tenant-a: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsB); err != nil { + t.Fatalf("save tenant-b: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsBSameID); err != nil { + t.Fatalf("save tenant-b colliding id: %v", err) + } + + loadedA, err := LoadDrainTemplates(db, "tenant-a") + if err != nil { + t.Fatalf("load tenant-a: %v", err) + } + if len(loadedA) != 1 || loadedA[0].ID != 0xA1 || loadedA[0].Tokens[0] != "a" { + t.Errorf("tenant-a load mismatch: %+v", loadedA) + } + + loadedB, err := LoadDrainTemplates(db, "tenant-b") + if err != nil { + t.Fatalf("load tenant-b: %v", err) + } + if len(loadedB) != 2 { + t.Errorf("tenant-b should have two templates (distinct + colliding-id); got %d", len(loadedB)) + } + + loadedC, err := LoadDrainTemplates(db, "tenant-empty") + if err != nil { + t.Fatalf("load tenant-empty: %v", err) + } + if len(loadedC) != 0 { + t.Errorf("tenant-empty should have no templates; got %d", len(loadedC)) + } +} + +// TestGraphRAG_GetInvestigations_TenantScoped seeds investigations under two +// tenants and asserts each tenant only sees its own rows via ctx scoping. +func TestGraphRAG_GetInvestigations_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now() + rows := []Investigation{ + {TenantID: "acme", ID: "inv-acme-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + {TenantID: "acme", ID: "inv-acme-2", CreatedAt: now.Add(time.Second), Status: "detected", Severity: "warning", TriggerService: "payments"}, + {TenantID: "globex", ID: "inv-globex-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + } + for _, r := range rows { + if err := db.Create(&r).Error; err != nil { + t.Fatalf("seed %s: %v", r.ID, err) + } + } + + acmeCtx := storage.WithTenantContext(context.Background(), "acme") + globexCtx := storage.WithTenantContext(context.Background(), "globex") + + acme, err := g.GetInvestigations(acmeCtx, "", "", "", 100) + if err != nil { + t.Fatalf("acme list: %v", err) + } + if len(acme) != 2 { + t.Errorf("acme should see 2 rows; got %d", len(acme)) + } + for _, r := range acme { + if r.TenantID != "acme" { + t.Errorf("acme list leaked %s row: %s", r.TenantID, r.ID) + } + } + + globex, err := g.GetInvestigations(globexCtx, "", "", "", 100) + if err != nil { + t.Fatalf("globex list: %v", err) + } + if len(globex) != 1 || globex[0].ID != "inv-globex-1" { + t.Errorf("globex should see only inv-globex-1; got %+v", globex) + } + + // Cross-tenant ID lookup must miss — id-guessing should not leak. + if _, err := g.GetInvestigation(acmeCtx, "inv-globex-1"); err == nil { + t.Errorf("acme ctx should NOT find globex investigation") + } + got, err := g.GetInvestigation(globexCtx, "inv-globex-1") + if err != nil { + t.Fatalf("globex own lookup: %v", err) + } + if got.TenantID != "globex" { + t.Errorf("expected globex row; got tenant=%q", got.TenantID) + } +} + +// TestGraphRAG_GetGraphSnapshot_TenantScoped seeds two snapshots (one per +// tenant) at the same instant and asserts each tenant only retrieves its own. +func TestGraphRAG_GetGraphSnapshot_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now().UTC() + for _, tenant := range []string{"acme", "globex"} { + snap := GraphSnapshot{ + TenantID: tenant, + ID: "snap_" + tenant, + CreatedAt: now, + Nodes: []byte(`[]`), + Edges: []byte(`[]`), + ServiceCount: 1, + AvgHealthScore: 1, + } + if err := db.Create(&snap).Error; err != nil { + t.Fatalf("seed %s: %v", tenant, err) + } + } + for _, tenant := range []string{"acme", "globex"} { + ctx := storage.WithTenantContext(context.Background(), tenant) + snap, err := g.GetGraphSnapshot(ctx, now.Add(time.Second)) + if err != nil { + t.Fatalf("get %s: %v", tenant, err) + } + if snap.TenantID != tenant { + t.Errorf("ctx %s returned snapshot for tenant %q", tenant, snap.TenantID) + } + if !strings.HasSuffix(snap.ID, tenant) { + t.Errorf("ctx %s returned snapshot id %s", tenant, snap.ID) + } + } +} diff --git a/internal/graphrag/refresh.go b/internal/graphrag/refresh.go index 43a941e..b16d8bc 100644 --- a/internal/graphrag/refresh.go +++ b/internal/graphrag/refresh.go @@ -77,7 +77,7 @@ func (g *GraphRAG) persistDrainTemplates() { if len(tpls) == 0 { return } - if err := SaveDrainTemplates(g.repo.DB(), tpls); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, tpls); err != nil { slog.Error("Failed to persist drain templates", "error", err) return } diff --git a/internal/graphrag/schema.go b/internal/graphrag/schema.go index 81b0754..fed9e4d 100644 --- a/internal/graphrag/schema.go +++ b/internal/graphrag/schema.go @@ -222,7 +222,13 @@ type RankedCause struct { // standard SQL drivers reject uint64 values with the high bit set, and signed // int64 carries the same 64 bits without loss. Conversion happens in the // persistence helpers. +// +// The primary key is composite (tenant_id, id): the same template tokens can +// legitimately recur across tenants, and we want the cluster ID to stay stable +// per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID +// is declared first so it leads the PK index. type DrainTemplateRow struct { + TenantID string `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"` ID int64 `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID) Tokens string `gorm:"type:text;not null" json:"tokens"` // JSON-encoded []string Count int `json:"count"` diff --git a/internal/graphrag/snapshot.go b/internal/graphrag/snapshot.go index 3a77328..aef160f 100644 --- a/internal/graphrag/snapshot.go +++ b/internal/graphrag/snapshot.go @@ -12,14 +12,14 @@ import ( // GraphSnapshot is a periodic snapshot of the service topology persisted to DB. // -// Tenant identity is not yet a column on this model — Subtask B (RAN-38) adds -// `tenant_id` to the schema along with the persistence-layer filtering. For -// now, snapshots are written one row per tenant per tick and remain queryable -// only across the whole table; callers must not treat pre-Subtask-B rows as -// tenant-scoped. +// TenantID scopes the row to the tenant slice it was captured from. The +// composite (tenant_id, created_at) index supports the +// "most recent snapshot at-or-before T for tenant X" lookup that +// GetGraphSnapshot runs on every read. type GraphSnapshot struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_graph_snapshots_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_graph_snapshots_tenant_created,priority:2" json:"created_at"` Nodes json.RawMessage `gorm:"type:text" json:"nodes"` Edges json.RawMessage `gorm:"type:text" json:"edges"` ServiceCount int `json:"service_count"` @@ -117,6 +117,7 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store edgesJSON, _ := json.Marshal(snapEdges) snap := GraphSnapshot{ + TenantID: tenant, ID: fmt.Sprintf("snap_%s_%d", tenant, time.Now().UnixNano()), CreatedAt: time.Now(), Nodes: nodesJSON, @@ -155,12 +156,14 @@ func (g *GraphRAG) pruneOldSnapshots() { } } -// GetGraphSnapshot retrieves the snapshot closest to the requested time. -// TODO(RAN-38, Subtask B): scope by tenant once tenant_id lands on the table. -func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error) { +// GetGraphSnapshot retrieves the snapshot closest to the requested time, +// scoped to the tenant carried by ctx. The composite (tenant_id, created_at) +// index supports the descending lookup. +func (g *GraphRAG) GetGraphSnapshot(ctx context.Context, at time.Time) (*GraphSnapshot, error) { + tenant := storage.TenantFromContext(ctx) var snap GraphSnapshot err := g.repo.DB(). - Where("created_at <= ?", at). + Where("tenant_id = ? AND created_at <= ?", tenant, at). Order("created_at DESC"). First(&snap).Error if err != nil { diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 03a0ffc..7d5b8af 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -320,11 +320,11 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a case "correlated_signals": return s.toolCorrelatedSignals(ctx, args) case "get_investigations": - return s.toolGetInvestigations(args) + return s.toolGetInvestigations(ctx, args) case "get_investigation": - return s.toolGetInvestigationByID(args) + return s.toolGetInvestigationByID(ctx, args) case "get_graph_snapshot": - return s.toolGetGraphSnapshot(args) + return s.toolGetGraphSnapshot(ctx, args) case "get_anomaly_timeline": return s.toolGetAnomalyTimeline(ctx, args) default: @@ -752,7 +752,7 @@ func (s *Server) toolCorrelatedSignals(ctx context.Context, args map[string]any) return textResult(string(data)) } -func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigations(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -761,7 +761,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { status, _ := args["status"].(string) limit := argInt(args, "limit", 20) - investigations, err := s.graphRAG.GetInvestigations(service, severity, status, limit) + investigations, err := s.graphRAG.GetInvestigations(ctx, service, severity, status, limit) if err != nil { return errorResult(fmt.Sprintf("failed to query investigations: %v", err)) } @@ -772,7 +772,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigationByID(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -780,7 +780,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { if id == "" { return errorResult("investigation_id is required") } - inv, err := s.graphRAG.GetInvestigation(id) + inv, err := s.graphRAG.GetInvestigation(ctx, id) if err != nil { return errorResult(fmt.Sprintf("investigation not found: %v", err)) } @@ -791,7 +791,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { +func (s *Server) toolGetGraphSnapshot(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -800,7 +800,7 @@ func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { if at.IsZero() { at = time.Now() } - snap, err := s.graphRAG.GetGraphSnapshot(at) + snap, err := s.graphRAG.GetGraphSnapshot(ctx, at) if err != nil { return errorResult(fmt.Sprintf("no snapshot found: %v", err)) }