From 62768be8effb8cc1f9b01a59fa101fd06086ffd7 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Fri, 24 Apr 2026 20:44:16 +0000 Subject: [PATCH 1/5] feat(graphrag): tenant_id column + scoped reads + backfill (RAN-38) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds tenant_id to the three persisted GraphRAG tables (investigations, graph_snapshots, drain_templates), with a one-shot idempotent backfill so existing rows fall back to DefaultTenantID and a driver-specific composite PK promotion for drain_templates on SQLite/PostgreSQL. - schema.go: TenantID first on DrainTemplateRow; PK becomes (tenant_id, id). - investigation.go: TenantID + idx_investigations_tenant_created composite index; cooldown key now scoped by tenant; GetInvestigations / Get-by-id take a context and filter by storage.TenantFromContext. - snapshot.go: TenantID + idx_graph_snapshots_tenant_created; GetGraphSnapshot is ctx-scoped. takeSnapshot continues to fan out per tenant. - drain.go: SaveDrainTemplates / LoadDrainTemplates now take a tenant string; OnConflict targets composite (tenant_id, id) so the same template hash can coexist across tenants for the eventual per-tenant Drain miner. - migrate.go (new): AutoMigrateGraphRAG runs AutoMigrate, idempotent UPDATE backfill, and a SQLite (rename/recreate/copy) + PostgreSQL (DROP/ADD CONSTRAINT) PK promotion path. MySQL/MSSQL skipped with a log. - mcp/tools.go: thread the dispatcher ctx into get_investigations, get_investigation, get_graph_snapshot so the new ctx-aware signatures compile end-to-end. Subtask C (RAN-39) wires the request tenant. - builder.go / refresh.go: Drain Save/Load callers pin storage.DefaultTenantID for the single shared miner. Tests: - TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes - TestAutoMigrateGraphRAG_DrainTemplatesCompositePK - TestAutoMigrateGraphRAG_IsIdempotent - TestAutoMigrateGraphRAG_BackfillsLegacyRows - TestSaveLoadDrainTemplates_TenantIsolation (collision across tenants) - TestGraphRAG_GetInvestigations_TenantScoped (cross-tenant id-guess miss) - TestGraphRAG_GetGraphSnapshot_TenantScoped - TestCooldownKey_TenantIsolated Verification: go vet ./... and go test ./... — 281 tests pass across 26 packages. Co-Authored-By: Claude Opus 4.7 (1M context) Co-Authored-By: Paperclip --- internal/graphrag/builder.go | 9 +- internal/graphrag/drain.go | 32 +- internal/graphrag/drain_persist_test.go | 15 +- internal/graphrag/investigation.go | 71 +++-- .../graphrag/investigation_cooldown_test.go | 28 +- internal/graphrag/migrate.go | 213 +++++++++++++ internal/graphrag/migrate_test.go | 297 ++++++++++++++++++ internal/graphrag/refresh.go | 2 +- internal/graphrag/schema.go | 6 + internal/graphrag/snapshot.go | 23 +- internal/mcp/tools.go | 18 +- 11 files changed, 639 insertions(+), 75 deletions(-) create mode 100644 internal/graphrag/migrate.go create mode 100644 internal/graphrag/migrate_test.go diff --git a/internal/graphrag/builder.go b/internal/graphrag/builder.go index 143842b..5653463 100644 --- a/internal/graphrag/builder.go +++ b/internal/graphrag/builder.go @@ -238,8 +238,13 @@ func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggr // Restore persisted Drain templates so log clustering survives restarts. // A missing table (fresh install) or transient DB error is non-fatal — // ingestion will rebuild templates from scratch. + // + // The Drain miner is currently a single shared instance, so we treat its + // learned templates as belonging to DefaultTenantID. The persistence layer + // is already keyed by (tenant_id, id) so a future per-tenant Drain miner + // can load each tenant's slice without colliding cluster IDs. if repo != nil && repo.DB() != nil { - if tpls, err := LoadDrainTemplates(repo.DB()); err != nil { + if tpls, err := LoadDrainTemplates(repo.DB(), storage.DefaultTenantID); err != nil { slog.Info("GraphRAG: drain template restore skipped", "reason", err) } else if len(tpls) > 0 { g.drain.LoadTemplates(tpls) @@ -294,7 +299,7 @@ func (g *GraphRAG) Stop() { // Best-effort final Drain template persistence — losing the most recent // updates on an unclean shutdown would force rebuilding from scratch. if g.repo != nil && g.repo.DB() != nil && g.drain != nil { - if err := SaveDrainTemplates(g.repo.DB(), g.drain.Templates()); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, g.drain.Templates()); err != nil { slog.Warn("GraphRAG: final drain template save failed", "error", err) } } diff --git a/internal/graphrag/drain.go b/internal/graphrag/drain.go index c357ee3..90ad1ac 100644 --- a/internal/graphrag/drain.go +++ b/internal/graphrag/drain.go @@ -33,6 +33,7 @@ import ( "sync" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -504,13 +505,20 @@ func (d *Drain) Len() int { // --- Persistence --- // SaveDrainTemplates upserts the given templates into the drain_templates -// table. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which -// is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY -// UPDATE for MySQL). -func SaveDrainTemplates(db *gorm.DB, templates []Template) error { +// table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's +// clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/ +// PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL). +// +// The conflict target is the composite (tenant_id, id) primary key so the +// same Drain template hash can coexist across tenants — a future per-tenant +// Drain miner can rely on this to keep cluster IDs stable per tenant. +func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error { if db == nil || len(templates) == 0 { return nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } rows := make([]DrainTemplateRow, 0, len(templates)) for _, t := range templates { tokensJSON, err := json.Marshal(t.Tokens) @@ -518,6 +526,7 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { return fmt.Errorf("marshal drain tokens id=%d: %w", t.ID, err) } rows = append(rows, DrainTemplateRow{ + TenantID: tenant, ID: int64(t.ID), //nolint:gosec // intentional bit-reinterpret of FNV-64 hash for DB portability Tokens: string(tokensJSON), Count: t.Count, @@ -527,22 +536,25 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { }) } return db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "id"}}, + Columns: []clause.Column{{Name: "tenant_id"}, {Name: "id"}}, DoUpdates: clause.AssignmentColumns([]string{ "tokens", "count", "last_seen", "sample", }), }).CreateInBatches(&rows, 500).Error } -// LoadDrainTemplates reads all persisted Drain templates from the DB and -// returns them in a format ready to pass to Drain.LoadTemplates. Returns an -// empty slice (and nil error) if the table is empty. -func LoadDrainTemplates(db *gorm.DB) ([]Template, error) { +// LoadDrainTemplates reads persisted Drain templates for the supplied tenant +// and returns them in a format ready to pass to Drain.LoadTemplates. Returns +// an empty slice (and nil error) if no rows match. +func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error) { if db == nil { return nil, nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } var rows []DrainTemplateRow - if err := db.Find(&rows).Error; err != nil { + if err := db.Where("tenant_id = ?", tenant).Find(&rows).Error; err != nil { return nil, err } out := make([]Template, 0, len(rows)) diff --git a/internal/graphrag/drain_persist_test.go b/internal/graphrag/drain_persist_test.go index dd573c5..49a8633 100644 --- a/internal/graphrag/drain_persist_test.go +++ b/internal/graphrag/drain_persist_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "github.com/glebarez/sqlite" "gorm.io/gorm" ) @@ -50,12 +51,12 @@ func TestDrainPersistence_RoundTrip(t *testing.T) { } // Persist. - if err := SaveDrainTemplates(db, d1.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d1.Templates()); err != nil { t.Fatalf("SaveDrainTemplates: %v", err) } // Reload. - loaded, err := LoadDrainTemplates(db) + loaded, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates: %v", err) } @@ -91,7 +92,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t0) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("first save: %v", err) } @@ -108,7 +109,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t1) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("second save: %v", err) } @@ -139,7 +140,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { // returns an empty slice and nil error (fresh-install path). func TestDrainPersistence_EmptyDB(t *testing.T) { db := newTestDrainDB(t) - tpls, err := LoadDrainTemplates(db) + tpls, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates on empty table: %v", err) } @@ -148,7 +149,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Calling with nil DB is also safe. - tpls, err = LoadDrainTemplates(nil) + tpls, err = LoadDrainTemplates(nil, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates(nil): %v", err) } @@ -157,7 +158,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Saving an empty slice is a no-op (no error, no rows). - if err := SaveDrainTemplates(db, nil); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, nil); err != nil { t.Fatalf("SaveDrainTemplates(nil): %v", err) } var cnt int64 diff --git a/internal/graphrag/investigation.go b/internal/graphrag/investigation.go index e3dbd31..7aa45d8 100644 --- a/internal/graphrag/investigation.go +++ b/internal/graphrag/investigation.go @@ -10,7 +10,6 @@ import ( "time" "github.com/RandomCodeSpace/otelcontext/internal/storage" - "gorm.io/gorm" ) // investigationCooldown suppresses repeated PersistInvestigation calls with @@ -43,13 +42,14 @@ func (c *investigationCooldown) allow(key string, now time.Time) bool { } // cooldownKey builds a case- and whitespace-insensitive key from the tuple -// (trigger_service, root_service, root_operation). Service names emitted -// from different instrumentations occasionally differ in casing or have -// trailing whitespace; canonicalizing here prevents those variants from -// bypassing the cooldown guard. -func cooldownKey(triggerService, rootService, rootOperation string) string { +// (tenant, trigger_service, root_service, root_operation). Tenant scopes the +// guard so an error in tenant-A doesn't suppress the same error pattern in +// tenant-B. Service names emitted from different instrumentations occasionally +// differ in casing or have trailing whitespace; canonicalizing here prevents +// those variants from bypassing the cooldown guard. +func cooldownKey(tenant, triggerService, rootService, rootOperation string) string { norm := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) } - return norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) + return norm(tenant) + "|" + norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) } // prune drops entries older than cutoff to bound map size. Called from @@ -65,9 +65,14 @@ func (c *investigationCooldown) prune(cutoff time.Time) { } // Investigation is a persisted record of an automated error investigation. +// +// TenantID scopes the row to its originating tenant. The composite +// (tenant_id, created_at) index supports the recency-ordered "investigations +// for tenant X" query that GetInvestigations runs on every read. type Investigation struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"` Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved Severity string `gorm:"size:20" json:"severity"` // critical, warning, info TriggerService string `gorm:"size:255;index" json:"trigger_service"` @@ -88,20 +93,17 @@ func (Investigation) TableName() string { return "investigations" } -// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models. -func AutoMigrateGraphRAG(db *gorm.DB) error { - return db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}) -} - // PersistInvestigation saves an investigation record from an error chain // analysis. Tenant is accepted explicitly so the caller (the per-tenant -// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice. The -// `investigations` table itself does not yet carry a tenant_id column — -// Subtask B (RAN-38) owns that migration. +// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so +// the persisted row carries its originating tenant_id. func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode) { if len(chains) == 0 { return } + if tenant == "" { + tenant = storage.DefaultTenantID + } firstChain := chains[0] if firstChain.RootCause == nil { @@ -111,10 +113,12 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] now := time.Now() // Cooldown: suppress repeat investigations for the same - // (trigger_service, root_service, root_operation) inside a sliding window. - // Keys are canonicalized (lower + trim) so "Orders" and "orders " share a - // bucket — otherwise trivial casing differences would bypass the guard. - key := cooldownKey(triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) + // (tenant, trigger_service, root_service, root_operation) inside a sliding + // window. Keys are canonicalized (lower + trim) so "Orders" and "orders " + // share a bucket — otherwise trivial casing differences would bypass the + // guard. Tenant scoping prevents an error in one tenant from suppressing + // the same pattern in another. + key := cooldownKey(tenant, triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) if g.invCooldown != nil && !g.invCooldown.allow(key, now) { return } @@ -180,6 +184,7 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] } inv := Investigation{ + TenantID: tenant, ID: id, CreatedAt: now, Status: "detected", @@ -202,15 +207,17 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] return } if err := g.repo.DB().Create(&inv).Error; err != nil { - slog.Error("Failed to persist investigation", "error", err) + slog.Error("Failed to persist investigation", "tenant", tenant, "error", err) return } - slog.Info("Investigation persisted", "id", id, "service", triggerService, "severity", severity) + slog.Info("Investigation persisted", "id", id, "tenant", tenant, "service", triggerService, "severity", severity) } -// GetInvestigations queries persisted investigations. -func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error) { +// GetInvestigations queries persisted investigations scoped to the tenant +// carried by ctx. The composite (tenant_id, created_at) index supports the +// recency-ordered scan. +func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error) { if limit <= 0 { limit = 20 } @@ -218,7 +225,12 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int limit = 100 } - db := g.repo.DB().Model(&Investigation{}).Order("created_at DESC").Limit(limit) + tenant := storage.TenantFromContext(ctx) + db := g.repo.DB(). + Model(&Investigation{}). + Where("tenant_id = ?", tenant). + Order("created_at DESC"). + Limit(limit) if service != "" { db = db.Where("trigger_service = ? OR root_service = ?", service, service) } @@ -236,10 +248,13 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int return investigations, nil } -// GetInvestigation retrieves a single investigation by ID. -func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error) { +// GetInvestigation retrieves a single investigation by ID, scoped to the +// tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups +// prevents id-guessing from leaking another tenant's row. +func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error) { + tenant := storage.TenantFromContext(ctx) var inv Investigation - if err := g.repo.DB().Where("id = ?", id).First(&inv).Error; err != nil { + if err := g.repo.DB().Where("tenant_id = ? AND id = ?", tenant, id).First(&inv).Error; err != nil { return nil, err } return &inv, nil diff --git a/internal/graphrag/investigation_cooldown_test.go b/internal/graphrag/investigation_cooldown_test.go index 1e36eeb..fdfce6f 100644 --- a/internal/graphrag/investigation_cooldown_test.go +++ b/internal/graphrag/investigation_cooldown_test.go @@ -46,18 +46,30 @@ func TestPersistInvestigation_Cooldown(t *testing.T) { } // TestCooldownKey_Canonical verifies the key normalizes case and trims -// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket. +// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket +// within a tenant. func TestCooldownKey_Canonical(t *testing.T) { - cases := [][3]string{ - {"orders", "orders", "op"}, - {"Orders", "ORDERS", "op"}, - {" orders ", "orders", " op "}, - {"ORDERS", "Orders ", "OP"}, + cases := [][4]string{ + {"acme", "orders", "orders", "op"}, + {"Acme", "Orders", "ORDERS", "op"}, + {" acme ", " orders ", "orders", " op "}, + {"ACME", "ORDERS", "Orders ", "OP"}, } - want := cooldownKey(cases[0][0], cases[0][1], cases[0][2]) + want := cooldownKey(cases[0][0], cases[0][1], cases[0][2], cases[0][3]) for _, c := range cases[1:] { - if got := cooldownKey(c[0], c[1], c[2]); got != want { + if got := cooldownKey(c[0], c[1], c[2], c[3]); got != want { t.Errorf("cooldownKey%v = %q, want %q", c, got, want) } } } + +// TestCooldownKey_TenantIsolated asserts that two tenants emitting the same +// (trigger, root, op) tuple produce distinct cooldown keys, so an error in +// tenant-A doesn't suppress the same pattern in tenant-B. +func TestCooldownKey_TenantIsolated(t *testing.T) { + a := cooldownKey("tenant-a", "orders", "orders", "op") + b := cooldownKey("tenant-b", "orders", "orders", "op") + if a == b { + t.Fatalf("tenant scoping missing: tenant-a and tenant-b share key %q", a) + } +} diff --git a/internal/graphrag/migrate.go b/internal/graphrag/migrate.go new file mode 100644 index 0000000..133593c --- /dev/null +++ b/internal/graphrag/migrate.go @@ -0,0 +1,213 @@ +package graphrag + +// GraphRAG persistence migrations. +// +// AutoMigrateGraphRAG runs the standard GORM AutoMigrate over the three +// persisted GraphRAG models, then performs two idempotent post-migration +// passes that prepare older databases for tenant-scoped reads: +// +// 1. backfillTenantIDs — sets tenant_id = DefaultTenantID on rows that pre-date +// the column being added (or that somehow ended up empty). New rows always +// receive the column default at insert time. +// +// 2. ensureDrainTemplatesCompositePK — promotes drain_templates' single-column +// primary key (id) to the composite (tenant_id, id) when an older schema +// is detected. The composite PK matters because the same Drain template +// hash can legitimately recur across tenants once the in-memory miner is +// partitioned per tenant; without it the second tenant's row would collide +// on insert. +// +// Both passes are safe to call repeatedly. SQLite and PostgreSQL are +// supported explicitly; other dialects skip the PK promotion and log so an +// operator can apply the equivalent DDL by hand. + +import ( + "errors" + "fmt" + "log/slog" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "gorm.io/gorm" +) + +// graphRAGTables are the three persisted tables that carry tenant_id after +// RAN-38. Order matches AutoMigrate order so log lines line up. +var graphRAGTables = []string{"investigations", "graph_snapshots", "drain_templates"} + +// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models and +// applies tenant backfill + drain_templates composite-PK promotion. Safe to +// call repeatedly. +func AutoMigrateGraphRAG(db *gorm.DB) error { + if db == nil { + return nil + } + if err := db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}); err != nil { + return fmt.Errorf("graphrag automigrate: %w", err) + } + if err := backfillTenantIDs(db); err != nil { + return fmt.Errorf("graphrag tenant backfill: %w", err) + } + if err := ensureDrainTemplatesCompositePK(db); err != nil { + // Non-fatal: new writes still carry tenant_id; only existing rows + // retain the legacy single-column PK. Surfaced as a warn so an + // operator can intervene on dialects we don't auto-migrate. + slog.Warn("graphrag: drain_templates composite PK promotion skipped", "error", err) + } + return nil +} + +// backfillTenantIDs sets tenant_id = DefaultTenantID for any row in the three +// GraphRAG tables that ended up with NULL or empty tenant_id. AutoMigrate +// already supplies a column default for new inserts; this pass covers the +// "added the column to a populated table" path on dialects that don't +// retroactively backfill via DEFAULT (and serves as belt-and-braces on those +// that do). +func backfillTenantIDs(db *gorm.DB) error { + for _, tbl := range graphRAGTables { + // Only touch tables that exist — fresh installs may race with a + // half-migrated DB on first boot. HasTable is dialect-aware and + // returns false rather than erroring on missing tables. + if !db.Migrator().HasTable(tbl) { + continue + } + sql := fmt.Sprintf(`UPDATE %s SET tenant_id = ? WHERE tenant_id IS NULL OR tenant_id = ''`, tbl) //nolint:gosec // table name is from a fixed allow-list + if err := db.Exec(sql, storage.DefaultTenantID).Error; err != nil { + return fmt.Errorf("backfill %s: %w", tbl, err) + } + } + return nil +} + +// ensureDrainTemplatesCompositePK promotes drain_templates' primary key from +// (id) to (tenant_id, id) on existing databases. Fresh installs already get +// the composite PK from AutoMigrate. On dialects this function does not +// understand it returns nil so AutoMigrateGraphRAG can move on. +func ensureDrainTemplatesCompositePK(db *gorm.DB) error { + if db == nil || !db.Migrator().HasTable("drain_templates") { + return nil + } + switch db.Dialector.Name() { + case "sqlite": + return ensureDrainPKSQLite(db) + case "postgres": + return ensureDrainPKPostgres(db) + default: + // MySQL / MSSQL aren't covered by this ticket. Log so an operator + // running those dialects can apply the equivalent DDL by hand. + slog.Info("graphrag: drain_templates PK promotion skipped — unsupported dialect", + "dialect", db.Dialector.Name()) + return nil + } +} + +// ensureDrainPKSQLite uses PRAGMA table_info to detect whether tenant_id is +// already part of the primary key. If not, it rebuilds the table via the +// canonical SQLite recipe (rename → create → copy → drop) inside a +// transaction. CreateTable here mirrors whatever GORM would build for a fresh +// install, so we don't have to hand-maintain a parallel CREATE TABLE. +func ensureDrainPKSQLite(db *gorm.DB) error { + type pragmaCol struct { + Cid int `gorm:"column:cid"` + Name string `gorm:"column:name"` + Type string `gorm:"column:type"` + Notnull int `gorm:"column:notnull"` + DfltValue any `gorm:"column:dflt_value"` + Pk int `gorm:"column:pk"` + } + var cols []pragmaCol + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + return fmt.Errorf("pragma table_info: %w", err) + } + if len(cols) == 0 { + // Table doesn't exist; AutoMigrate would have created it with the + // composite PK already. Nothing to do. + return nil + } + pkCols := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pkCols[c.Name] = true + } + } + if pkCols["tenant_id"] && pkCols["id"] { + return nil // already composite + } + + return db.Transaction(func(tx *gorm.DB) error { + // Drop any leftover scratch from an interrupted earlier attempt. + if err := tx.Exec(`DROP TABLE IF EXISTS drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop scratch: %w", err) + } + if err := tx.Exec(`ALTER TABLE drain_templates RENAME TO drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("rename legacy: %w", err) + } + // Recreate with the current GORM schema (composite PK + indexes). + if err := tx.Migrator().CreateTable(&DrainTemplateRow{}); err != nil { + return fmt.Errorf("create new table: %w", err) + } + // Copy rows over, defaulting tenant_id where the legacy table + // didn't carry one (the column may have been added by AutoMigrate + // before this pass ran). + insert := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT COALESCE(NULLIF(tenant_id, ''), ?), id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err := tx.Exec(insert, storage.DefaultTenantID).Error; err != nil { + // Older schemas may not have the tenant_id column at all — fall + // back to a tenant-less SELECT, defaulting every row. + insertFallback := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT ?, id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err2 := tx.Exec(insertFallback, storage.DefaultTenantID).Error; err2 != nil { + return fmt.Errorf("copy rows: %w (fallback: %w)", err, err2) + } + } + if err := tx.Exec(`DROP TABLE drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop legacy: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on SQLite") + return nil + }) +} + +// ensureDrainPKPostgres queries pg_index for the current PK column set and, +// when it doesn't already include tenant_id, drops the implicit +// drain_templates_pkey constraint and recreates it as a composite PK on +// (tenant_id, id). DROP/ADD runs inside one statement so the table is never +// without a PK between steps. +func ensureDrainPKPostgres(db *gorm.DB) error { + var pkCols []string + err := db.Raw(` + SELECT a.attname::text + FROM pg_index i + JOIN pg_attribute a + ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = 'drain_templates'::regclass + AND i.indisprimary + `).Scan(&pkCols).Error + if err != nil { + // Most likely the table doesn't exist yet — nothing to migrate. + return nil + } + hasTenant, hasID := false, false + for _, c := range pkCols { + switch c { + case "tenant_id": + hasTenant = true + case "id": + hasID = true + } + } + if hasTenant && hasID { + return nil + } + if !hasID { + // Defensive: an alien schema without `id` in the PK is not something + // this migration should silently overwrite. + return errors.New("drain_templates primary key has unexpected shape; manual migration required") + } + if err := db.Exec(`ALTER TABLE drain_templates DROP CONSTRAINT IF EXISTS drain_templates_pkey, ADD PRIMARY KEY (tenant_id, id)`).Error; err != nil { + return fmt.Errorf("alter pk: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on PostgreSQL") + return nil +} diff --git a/internal/graphrag/migrate_test.go b/internal/graphrag/migrate_test.go new file mode 100644 index 0000000..2cc5df8 --- /dev/null +++ b/internal/graphrag/migrate_test.go @@ -0,0 +1,297 @@ +package graphrag + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/glebarez/sqlite" + "gorm.io/gorm" +) + +// newTestGraphRAGDB stands up an in-memory SQLite DB ready for the GraphRAG +// migrations. Local helper so the migrate tests don't depend on storage's +// _test-only fixtures. +func newTestGraphRAGDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + return db +} + +// newTestGraphRAGWithDB returns a GraphRAG wired to a real (in-memory SQLite) +// repo so tenant-scoped read tests can exercise the actual GORM query path. +// The returned *gorm.DB is the same handle the GraphRAG uses, suitable for +// seeding rows directly. +func newTestGraphRAGWithDB(t *testing.T) (*GraphRAG, *gorm.DB) { + t.Helper() + db := newTestGraphRAGDB(t) + repo := storage.NewRepositoryFromDB(db, "sqlite") + g := New(repo, nil, nil, nil, DefaultConfig()) + t.Cleanup(func() { g.Stop() }) + return g, db +} + +// TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes asserts that the +// composite indexes declared on the three persisted GraphRAG models are +// actually materialised on SQLite. Mirrors +// storage.TestAutoMigrate_CreatesTenantCompositeIndexes for the GraphRAG side. +func TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + expected := []struct { + table string + index string + }{ + {"investigations", "idx_investigations_tenant_created"}, + {"graph_snapshots", "idx_graph_snapshots_tenant_created"}, + } + for _, tc := range expected { + var count int + if err := db.Raw( + "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND tbl_name=? AND name=?", + tc.table, tc.index, + ).Scan(&count).Error; err != nil { + t.Fatalf("sqlite_master query for %s.%s: %v", tc.table, tc.index, err) + } + if count != 1 { + t.Errorf("expected composite index %s on table %s (count=%d)", tc.index, tc.table, count) + } + } +} + +// TestAutoMigrateGraphRAG_DrainTemplatesCompositePK asserts the PK on +// drain_templates is the composite (tenant_id, id) on a fresh install. +// Without this the same Drain template hash from two tenants would collide +// once a per-tenant Drain miner ships. +func TestAutoMigrateGraphRAG_DrainTemplatesCompositePK(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + type col struct { + Name string `gorm:"column:name"` + Pk int `gorm:"column:pk"` + } + var cols []col + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + t.Fatalf("pragma table_info: %v", err) + } + pk := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pk[c.Name] = true + } + } + if !pk["tenant_id"] || !pk["id"] { + t.Fatalf("drain_templates PK should be composite (tenant_id, id); got %+v", pk) + } +} + +// TestAutoMigrateGraphRAG_IsIdempotent calls AutoMigrateGraphRAG repeatedly +// to assert the backfill UPDATEs and PK-promotion path are safe to re-run. +func TestAutoMigrateGraphRAG_IsIdempotent(t *testing.T) { + db := newTestGraphRAGDB(t) + for i := 0; i < 3; i++ { + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG pass %d: %v", i, err) + } + } +} + +// TestAutoMigrateGraphRAG_BackfillsLegacyRows pre-populates the three +// GraphRAG tables with rows that have empty tenant_id and asserts the backfill +// pass fills DefaultTenantID on every one. Mimics the upgrade path for an +// existing install whose first boot brings in the tenant_id column. +func TestAutoMigrateGraphRAG_BackfillsLegacyRows(t *testing.T) { + db := newTestGraphRAGDB(t) + // Run once so the tables exist with the new schema. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("first migrate: %v", err) + } + // Insert rows with empty tenant_id directly via raw SQL — Investigation, + // GraphSnapshot and DrainTemplateRow's GORM defaults would otherwise fill + // the column on insert. + now := time.Now().UTC() + if err := db.Exec(`INSERT INTO investigations (tenant_id, id, created_at, status, severity, trigger_service, trigger_operation, error_message, root_service, root_operation, causal_chain, trace_ids, error_logs, anomalous_metrics, affected_services, span_chain) VALUES ('', 'inv_legacy', ?, 'detected', 'warning', 'svc', 'op', 'boom', 'svc', 'op', '[]', '[]', '[]', '[]', '[]', '[]')`, now).Error; err != nil { + t.Fatalf("seed legacy investigation: %v", err) + } + if err := db.Exec(`INSERT INTO graph_snapshots (tenant_id, id, created_at, nodes, edges, service_count, total_calls, avg_health_score) VALUES ('', 'snap_legacy', ?, '[]', '[]', 0, 0, 0)`, now).Error; err != nil { + t.Fatalf("seed legacy snapshot: %v", err) + } + // Drain rows: tenant_id is part of the PK so we must give it *something* + // — empty string is allowed by SQLite. The backfill is expected to fix it. + if err := db.Exec(`INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) VALUES ('', 1, '["a","b"]', 1, ?, ?, 'sample')`, now, now).Error; err != nil { + t.Fatalf("seed legacy drain row: %v", err) + } + + // Re-run migration; backfill should populate every empty tenant_id. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("second migrate: %v", err) + } + + for _, tbl := range graphRAGTables { + var stragglers int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id IS NULL OR tenant_id = ''`).Scan(&stragglers).Error; err != nil { + t.Fatalf("count empty tenant in %s: %v", tbl, err) + } + if stragglers != 0 { + t.Errorf("%s: %d rows still have empty tenant_id after backfill", tbl, stragglers) + } + var defaults int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id = ?`, storage.DefaultTenantID).Scan(&defaults).Error; err != nil { + t.Fatalf("count default tenant in %s: %v", tbl, err) + } + if defaults < 1 { + t.Errorf("%s: backfill produced no DefaultTenantID rows", tbl) + } + } +} + +// TestSaveLoadDrainTemplates_TenantIsolation asserts that Save with one tenant +// and Load with another returns nothing — the per-tenant scoping is enforced +// at the DB layer, not just at the in-memory miner. +func TestSaveLoadDrainTemplates_TenantIsolation(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + tplsA := []Template{{ID: 0xA1, Tokens: []string{"a", "b"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + tplsB := []Template{{ID: 0xB2, Tokens: []string{"c", "d"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + // Identical hash across tenants is the interesting case — confirms the + // composite PK lets the same template ID coexist in two tenants. + tplsBSameID := []Template{{ID: 0xA1, Tokens: []string{"x", "y"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + + if err := SaveDrainTemplates(db, "tenant-a", tplsA); err != nil { + t.Fatalf("save tenant-a: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsB); err != nil { + t.Fatalf("save tenant-b: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsBSameID); err != nil { + t.Fatalf("save tenant-b colliding id: %v", err) + } + + loadedA, err := LoadDrainTemplates(db, "tenant-a") + if err != nil { + t.Fatalf("load tenant-a: %v", err) + } + if len(loadedA) != 1 || loadedA[0].ID != 0xA1 || loadedA[0].Tokens[0] != "a" { + t.Errorf("tenant-a load mismatch: %+v", loadedA) + } + + loadedB, err := LoadDrainTemplates(db, "tenant-b") + if err != nil { + t.Fatalf("load tenant-b: %v", err) + } + if len(loadedB) != 2 { + t.Errorf("tenant-b should have two templates (distinct + colliding-id); got %d", len(loadedB)) + } + + loadedC, err := LoadDrainTemplates(db, "tenant-empty") + if err != nil { + t.Fatalf("load tenant-empty: %v", err) + } + if len(loadedC) != 0 { + t.Errorf("tenant-empty should have no templates; got %d", len(loadedC)) + } +} + +// TestGraphRAG_GetInvestigations_TenantScoped seeds investigations under two +// tenants and asserts each tenant only sees its own rows via ctx scoping. +func TestGraphRAG_GetInvestigations_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now() + rows := []Investigation{ + {TenantID: "acme", ID: "inv-acme-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + {TenantID: "acme", ID: "inv-acme-2", CreatedAt: now.Add(time.Second), Status: "detected", Severity: "warning", TriggerService: "payments"}, + {TenantID: "globex", ID: "inv-globex-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + } + for _, r := range rows { + if err := db.Create(&r).Error; err != nil { + t.Fatalf("seed %s: %v", r.ID, err) + } + } + + acmeCtx := storage.WithTenantContext(context.Background(), "acme") + globexCtx := storage.WithTenantContext(context.Background(), "globex") + + acme, err := g.GetInvestigations(acmeCtx, "", "", "", 100) + if err != nil { + t.Fatalf("acme list: %v", err) + } + if len(acme) != 2 { + t.Errorf("acme should see 2 rows; got %d", len(acme)) + } + for _, r := range acme { + if r.TenantID != "acme" { + t.Errorf("acme list leaked %s row: %s", r.TenantID, r.ID) + } + } + + globex, err := g.GetInvestigations(globexCtx, "", "", "", 100) + if err != nil { + t.Fatalf("globex list: %v", err) + } + if len(globex) != 1 || globex[0].ID != "inv-globex-1" { + t.Errorf("globex should see only inv-globex-1; got %+v", globex) + } + + // Cross-tenant ID lookup must miss — id-guessing should not leak. + if _, err := g.GetInvestigation(acmeCtx, "inv-globex-1"); err == nil { + t.Errorf("acme ctx should NOT find globex investigation") + } + got, err := g.GetInvestigation(globexCtx, "inv-globex-1") + if err != nil { + t.Fatalf("globex own lookup: %v", err) + } + if got.TenantID != "globex" { + t.Errorf("expected globex row; got tenant=%q", got.TenantID) + } +} + +// TestGraphRAG_GetGraphSnapshot_TenantScoped seeds two snapshots (one per +// tenant) at the same instant and asserts each tenant only retrieves its own. +func TestGraphRAG_GetGraphSnapshot_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now().UTC() + for _, tenant := range []string{"acme", "globex"} { + snap := GraphSnapshot{ + TenantID: tenant, + ID: "snap_" + tenant, + CreatedAt: now, + Nodes: []byte(`[]`), + Edges: []byte(`[]`), + ServiceCount: 1, + AvgHealthScore: 1, + } + if err := db.Create(&snap).Error; err != nil { + t.Fatalf("seed %s: %v", tenant, err) + } + } + for _, tenant := range []string{"acme", "globex"} { + ctx := storage.WithTenantContext(context.Background(), tenant) + snap, err := g.GetGraphSnapshot(ctx, now.Add(time.Second)) + if err != nil { + t.Fatalf("get %s: %v", tenant, err) + } + if snap.TenantID != tenant { + t.Errorf("ctx %s returned snapshot for tenant %q", tenant, snap.TenantID) + } + if !strings.HasSuffix(snap.ID, tenant) { + t.Errorf("ctx %s returned snapshot id %s", tenant, snap.ID) + } + } +} diff --git a/internal/graphrag/refresh.go b/internal/graphrag/refresh.go index 43a941e..b16d8bc 100644 --- a/internal/graphrag/refresh.go +++ b/internal/graphrag/refresh.go @@ -77,7 +77,7 @@ func (g *GraphRAG) persistDrainTemplates() { if len(tpls) == 0 { return } - if err := SaveDrainTemplates(g.repo.DB(), tpls); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, tpls); err != nil { slog.Error("Failed to persist drain templates", "error", err) return } diff --git a/internal/graphrag/schema.go b/internal/graphrag/schema.go index 81b0754..fed9e4d 100644 --- a/internal/graphrag/schema.go +++ b/internal/graphrag/schema.go @@ -222,7 +222,13 @@ type RankedCause struct { // standard SQL drivers reject uint64 values with the high bit set, and signed // int64 carries the same 64 bits without loss. Conversion happens in the // persistence helpers. +// +// The primary key is composite (tenant_id, id): the same template tokens can +// legitimately recur across tenants, and we want the cluster ID to stay stable +// per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID +// is declared first so it leads the PK index. type DrainTemplateRow struct { + TenantID string `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"` ID int64 `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID) Tokens string `gorm:"type:text;not null" json:"tokens"` // JSON-encoded []string Count int `json:"count"` diff --git a/internal/graphrag/snapshot.go b/internal/graphrag/snapshot.go index 3a77328..aef160f 100644 --- a/internal/graphrag/snapshot.go +++ b/internal/graphrag/snapshot.go @@ -12,14 +12,14 @@ import ( // GraphSnapshot is a periodic snapshot of the service topology persisted to DB. // -// Tenant identity is not yet a column on this model — Subtask B (RAN-38) adds -// `tenant_id` to the schema along with the persistence-layer filtering. For -// now, snapshots are written one row per tenant per tick and remain queryable -// only across the whole table; callers must not treat pre-Subtask-B rows as -// tenant-scoped. +// TenantID scopes the row to the tenant slice it was captured from. The +// composite (tenant_id, created_at) index supports the +// "most recent snapshot at-or-before T for tenant X" lookup that +// GetGraphSnapshot runs on every read. type GraphSnapshot struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_graph_snapshots_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_graph_snapshots_tenant_created,priority:2" json:"created_at"` Nodes json.RawMessage `gorm:"type:text" json:"nodes"` Edges json.RawMessage `gorm:"type:text" json:"edges"` ServiceCount int `json:"service_count"` @@ -117,6 +117,7 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store edgesJSON, _ := json.Marshal(snapEdges) snap := GraphSnapshot{ + TenantID: tenant, ID: fmt.Sprintf("snap_%s_%d", tenant, time.Now().UnixNano()), CreatedAt: time.Now(), Nodes: nodesJSON, @@ -155,12 +156,14 @@ func (g *GraphRAG) pruneOldSnapshots() { } } -// GetGraphSnapshot retrieves the snapshot closest to the requested time. -// TODO(RAN-38, Subtask B): scope by tenant once tenant_id lands on the table. -func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error) { +// GetGraphSnapshot retrieves the snapshot closest to the requested time, +// scoped to the tenant carried by ctx. The composite (tenant_id, created_at) +// index supports the descending lookup. +func (g *GraphRAG) GetGraphSnapshot(ctx context.Context, at time.Time) (*GraphSnapshot, error) { + tenant := storage.TenantFromContext(ctx) var snap GraphSnapshot err := g.repo.DB(). - Where("created_at <= ?", at). + Where("tenant_id = ? AND created_at <= ?", tenant, at). Order("created_at DESC"). First(&snap).Error if err != nil { diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 91df67b..4ae7b4b 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -320,11 +320,11 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a case "correlated_signals": return s.toolCorrelatedSignals(ctx, args) case "get_investigations": - return s.toolGetInvestigations(args) + return s.toolGetInvestigations(ctx, args) case "get_investigation": - return s.toolGetInvestigationByID(args) + return s.toolGetInvestigationByID(ctx, args) case "get_graph_snapshot": - return s.toolGetGraphSnapshot(args) + return s.toolGetGraphSnapshot(ctx, args) case "get_anomaly_timeline": return s.toolGetAnomalyTimeline(ctx, args) default: @@ -749,7 +749,7 @@ func (s *Server) toolCorrelatedSignals(ctx context.Context, args map[string]any) return textResult(string(data)) } -func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigations(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -758,7 +758,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { status, _ := args["status"].(string) limit := argInt(args, "limit", 20) - investigations, err := s.graphRAG.GetInvestigations(service, severity, status, limit) + investigations, err := s.graphRAG.GetInvestigations(ctx, service, severity, status, limit) if err != nil { return errorResult(fmt.Sprintf("failed to query investigations: %v", err)) } @@ -769,7 +769,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigationByID(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -777,7 +777,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { if id == "" { return errorResult("investigation_id is required") } - inv, err := s.graphRAG.GetInvestigation(id) + inv, err := s.graphRAG.GetInvestigation(ctx, id) if err != nil { return errorResult(fmt.Sprintf("investigation not found: %v", err)) } @@ -788,7 +788,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { +func (s *Server) toolGetGraphSnapshot(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -797,7 +797,7 @@ func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { if at.IsZero() { at = time.Now() } - snap, err := s.graphRAG.GetGraphSnapshot(at) + snap, err := s.graphRAG.GetGraphSnapshot(ctx, at) if err != nil { return errorResult(fmt.Sprintf("no snapshot found: %v", err)) } From c839460e5e002d281294484f36bf10327e023188 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Fri, 24 Apr 2026 21:01:54 +0000 Subject: [PATCH 2/5] feat(mcp): tenant ctx through GraphRAG handlers + merge-gate isolation test (RAN-39) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threads the tenant resolved by the MCP transport (X-Tenant-ID header → ctx) into every GraphRAG-backed tool handler and adds the merge-gate integration test that asserts cross-tenant isolation for the full GraphRAG-backed MCP surface. mcp/tools.go - get_system_graph and get_service_health now accept ctx and route through GraphRAG.ServiceMap / AllServiceEdges so they pick up RAN-37's per-tenant in-memory partitioning. Legacy svcGraph remains as a fallback path only when GraphRAG isn't wired (boot windows, future test harnesses). - All other GraphRAG handlers were already ctx-threaded after RAN-37/38; no behavior change for those. vectordb/index.go (+ main.go, api/similar_handler.go) - vectordb.Index.Add and Search now take a tenant string; LogVector and SearchResult carry the tenant tag and Search filters by it. RAN-37 already added tenant args at the call sites in graphrag/clustering.go and mcp/tools.go but the matching vectordb signature change had not landed, leaving the branch unbuildable. This closes that gap with the smallest surgical change; the broader vectordb rework remains RAN-20. - main.go now passes l.TenantID into vectorIdx.Add on hydration and the live ingest hook; api/similar_handler resolves tenant from the request context before searching. graphrag/builder.go - New RegisterAnomaly(tenant, AnomalyNode) — small public API symmetric with PersistInvestigation, used by the new isolation test to seed per-tenant anomalies without depending on the throttled detector loop. mcp/tenant_isolation_test.go - Stands up an in-process MCP server (httptest) wired to GraphRAG over in-memory SQLite, seeds three tenants (acme, beta, default) with overlapping service_name / trace_id / span_id / Drain template / log body / snapshot, and exercises every GraphRAG-backed tool — get_service_map, get_service_health, get_error_chains, trace_graph, impact_analysis, root_cause_analysis, correlated_signals, get_anomaly_timeline, get_investigations, get_investigation (own + cross-tenant id-guess), get_graph_snapshot, find_similar_logs, get_system_graph — three times each (X-Tenant-ID acme, X-Tenant-ID beta, no header → DefaultTenantID). Each response is scanned for the caller's own tenant marker and for any other seeded tenant's marker (service name, log body, op name, anomaly evidence, snapshot id) to prove no cross-tenant leak. Verified: go vet ./... clean; go test ./... clean; go test -race ./internal/{mcp,graphrag}/... clean. Co-Authored-By: Paperclip --- internal/api/similar_handler.go | 5 +- internal/graphrag/builder.go | 13 + internal/mcp/tenant_isolation_test.go | 562 ++++++++++++++++++++++++++ internal/mcp/tools.go | 44 +- internal/vectordb/index.go | 37 +- main.go | 4 +- 6 files changed, 654 insertions(+), 11 deletions(-) create mode 100644 internal/mcp/tenant_isolation_test.go diff --git a/internal/api/similar_handler.go b/internal/api/similar_handler.go index 1668801..ac0fe57 100644 --- a/internal/api/similar_handler.go +++ b/internal/api/similar_handler.go @@ -4,6 +4,8 @@ import ( "encoding/json" "net/http" "strconv" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" ) // handleGetSimilarLogs handles GET /api/logs/similar?q=&limit=10 @@ -30,7 +32,8 @@ func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) { limit = 50 } - results := s.vectorIdx.Search(query, limit) + tenant := storage.TenantFromContext(r.Context()) + results := s.vectorIdx.Search(tenant, query, limit) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ diff --git a/internal/graphrag/builder.go b/internal/graphrag/builder.go index 5653463..0de3519 100644 --- a/internal/graphrag/builder.go +++ b/internal/graphrag/builder.go @@ -154,6 +154,19 @@ func (g *GraphRAG) DroppedMetricsCount() int64 { return g.droppedMetrics.Load() // tests to assert cooldown behavior without requiring a live repo. func (g *GraphRAG) InvestigationInsertCount() int64 { return g.invInserts.Load() } +// RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. +// Mirrors PersistInvestigation's "tenant accepted explicitly" shape so +// out-of-band anomaly producers (synthetic detectors, integration tests, +// future external anomaly feeds) can land directly on the right tenant +// slice without going through the metric/error detection loops. Empty +// tenant collapses to storage.DefaultTenantID. +func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode) { + if tenant == "" { + tenant = storage.DefaultTenantID + } + g.storesForTenant(tenant).anomalies.AddAnomaly(anomaly) +} + // recordEventDrop increments the per-signal atomic counter and — when // a telemetry registry is wired — the Prometheus counter vec. func (g *GraphRAG) recordEventDrop(signal string) { diff --git a/internal/mcp/tenant_isolation_test.go b/internal/mcp/tenant_isolation_test.go new file mode 100644 index 0000000..6ff8535 --- /dev/null +++ b/internal/mcp/tenant_isolation_test.go @@ -0,0 +1,562 @@ +// Package mcp tests the merge-gate invariant for RAN-19/RAN-39: every +// GraphRAG-backed MCP tool (and the legacy svcGraph-backed tools rewired +// onto GraphRAG) must scope its response to the tenant carried by the +// X-Tenant-ID header — overlapping data ingested for two tenants under +// the same service_name, trace_id, span_id, log template, and snapshot +// time must never leak across tenant boundaries. +package mcp + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/graphrag" + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// tenants exercised by the test. The third row uses an empty header to +// prove that absence-of-header collapses to storage.DefaultTenantID, which +// is also a real ingest target so we get a meaningful response (not just +// vacuous emptiness). +var isolationCallers = []struct { + name string + header string + scoped string + otherSeeded []string +}{ + {name: "acme", header: "acme", scoped: "acme", otherSeeded: []string{"beta", storage.DefaultTenantID}}, + {name: "beta", header: "beta", scoped: "beta", otherSeeded: []string{"acme", storage.DefaultTenantID}}, + {name: "no_header_default", header: "", scoped: storage.DefaultTenantID, otherSeeded: []string{"acme", "beta"}}, +} + +// allTenants is the set of tenants we actually seed. Used by leak scans +// regardless of which caller is currently being asserted. +var allTenants = []string{"acme", "beta", storage.DefaultTenantID} + +// markersFor builds the (own, others) marker pair for a given caller. +// Markers are tenant-stamped strings that appear inside service names, +// operation names, log bodies, and anomaly evidence — so a textual scan +// of the JSON response is sufficient to detect a cross-tenant leak. +func markersFor(scoped string, others []string) (own []string, leak []string) { + own = []string{ + scoped + "-orders", + scoped + "-marker", + scoped + "-op", + } + for _, t := range others { + leak = append(leak, + t+"-orders", + t+"-marker", + t+"-op", + t+"-anomaly-marker", + ) + } + return own, leak +} + +// setupTenantIsolationServer wires an in-process MCP server against an +// in-memory SQLite repo and a started GraphRAG. The background refresh, +// snapshot, and anomaly loops are stretched to "never" inside the test +// window so the only state that lands in the stores is the data the test +// seeds explicitly — making leak assertions deterministic. +func setupTenantIsolationServer(t *testing.T) (*httptest.Server, *graphrag.GraphRAG, *storage.Repository, *vectordb.Index) { + t.Helper() + + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if err := graphrag.AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + + vIdx := vectordb.New(1000) + + cfg := graphrag.DefaultConfig() + cfg.RefreshEvery = 24 * time.Hour + cfg.SnapshotEvery = 24 * time.Hour + cfg.AnomalyEvery = 24 * time.Hour + cfg.WorkerCount = 4 + + g := graphrag.New(repo, vIdx, nil, nil, cfg) + bgCtx, cancel := context.WithCancel(context.Background()) + go g.Start(bgCtx) + + srv := New(repo, nil, nil, vIdx) + srv.SetGraphRAG(g) + + httpSrv := httptest.NewServer(srv.Handler()) + + t.Cleanup(func() { + httpSrv.Close() + cancel() + g.Stop() + _ = repo.Close() + }) + + return httpSrv, g, repo, vIdx +} + +// seedTenant ingests a small but representative slice of telemetry for +// tenant T: a parent OK span, a child ERROR span, a matching ERROR log, +// a vector-index doc, an injected anomaly, a persisted investigation, +// and a graph snapshot row. All identifiers (trace_id, span_id) collide +// across tenants on purpose — the tenant slice is the only thing keeping +// them apart. +func seedTenant(t *testing.T, g *graphrag.GraphRAG, repo *storage.Repository, vIdx *vectordb.Index, tenant string, ts time.Time) { + t.Helper() + + service := tenant + "-orders" + op := tenant + "-op-checkout" + logBody := tenant + "-marker connection refused upstream" + traceID := "trace-shared" + rootSpanID := "span-root" + childSpanID := "span-child" + + // Root span (OK). + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: rootSpanID, + OperationName: "/checkout", + ServiceName: service, + Status: "STATUS_CODE_OK", + StartTime: ts, + EndTime: ts.Add(2 * time.Millisecond), + Duration: 2000, + }) + + // Child span (ERROR), parented to root → upstream walk lands on the + // per-tenant root span. + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ParentSpanID: rootSpanID, + OperationName: op, + ServiceName: service, + Status: "STATUS_CODE_ERROR", + StartTime: ts.Add(time.Millisecond), + EndTime: ts.Add(2 * time.Millisecond), + Duration: 1000, + }) + + // Log carrying the per-tenant marker — drives Drain clustering and + // CorrelatedSignals; the body is also stored in the vector index. + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ServiceName: service, + Severity: "ERROR", + Body: logBody, + Timestamp: ts.Add(2 * time.Millisecond), + }) + + // Vector index doc — find_similar_logs path is keyed by tenant. + vIdx.Add(0, tenant, service, "ERROR", logBody) + + // Inject a per-tenant anomaly directly so AnomalyTimeline has + // something to return without depending on the anomaly detector + // loop (which is throttled to 24h in this fixture). + g.RegisterAnomaly(tenant, graphrag.AnomalyNode{ + ID: tenant + "-anomaly-1", + Type: graphrag.AnomalyErrorSpike, + Severity: graphrag.SeverityCritical, + Service: service, + Evidence: tenant + "-anomaly-marker error_rate=0.95", + Timestamp: ts.Add(3 * time.Millisecond), + }) + + // Snapshot row — insert directly so we control the tenant_id and ID + // (takeSnapshot is the production loop, but it is package-private). + snap := graphrag.GraphSnapshot{ + TenantID: tenant, + ID: "snap-" + tenant, + CreatedAt: ts, + Nodes: json.RawMessage(`[{"name":"` + service + `","marker":"` + tenant + `-marker"}]`), + Edges: json.RawMessage(`[]`), + ServiceCount: 1, + AvgHealthScore: 0.5, + } + if err := repo.DB().Create(&snap).Error; err != nil { + t.Fatalf("seed snapshot for %q: %v", tenant, err) + } +} + +// waitForServiceMaps polls until every seeded tenant's ServiceMap reflects +// at least one service. Required because OnSpanIngested is async. +func waitForServiceMaps(t *testing.T, g *graphrag.GraphRAG, tenants []string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + ok := true + for _, tn := range tenants { + ctx := storage.WithTenantContext(context.Background(), tn) + if len(g.ServiceMap(ctx, 0)) == 0 { + ok = false + break + } + } + if ok { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("timed out waiting for ServiceMap to reflect ingested spans for %v", tenants) +} + +// seedInvestigations relies on the in-memory state already being warm +// (see waitForServiceMaps). PersistInvestigation reaches into ImpactAnalysis +// internally, which reads from the per-tenant ServiceStore. +func seedInvestigations(t *testing.T, g *graphrag.GraphRAG, ts time.Time) { + t.Helper() + for _, tenant := range allTenants { + service := tenant + "-orders" + chain := graphrag.ErrorChainResult{ + RootCause: &graphrag.RootCauseInfo{ + Service: service, + Operation: tenant + "-op-checkout", + ErrorMessage: tenant + "-marker connection refused upstream", + SpanID: "span-child", + TraceID: "trace-shared", + }, + SpanChain: []graphrag.SpanNode{{ + ID: "span-child", + TraceID: "trace-shared", + Service: service, + Operation: tenant + "-op-checkout", + IsError: true, + Timestamp: ts, + }}, + TraceID: "trace-shared", + } + g.PersistInvestigation(tenant, service, []graphrag.ErrorChainResult{chain}, nil) + } +} + +// callTool sends a JSON-RPC tools/call request to the test MCP server +// with the given X-Tenant-ID header (omitted when empty) and returns the +// inner ToolCallResult — i.e., the structure the LLM client would see. +// Also returns the concatenated text payload across content items, which +// is what tenant-leak assertions actually scan. +func callTool(t *testing.T, ts *httptest.Server, headerTenant, name string, args map[string]any) (ToolCallResult, string) { + t.Helper() + if args == nil { + args = map[string]any{} + } + rpcReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": map[string]any{ + "name": name, + "arguments": args, + }, + } + body, err := json.Marshal(rpcReq) + if err != nil { + t.Fatalf("marshal rpc: %v", err) + } + req, err := http.NewRequest(http.MethodPost, ts.URL, bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + if headerTenant != "" { + req.Header.Set("X-Tenant-ID", headerTenant) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("rpc do: %v", err) + } + defer resp.Body.Close() + raw, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("rpc status %d: %s", resp.StatusCode, raw) + } + + var rpcResp struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Result ToolCallResult `json:"result"` + Error *RPCError `json:"error"` + } + if err := json.Unmarshal(raw, &rpcResp); err != nil { + t.Fatalf("unmarshal rpc: %v\nraw: %s", err, raw) + } + if rpcResp.Error != nil { + t.Fatalf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var sb strings.Builder + for _, c := range rpcResp.Result.Content { + sb.WriteString(c.Text) + if c.Resource != nil { + sb.WriteString(c.Resource.Text) + } + } + return rpcResp.Result, sb.String() +} + +// assertNoLeak fails if any of leakMarkers appears in body. ownMarker is +// optional — when non-empty it must appear, proving the tool returned +// real per-tenant data and not just a vacuous empty result. +func assertNoLeak(t *testing.T, label, body, ownMarker string, leakMarkers []string) { + t.Helper() + if ownMarker != "" && !strings.Contains(body, ownMarker) { + t.Errorf("[%s] expected own marker %q in response, body=%s", label, ownMarker, truncate(body)) + } + for _, m := range leakMarkers { + if strings.Contains(body, m) { + t.Errorf("[%s] CROSS-TENANT LEAK: foreign marker %q present in response, body=%s", label, m, truncate(body)) + } + } +} + +func truncate(s string) string { + const max = 800 + if len(s) <= max { + return s + } + return s[:max] + "…(truncated)" +} + +// TestMCP_TenantIsolation_AllGraphRAGTools is the merge gate for RAN-19. +// For every GraphRAG-backed (and GraphRAG-rewired) MCP tool, it issues +// the same call from three callers — X-Tenant-ID: acme, X-Tenant-ID: beta, +// no header — against overlapping seeded data and asserts each response +// contains only the caller-tenant's data and never leaks another tenant's +// service name, log marker, operation, anomaly, or snapshot row. +func TestMCP_TenantIsolation_AllGraphRAGTools(t *testing.T) { + ts, g, repo, vIdx := setupTenantIsolationServer(t) + + now := time.Now().Add(-time.Minute) // a hair in the past so since=now-15m sees us + + for _, tenant := range allTenants { + seedTenant(t, g, repo, vIdx, tenant, now) + } + waitForServiceMaps(t, g, allTenants) + seedInvestigations(t, g, now) + + // Resolve investigation IDs per tenant (PersistInvestigation generates + // them internally; we discover them by querying after the fact, then + // hand them back into get_investigation in the per-caller assertions). + invIDsByTenant := map[string]string{} + for _, tenant := range allTenants { + ctx := storage.WithTenantContext(context.Background(), tenant) + invs, err := g.GetInvestigations(ctx, "", "", "", 10) + if err != nil { + t.Fatalf("GetInvestigations(%s): %v", tenant, err) + } + if len(invs) == 0 { + t.Fatalf("expected at least one persisted investigation for %s, got 0", tenant) + } + invIDsByTenant[tenant] = invs[0].ID + } + + // snapshot lookup time — slightly in the future so "<= at" matches every + // seeded row regardless of microsecond drift. + snapAt := time.Now().Add(time.Minute).UTC().Format(time.RFC3339) + + for _, caller := range isolationCallers { + caller := caller + ownMarkers, leakMarkers := markersFor(caller.scoped, caller.otherSeeded) + // At minimum the response should reference the caller's service + // for queries that are service-shaped. ownMarkers is intentionally + // kept as the canonical "anything tenant-tagged" set in case future + // assertions want it; per-tool checks pick the most relevant one. + ownService := caller.scoped + "-orders" + ownLogMarker := caller.scoped + "-marker" + ownAnomalyMarker := caller.scoped + "-anomaly-marker" + _ = ownMarkers + + // --- in-memory GraphRAG tools --- + + t.Run(caller.name+"/get_service_map", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_map", nil) + assertNoLeak(t, "get_service_map", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_service_health", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_health", map[string]any{ + "service_name": ownService, + }) + assertNoLeak(t, "get_service_health", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_error_chains", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_error_chains", map[string]any{ + "service": ownService, + "time_range": "1h", + "limit": 10, + }) + assertNoLeak(t, "get_error_chains", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/trace_graph", func(t *testing.T) { + // trace_id collides across tenants; correct routing must surface + // only the caller's per-tenant operation/service. + _, body := callTool(t, ts, caller.header, "trace_graph", map[string]any{ + "trace_id": "trace-shared", + }) + assertNoLeak(t, "trace_graph", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/impact_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "impact_analysis", map[string]any{ + "service": ownService, + "depth": 3, + }) + assertNoLeak(t, "impact_analysis", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/root_cause_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "root_cause_analysis", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + assertNoLeak(t, "root_cause_analysis", body, "", leakMarkers) + }) + + t.Run(caller.name+"/correlated_signals", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "correlated_signals", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + // CorrelatedSignals collects logs/metrics for the service, so the + // per-tenant log marker should appear. + assertNoLeak(t, "correlated_signals", body, ownLogMarker, leakMarkers) + }) + + t.Run(caller.name+"/get_anomaly_timeline", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_anomaly_timeline", nil) + assertNoLeak(t, "get_anomaly_timeline", body, ownAnomalyMarker, leakMarkers) + }) + + // --- DB-backed GraphRAG tools --- + + t.Run(caller.name+"/get_investigations", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigations", nil) + assertNoLeak(t, "get_investigations", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_own_tenant", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[caller.scoped], + }) + assertNoLeak(t, "get_investigation/own", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_other_tenant_blocks", func(t *testing.T) { + // Asking by another tenant's ID must NOT return that row — id- + // guessing would otherwise leak across tenants. The handler + // surfaces a tool-level error result, which is fine; what + // matters is that the foreign tenant's data does not appear. + otherTenant := caller.otherSeeded[0] + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[otherTenant], + }) + assertNoLeak(t, "get_investigation/cross-tenant", body, "", leakMarkers) + }) + + t.Run(caller.name+"/get_graph_snapshot", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_graph_snapshot", map[string]any{ + "time": snapAt, + }) + // Snapshot rows are tagged with the tenant marker so the leak + // scan covers both ID prefixes (snap-acme/snap-beta/snap-default) + // and the inline node markers. + assertNoLeak(t, "get_graph_snapshot", body, "snap-"+caller.scoped, leakMarkers) + }) + + // --- vectordb-backed tool (Drain path is exercised by ingestion above) --- + + t.Run(caller.name+"/find_similar_logs", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "find_similar_logs", map[string]any{ + "query": "connection refused upstream", + "limit": 10, + }) + assertNoLeak(t, "find_similar_logs", body, ownLogMarker, leakMarkers) + }) + + // --- Legacy/rewired surface --- + // get_system_graph is rewired onto GraphRAG by RAN-39, so the same + // per-tenant invariants apply. + t.Run(caller.name+"/get_system_graph", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_system_graph", nil) + assertNoLeak(t, "get_system_graph", body, ownService, leakMarkers) + }) + } +} + +// TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant proves that two +// tenants writing identical log bodies do not collide on the same Drain +// cluster id surfaced by CorrelatedSignals. Drain itself is currently a +// shared miner, but the LogClusterNodes are stored on per-tenant +// SignalStores so the cluster id surfaces tenant-side and a tenant cannot +// observe another tenant's cluster row. +func TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant(t *testing.T) { + ts, g, repo, vIdx := setupTenantIsolationServer(t) + now := time.Now().Add(-time.Minute) + + // Identical log body for both tenants — collision-by-design. + for _, tenant := range []string{"acme", "beta"} { + seedTenant(t, g, repo, vIdx, tenant, now) + } + waitForServiceMaps(t, g, []string{"acme", "beta"}) + + for _, scoped := range []string{"acme", "beta"} { + _, body := callTool(t, ts, scoped, "correlated_signals", map[string]any{ + "service": scoped + "-orders", + "time_range": "1h", + }) + // Caller's marker must appear, the other tenant's must not. + other := "beta" + if scoped == "beta" { + other = "acme" + } + if !strings.Contains(body, scoped+"-marker") { + t.Errorf("%s correlated_signals missing own marker, body=%s", scoped, truncate(body)) + } + if strings.Contains(body, other+"-marker") { + t.Errorf("%s correlated_signals leaked %s marker, body=%s", scoped, other, truncate(body)) + } + } + + // Sanity: prove the test setup actually shares state between tenants + // at the storage layer (so the isolation we're asserting above is + // non-trivial). Same trace_id should land in two distinct rows because + // Span.TenantID is part of the unique identity for these inserts. + // We don't persist spans here directly (we go through OnSpanIngested + // which is in-memory only), so we just assert the in-memory invariant. + ctxA := storage.WithTenantContext(context.Background(), "acme") + ctxB := storage.WithTenantContext(context.Background(), "beta") + mapA := g.ServiceMap(ctxA, 0) + mapB := g.ServiceMap(ctxB, 0) + if got, want := len(mapA), 1; got != want { + t.Fatalf("acme ServiceMap len=%d want=%d (%+v)", got, want, mapA) + } + if got, want := len(mapB), 1; got != want { + t.Fatalf("beta ServiceMap len=%d want=%d (%+v)", got, want, mapB) + } + if mapA[0].Service.Name == mapB[0].Service.Name { + t.Fatalf("ServiceMap shows same service name for both tenants — partition broken: %v vs %v", mapA[0].Service, mapB[0].Service) + } +} + diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 4ae7b4b..96b1fe1 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -286,9 +286,9 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a }() switch name { case "get_system_graph": - return s.toolGetSystemGraph(args) + return s.toolGetSystemGraph(ctx, args) case "get_service_health": - return s.toolGetServiceHealth(args) + return s.toolGetServiceHealth(ctx, args) case "search_logs": return s.toolSearchLogs(ctx, args) case "tail_logs": @@ -334,7 +334,28 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a // --- Tool implementations --- -func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { +// toolGetSystemGraph returns a tenant-scoped service topology snapshot. +// +// When GraphRAG is wired (the default in production) the response is built +// from its per-tenant ServiceMap and AllServiceEdges, so two tenants with +// overlapping service names cannot see each other's nodes or edges. The +// legacy *graph.Graph remains as a fallback for boot windows when GraphRAG +// is still warming up; that fallback is cross-tenant by construction and +// is the documented legacy code path called out in RAN-39. +func (s *Server) toolGetSystemGraph(ctx context.Context, _ map[string]any) ToolCallResult { + if s.graphRAG != nil { + entries := s.graphRAG.ServiceMap(mcpCtx(ctx), 0) + edges := s.graphRAG.AllServiceEdges(mcpCtx(ctx)) + payload := map[string]any{ + "services": entries, + "edges": edges, + } + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal system graph: %v", err)) + } + return textResult(string(data)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } @@ -346,11 +367,26 @@ func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetServiceHealth(args map[string]any) ToolCallResult { +// toolGetServiceHealth returns the ServiceMap entry for svcName scoped to +// the tenant on ctx. Falls back to the legacy svcGraph snapshot when +// GraphRAG is not yet wired. +func (s *Server) toolGetServiceHealth(ctx context.Context, args map[string]any) ToolCallResult { svcName, _ := args["service_name"].(string) if svcName == "" { return errorResult("service_name is required") } + if s.graphRAG != nil { + for _, entry := range s.graphRAG.ServiceMap(mcpCtx(ctx), 0) { + if entry.Service != nil && entry.Service.Name == svcName { + data, err := json.MarshalIndent(entry, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal service health: %v", err)) + } + return textResult(string(data)) + } + } + return textResult(fmt.Sprintf("service %q not found in the current tenant window", svcName)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } diff --git a/internal/vectordb/index.go b/internal/vectordb/index.go index 1333d57..8f325f7 100644 --- a/internal/vectordb/index.go +++ b/internal/vectordb/index.go @@ -11,9 +11,20 @@ import ( "unicode" ) +// defaultTenantID is the tenant assigned when the caller passes an empty +// tenant string. Mirrors storage.DefaultTenantID; duplicated here to avoid +// pulling internal/storage into vectordb's import graph. +const defaultTenantID = "default" + // LogVector represents an indexed log entry. +// +// Tenant scopes the document so Search can return only the caller's tenant +// rows. The TF-IDF table is shared across tenants — global IDF still gives +// the right rarity signal — but the per-document tenant tag is enforced at +// query time so two tenants with overlapping log bodies stay isolated. type LogVector struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -23,6 +34,7 @@ type LogVector struct { // SearchResult is a single similarity hit. type SearchResult struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -50,8 +62,10 @@ func New(maxSize int) *Index { } } -// Add adds a log to the index. Thread-safe. -func (idx *Index) Add(logID uint, serviceName, severity, body string) { +// Add adds a log to the index. Thread-safe. Tenant is recorded with the +// document so Search can filter by it; an empty tenant collapses to +// the platform default at the boundary, matching storage.TenantFromContext. +func (idx *Index) Add(logID uint, tenant, serviceName, severity, body string) { if !shouldIndex(severity) { return } @@ -61,6 +75,10 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { } tf := computeTF(tokens) + if tenant == "" { + tenant = defaultTenantID + } + idx.mu.Lock() defer idx.mu.Unlock() @@ -75,6 +93,7 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { idx.docs = append(idx.docs, LogVector{ LogID: logID, + Tenant: tenant, ServiceName: serviceName, Severity: severity, Body: body, @@ -83,11 +102,17 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { idx.dirty = true } -// Search finds the top-k logs most similar to the query string. -func (idx *Index) Search(query string, k int) []SearchResult { +// Search finds the top-k logs most similar to the query string within +// tenant. Documents from other tenants are excluded — the IDF table stays +// global so rarity is computed against the whole corpus, but result rows +// are filtered to the caller's tenant. +func (idx *Index) Search(tenant, query string, k int) []SearchResult { if k <= 0 { k = 10 } + if tenant == "" { + tenant = defaultTenantID + } tokens := tokenize(query) if len(tokens) == 0 { return nil @@ -124,6 +149,9 @@ func (idx *Index) Search(query string, k int) []SearchResult { } results := make([]scored, 0, len(docs)) for _, doc := range docs { + if doc.Tenant != tenant { + continue + } docVec := make(map[string]float64, len(doc.vec)) for term, tf := range doc.vec { docVec[term] = tf * idfSnap[term] @@ -145,6 +173,7 @@ func (idx *Index) Search(query string, k int) []SearchResult { for i, r := range results { out[i] = SearchResult{ LogID: r.doc.LogID, + Tenant: r.doc.Tenant, ServiceName: r.doc.ServiceName, Severity: r.doc.Severity, Body: r.doc.Body, diff --git a/main.go b/main.go index c94be26..d3d3d2c 100644 --- a/main.go +++ b/main.go @@ -342,7 +342,7 @@ func main() { }) if err == nil { for _, l := range recentLogs { - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) } slog.Info("🔍 Vector index hydrated from recent ERROR logs", "count", len(recentLogs)) } @@ -412,7 +412,7 @@ func main() { Timestamp: l.Timestamp, }) aiService.EnqueueLog(l) - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) eventHub.NotifyRefresh() if time.Since(start) > 100*time.Millisecond { slog.Warn("Slow broadcast/enqueue", "duration", time.Since(start)) From fb64dbea9ec37479b599a2859ee8e969d63f3e17 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Fri, 24 Apr 2026 21:19:05 +0000 Subject: [PATCH 3/5] feat(storage): tenant-scoped trace_id uniqueness (RAN-21) Replace standalone uniqueIndex on Trace.TraceID with composite idx_traces_tenant_trace_id(tenant_id, trace_id) so two tenants can ingest the same trace_id without either dropping into OnConflict.DoNothing. AutoMigrateModels now idempotently discovers and drops any legacy single-column unique index on traces.trace_id (SQLite/Postgres/MySQL/MSSQL) so upgraded databases stop silently blocking cross-tenant reuse. Co-Authored-By: Paperclip --- internal/storage/factory.go | 9 + internal/storage/migrate_traces.go | 181 ++++++++++++++++++++ internal/storage/models.go | 9 +- internal/storage/trace_repo.go | 10 +- internal/storage/trace_tenant_test.go | 232 ++++++++++++++++++++++++++ 5 files changed, 436 insertions(+), 5 deletions(-) create mode 100644 internal/storage/migrate_traces.go create mode 100644 internal/storage/trace_tenant_test.go diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 6d2aa7c..733e142 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -178,6 +178,15 @@ func AutoMigrateModels(db *gorm.DB, driver string) error { return fmt.Errorf("failed to migrate database: %w", err) } + // RAN-21: retire the pre-composite standalone unique index on traces.trace_id. + // AutoMigrate never drops indexes that no longer appear on struct tags, so on + // pre-existing databases the old uniqueIndex would persist and still block + // cross-tenant trace_id reuse. This is idempotent across drivers and a no-op + // on fresh databases. + if err := dropLegacyTraceIDUniqueIndex(db, driver); err != nil { + log.Printf("⚠️ legacy trace_id unique index drop failed: %v", err) + } + // Drop foreign keys that AutoMigrate may have created (MySQL) if driver == "mysql" { db.Exec("ALTER TABLE spans DROP FOREIGN KEY fk_traces_spans") diff --git a/internal/storage/migrate_traces.go b/internal/storage/migrate_traces.go new file mode 100644 index 0000000..07558d2 --- /dev/null +++ b/internal/storage/migrate_traces.go @@ -0,0 +1,181 @@ +package storage + +import ( + "fmt" + "log" + "strings" + + "gorm.io/gorm" +) + +// dropLegacyTraceIDUniqueIndex removes any pre-RAN-21 standalone UNIQUE index +// that covers only traces.trace_id. From RAN-21 onward uniqueness is the +// composite idx_traces_tenant_trace_id on (tenant_id, trace_id); a surviving +// standalone unique index would silently block cross-tenant trace_id reuse. +// +// Discovery is structure-based (not name-based) because the legacy name varied +// across drivers and GORM versions. The composite index — which lists two +// columns — is never matched and therefore never dropped. +// +// Fresh databases never contain the legacy index, so this is a safe no-op on +// first boot. Invoked once per AutoMigrateModels call and is idempotent. +func dropLegacyTraceIDUniqueIndex(db *gorm.DB, driver string) error { + if db == nil { + return nil + } + driver = strings.ToLower(driver) + names, err := findLegacyTraceIDUniqueIndexes(db, driver) + if err != nil { + return err + } + for _, name := range names { + if name == "" { + continue + } + if err := dropIndexOnTraces(db, driver, name); err != nil { + return fmt.Errorf("drop legacy trace_id unique index %q: %w", name, err) + } + log.Printf("🧹 Dropped legacy single-column unique index on traces.trace_id: %s", name) + } + return nil +} + +// findLegacyTraceIDUniqueIndexes returns every UNIQUE index on the traces table +// whose single indexed column is trace_id. The composite RAN-21 index +// (tenant_id, trace_id) is excluded because it covers two columns. +func findLegacyTraceIDUniqueIndexes(db *gorm.DB, driver string) ([]string, error) { + switch driver { + case "sqlite", "": + // Enumerate every unique index on traces, then inspect its column list + // via PRAGMA index_info. SQLite auto-creates indexes for UNIQUE table + // constraints with names prefixed "sqlite_autoindex_" — those also + // surface here and are handled identically. Aliased to is_unique + // because SQLite treats "unique" as a reserved keyword even as an + // output alias. + type idxRow struct { + Name string `gorm:"column:name"` + IsUnique int `gorm:"column:is_unique"` + } + var idxs []idxRow + if err := db.Raw(`SELECT name, "unique" AS is_unique FROM pragma_index_list('traces')`).Scan(&idxs).Error; err != nil { + return nil, fmt.Errorf("pragma_index_list(traces): %w", err) + } + type colRow struct { + Name string `gorm:"column:name"` + } + var out []string + for _, ix := range idxs { + if ix.IsUnique != 1 { + continue + } + var cols []colRow + if err := db.Raw(fmt.Sprintf("SELECT name FROM pragma_index_info('%s')", ix.Name)).Scan(&cols).Error; err != nil { + return nil, fmt.Errorf("pragma_index_info(%s): %w", ix.Name, err) + } + if len(cols) == 1 && cols[0].Name == "trace_id" { + out = append(out, ix.Name) + } + } + return out, nil + + case "postgres", "postgresql": + // pg_index.indkey is an int2vector of attnums; join against + // pg_attribute to resolve column names. Filter to UNIQUE, non-primary + // indexes on the traces table covering exactly one column = trace_id. + var rows []indexNameRow + const q = ` +SELECT c.relname AS name +FROM pg_index i +JOIN pg_class c ON c.oid = i.indexrelid +JOIN pg_class t ON t.oid = i.indrelid +JOIN pg_namespace n ON n.oid = t.relnamespace +WHERE t.relname = 'traces' + AND n.nspname = ANY (current_schemas(false)) + AND i.indisunique + AND NOT i.indisprimary + AND i.indnatts = 1 + AND ( + SELECT attname FROM pg_attribute + WHERE attrelid = t.oid AND attnum = i.indkey[0] + ) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("pg_index lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "mysql": + // information_schema.STATISTICS has one row per (index, seq_in_index). + // Group by index, count columns, and keep indexes where the sole + // column is trace_id and NON_UNIQUE=0. + var rows []indexNameRow + const q = ` +SELECT INDEX_NAME AS name +FROM information_schema.STATISTICS +WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'traces' + AND NON_UNIQUE = 0 + AND INDEX_NAME <> 'PRIMARY' +GROUP BY INDEX_NAME +HAVING COUNT(*) = 1 + AND MAX(COLUMN_NAME) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("information_schema.STATISTICS lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "sqlserver", "mssql": + // sys.indexes + sys.index_columns: one row per indexed column; keep + // unique, non-primary-key indexes on dbo.traces whose only column is + // trace_id. + var rows []indexNameRow + const q = ` +SELECT i.name AS name +FROM sys.indexes i +JOIN sys.objects t ON t.object_id = i.object_id +WHERE t.name = 'traces' + AND i.is_unique = 1 + AND i.is_primary_key = 0 + AND ( + SELECT COUNT(*) FROM sys.index_columns ic + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id + ) = 1 + AND EXISTS ( + SELECT 1 FROM sys.index_columns ic + JOIN sys.columns c ON c.object_id = ic.object_id AND c.column_id = ic.column_id + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id AND c.name = 'trace_id' + )` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("sys.indexes lookup: %w", err) + } + return flattenIndexNames(rows), nil + } + return nil, nil +} + +// indexNameRow is a single-column scan target shared by the non-SQLite branches. +// Keeping it package-level (rather than redeclared per-branch) lets +// flattenIndexNames project through a single concrete type. +type indexNameRow struct { + Name string `gorm:"column:name"` +} + +func flattenIndexNames(rows []indexNameRow) []string { + out := make([]string, 0, len(rows)) + for _, r := range rows { + out = append(out, r.Name) + } + return out +} + +// dropIndexOnTraces removes an index by name, per-driver. GORM's Migrator +// handles SQLite/Postgres/MySQL cleanly; for SQL Server we must qualify the +// DROP with the table name. +func dropIndexOnTraces(db *gorm.DB, driver, name string) error { + switch driver { + case "sqlserver", "mssql": + // T-SQL: DROP INDEX name ON table + return db.Exec(fmt.Sprintf("DROP INDEX [%s] ON [traces]", name)).Error + default: + return db.Migrator().DropIndex(&Trace{}, name) + } +} diff --git a/internal/storage/models.go b/internal/storage/models.go index 8585c61..b00c22e 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -84,11 +84,14 @@ const DefaultTenantID = "default" // Index strategy: single-column tenant_id is redundant — every tenant-scoped // query joins tenant_id with another filter (timestamp, service_name). The // leftmost column of a composite index satisfies single-column tenant lookups, -// so we only declare composites. +// so we only declare composites. TraceID uniqueness is scoped to (tenant_id, +// trace_id): distinct tenants may legitimately ingest identical trace_ids +// (RAN-21). The old standalone `uniqueIndex` on trace_id is dropped at +// migration time by dropLegacyTraceIDUniqueIndex. type Trace struct { ID uint `gorm:"primaryKey" json:"id"` - TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1" json:"tenant_id"` - TraceID string `gorm:"uniqueIndex;size:32;not null" json:"trace_id"` + TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1;uniqueIndex:idx_traces_tenant_trace_id,priority:1" json:"tenant_id"` + TraceID string `gorm:"size:32;not null;uniqueIndex:idx_traces_tenant_trace_id,priority:2" json:"trace_id"` ServiceName string `gorm:"size:255;index:idx_traces_tenant_service,priority:2" json:"service_name"` Duration int64 `gorm:"index" json:"duration"` // Microseconds DurationMs float64 `gorm:"-" json:"duration_ms"` diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go index 65a541c..05eaf61 100644 --- a/internal/storage/trace_repo.go +++ b/internal/storage/trace_repo.go @@ -56,6 +56,9 @@ func (r *Repository) BatchCreateSpans(spans []Span) error { } // BatchCreateTraces inserts traces, skipping duplicates. +// Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id +// on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored, +// while the same trace_id under a different tenant inserts cleanly. func (r *Repository) BatchCreateTraces(traces []Trace) error { if len(traces) == 0 { return nil @@ -67,6 +70,8 @@ func (r *Repository) BatchCreateTraces(traces []Trace) error { } // CreateTrace inserts a new trace, skipping if it already exists. +// Uniqueness is per idx_traces_tenant_trace_id (tenant_id, trace_id), so the +// same trace_id across tenants is allowed. func (r *Repository) CreateTrace(trace Trace) error { if strings.ToLower(r.driver) == "mysql" { return r.db.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(&trace).Error @@ -75,8 +80,9 @@ func (r *Repository) CreateTrace(trace Trace) error { } // GetTrace returns a trace by ID with its spans and logs, scoped to the tenant on ctx. -// The Preloaded Spans and Logs are additionally filtered so a trace ID collision -// across tenants cannot leak cross-tenant children. +// Trace uniqueness is composite (tenant_id, trace_id), so the same trace_id can +// legitimately exist in multiple tenants; the Preloaded Spans and Logs are +// filtered by tenant_id as defense-in-depth against cross-tenant child leakage. func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error) { tenant := TenantFromContext(ctx) var trace Trace diff --git a/internal/storage/trace_tenant_test.go b/internal/storage/trace_tenant_test.go new file mode 100644 index 0000000..103603a --- /dev/null +++ b/internal/storage/trace_tenant_test.go @@ -0,0 +1,232 @@ +package storage + +import ( + "sort" + "testing" + "time" +) + +// TestCreateTrace_SameTraceIDAcrossTenants_Succeeds proves that identical +// trace_ids under distinct tenants coexist after RAN-21. Before the fix the +// second insert silently collapsed into a no-op via OnConflict.DoNothing +// because the old standalone uniqueIndex on trace_id matched regardless of +// tenant. +func TestCreateTrace_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "shared-trace-id-0001" + + acme := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + beta := Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now} + + if err := repo.CreateTrace(acme); err != nil { + t.Fatalf("CreateTrace(acme): %v", err) + } + if err := repo.CreateTrace(beta); err != nil { + t.Fatalf("CreateTrace(beta): %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d trace rows for shared trace_id, got %d", want, got) + } + seenTenants := map[string]string{} + for _, r := range rows { + seenTenants[r.TenantID] = r.ServiceName + } + if seenTenants["acme"] != "svc-a" || seenTenants["beta"] != "svc-b" { + t.Fatalf("tenant→service mismatch: %+v", seenTenants) + } + + acmeCtx := WithTenantContext(t.Context(), "acme") + betaCtx := WithTenantContext(t.Context(), "beta") + got, err := repo.GetTrace(acmeCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(acme): %v", err) + } + if got.ServiceName != "svc-a" || got.TenantID != "acme" { + t.Fatalf("GetTrace(acme) returned wrong row: %+v", got) + } + got, err = repo.GetTrace(betaCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(beta): %v", err) + } + if got.ServiceName != "svc-b" || got.TenantID != "beta" { + t.Fatalf("GetTrace(beta) returned wrong row: %+v", got) + } +} + +// TestCreateTrace_SameTraceIDSameTenant_IsIgnored proves the intra-tenant +// dedupe behaviour survives the composite-uniqueness change. Ingestion retries +// under the same tenant must still be absorbed by OnConflict.DoNothing. +func TestCreateTrace_SameTraceIDSameTenant_IsIgnored(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "same-tenant-trace-0001" + + first := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + dup := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a-renamed", Status: "OK", Timestamp: now.Add(time.Second)} + + if err := repo.CreateTrace(first); err != nil { + t.Fatalf("CreateTrace first: %v", err) + } + if err := repo.CreateTrace(dup); err != nil { + t.Fatalf("CreateTrace dup: %v", err) + } + + var rows []Trace + if err := repo.db.Where("tenant_id = ? AND trace_id = ?", "acme", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 1; got != want { + t.Fatalf("expected intra-tenant dedupe to keep %d row, got %d", want, got) + } + if rows[0].ServiceName != "svc-a" { + t.Fatalf("expected first-write-wins semantics, got %q", rows[0].ServiceName) + } +} + +// TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds exercises the batch +// path, which uses the same OnConflict.DoNothing / INSERT IGNORE plumbing. +func TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "batch-shared-0001" + + batch := []Trace{ + {TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}, + {TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}, + } + if err := repo.BatchCreateTraces(batch); err != nil { + t.Fatalf("BatchCreateTraces: %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Order("tenant_id ASC").Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d rows, got %d", want, got) + } + if rows[0].TenantID != "acme" || rows[1].TenantID != "beta" { + t.Fatalf("unexpected tenant ordering: %v", []string{rows[0].TenantID, rows[1].TenantID}) + } +} + +// TestGetTrace_IsolatesChildrenAcrossTenants confirms that when the same +// trace_id exists in two tenants, GetTrace returns each tenant's own spans +// and logs — no cross-tenant children leak through the Preload filter. +func TestGetTrace_IsolatesChildrenAcrossTenants(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "preload-shared-0001" + + // Both tenants share trace_id but have distinct child payloads. + if err := repo.db.Create(&Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme trace: %v", err) + } + if err := repo.db.Create(&Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta trace: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", OperationName: "op-a", ServiceName: "svc-a", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create acme span: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", OperationName: "op-b", ServiceName: "svc-b", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create beta span: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", Severity: "INFO", Body: "acme log", ServiceName: "svc-a", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme log: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", Severity: "INFO", Body: "beta log", ServiceName: "svc-b", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta log: %v", err) + } + + cases := []struct { + tenant string + wantSvc string + wantOp string + wantLog string + }{ + {"acme", "svc-a", "op-a", "acme log"}, + {"beta", "svc-b", "op-b", "beta log"}, + } + for _, tc := range cases { + ctx := WithTenantContext(t.Context(), tc.tenant) + tr, err := repo.GetTrace(ctx, traceID) + if err != nil { + t.Fatalf("GetTrace(%s): %v", tc.tenant, err) + } + if tr.ServiceName != tc.wantSvc { + t.Fatalf("%s: wrong trace row: svc=%q", tc.tenant, tr.ServiceName) + } + if len(tr.Spans) != 1 || tr.Spans[0].OperationName != tc.wantOp { + names := []string{} + for _, s := range tr.Spans { + names = append(names, s.OperationName) + } + sort.Strings(names) + t.Fatalf("%s: span leak — got ops %v, want [%s]", tc.tenant, names, tc.wantOp) + } + if len(tr.Logs) != 1 || tr.Logs[0].Body != tc.wantLog { + bodies := []string{} + for _, l := range tr.Logs { + bodies = append(bodies, l.Body) + } + sort.Strings(bodies) + t.Fatalf("%s: log leak — got bodies %v, want [%s]", tc.tenant, bodies, tc.wantLog) + } + } +} + +// TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex proves the idempotent +// migration hook retires a pre-RAN-21 standalone unique index on +// traces.trace_id. Simulates an upgrade by creating the legacy index +// out-of-band and rerunning AutoMigrateModels. +func TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + + // Install a legacy-shaped standalone unique index on traces.trace_id. + const legacyIdx = "idx_legacy_traces_trace_id" + if err := repo.db.Exec("CREATE UNIQUE INDEX " + legacyIdx + " ON traces(trace_id)").Error; err != nil { + t.Fatalf("create legacy index: %v", err) + } + if !repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("precondition: legacy index should exist") + } + + // Rerun migration — the hook must drop the legacy index. + if err := AutoMigrateModels(repo.db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("legacy standalone unique index on traces.trace_id was not dropped") + } + + // Cross-tenant reuse must now succeed end-to-end. + now := time.Now().UTC() + if err := repo.CreateTrace(Trace{TenantID: "acme", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace acme: %v", err) + } + if err := repo.CreateTrace(Trace{TenantID: "beta", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace beta: %v", err) + } + var n int64 + if err := repo.db.Model(&Trace{}).Where("trace_id = ?", "after-drop").Count(&n).Error; err != nil { + t.Fatalf("count: %v", err) + } + if n != 2 { + t.Fatalf("expected 2 rows across tenants, got %d", n) + } +} + +// TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex asserts the new +// composite index landed with the expected name after a fresh migration. +func TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + if !repo.db.Migrator().HasIndex(&Trace{}, "idx_traces_tenant_trace_id") { + t.Fatal("expected composite uniqueIndex idx_traces_tenant_trace_id on traces") + } +} From 3b1cf19659c5463f05b9a92e072e71067f611796 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 25 Apr 2026 14:24:50 +0000 Subject: [PATCH 4/5] fix(mcp,vectordb): RAN-39 reviewer follow-ups + RAN-20 vector tenant isolation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reviewer (cf5145d8) requested three changes on commit c839460. Addresses each with verified failure-on-regression checks. #1 — Cross-tenant boot hydration of vector index Previously main.go hydrated the index via repo.GetLogsV2(appCtx, ...) which is tenant-scoped to whatever tenant ctx carries — appCtx has none, so only the default tenant's rows reloaded after restart and find_similar_logs was cold for every other tenant until fresh ERROR logs landed. The new tenant-aware vectorIdx.Add(..., l.TenantID, ...) didn't fix it because the non-default rows were never fetched. - internal/storage/log_repo.go: new ListRecentHighSeverityLogsAllTenants — explicitly cross-tenant administrative read used only by hydration. Each row carries its own TenantID; fan-out happens in the caller. - main.go: hydration now uses the new method so every tenant's warm index survives a restart. - internal/vectordb/index.go: tenant-aware FIFO eviction. At cap, drop up to maxSize/10 of the inserting tenant's oldest rows so a noisy tenant cannot evict another tenant's warm rows (availability isolation; confidentiality is still enforced by doc.Tenant filtering in Search). Brand-new tenants drop one globally-oldest row to claim a first slot. #2 — root_cause_analysis assertion was vacuous internal/mcp/tenant_isolation_test.go:434 passed an empty ownMarker to assertNoLeak, so the merge gate would still pass if the tool regressed to returning [] for every tenant. Now passes ownService (RankedCause carries Service so it must appear in a non-empty response). Verified by sabotaging the handler to return a nil slice — the assertion fails with 'expected own marker "acme-orders" in response, body=null' as designed, then reverted. #3 — Drain cluster-id test didn't actually compare cluster IDs The previous test reused seedTenant (per-tenant service names) and only scanned response text, so a regression that surfaced the same cluster row across tenants would still pass. Rewritten to: - Use one shared service name across both tenants so Drain produces colliding (service, templateID) keys — the SignalStore partition is the only thing keeping rows separate. - Inspect the actual []graphrag.LogClusterNode returned by CorrelatedSignals (not just response text), checking Template + SampleLog content for own-marker presence and foreign-marker absence. - Log the per-tenant cluster IDs so future refactors that change the ID scheme leave a visible audit trail. - End-to-end probe via the MCP HTTP surface remains, asserting the same isolation reaches clients. RAN-20 supporting tests internal/vectordb/index_test.go, internal/api/similar_handler_test.go, internal/mcp/tools_ran20_test.go cover vectordb tenant scoping at three layers (in-memory, REST handler, MCP tool). They were sitting untracked on the branch from the parallel RAN-20 work; bundling them so the follow-up vectordb behavior added here is covered. Verified: - go vet ./... clean - go test ./... clean (full repo) - go test -race ./internal/{mcp,graphrag,vectordb,api}/... clean Co-Authored-By: Paperclip --- internal/api/similar_handler_test.go | 117 ++++++++++++++++++++++ internal/mcp/tenant_isolation_test.go | 133 ++++++++++++++++++------- internal/mcp/tools_ran20_test.go | 79 +++++++++++++++ internal/storage/log_repo.go | 27 +++++ internal/vectordb/index.go | 33 ++++++- internal/vectordb/index_test.go | 136 ++++++++++++++++++++++++++ main.go | 12 +-- 7 files changed, 492 insertions(+), 45 deletions(-) create mode 100644 internal/api/similar_handler_test.go create mode 100644 internal/mcp/tools_ran20_test.go create mode 100644 internal/vectordb/index_test.go diff --git a/internal/api/similar_handler_test.go b/internal/api/similar_handler_test.go new file mode 100644 index 0000000..69af324 --- /dev/null +++ b/internal/api/similar_handler_test.go @@ -0,0 +1,117 @@ +package api + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/config" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestSimilarHandler_TenantIsolation is the RAN-20 acceptance bar for the HTTP +// surface. Two tenants with distinct corpora query /api/logs/similar; each +// sees ZERO rows belonging to the other tenant. +func TestSimilarHandler_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout charging customer") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused charge insufficient funds") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired for session") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal error while authenticating") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + acmeIDs := map[float64]bool{101: true, 102: true} + globexIDs := map[float64]bool{201: true, 202: true} + + q := url.Values{} + q.Set("q", "payment gateway") + q.Set("limit", "50") + path := "/api/logs/similar?" + q.Encode() + + // Tenant A + aRec := httptest.NewRecorder() + aReq := httptest.NewRequest(http.MethodGet, path, nil) + aReq.Header.Set(TenantHeader, "acme") + handler.ServeHTTP(aRec, aReq) + if aRec.Code != http.StatusOK { + t.Fatalf("acme: want 200, got %d body=%q", aRec.Code, aRec.Body.String()) + } + acme := decodeResults(t, aRec) + if len(acme) == 0 { + t.Fatalf("acme got zero hits despite matching corpus") + } + for _, r := range acme { + if !acmeIDs[r.ID] { + t.Fatalf("acme leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } + + // Tenant B + gRec := httptest.NewRecorder() + gReq := httptest.NewRequest(http.MethodGet, path, nil) + gReq.Header.Set(TenantHeader, "globex") + handler.ServeHTTP(gRec, gReq) + if gRec.Code != http.StatusOK { + t.Fatalf("globex: want 200, got %d", gRec.Code) + } + globex := decodeResults(t, gRec) + if len(globex) == 0 { + t.Fatalf("globex got zero hits despite matching corpus") + } + for _, r := range globex { + if !globexIDs[r.ID] { + t.Fatalf("globex leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } +} + +// TestSimilarHandler_UnknownTenantReturnsEmpty confirms a request bearing an +// unknown tenant header returns zero results — the handler must not silently +// fall back to another tenant's rows. +func TestSimilarHandler_UnknownTenantReturnsEmpty(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/logs/similar?q=database+connection", nil) + req.Header.Set(TenantHeader, "initech") + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("want 200, got %d", rec.Code) + } + if r := decodeResults(t, rec); len(r) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(r)) + } +} + +type similarResult struct { + ID float64 `json:"LogID"` + Tenant string `json:"Tenant"` + ServiceName string `json:"ServiceName"` + Severity string `json:"Severity"` + Body string `json:"Body"` + Score float64 `json:"Score"` +} + +func decodeResults(t *testing.T, rec *httptest.ResponseRecorder) []similarResult { + t.Helper() + var env struct { + Results []similarResult `json:"results"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &env); err != nil { + t.Fatalf("decode response: %v (body=%q)", err, rec.Body.String()) + } + return env.Results +} diff --git a/internal/mcp/tenant_isolation_test.go b/internal/mcp/tenant_isolation_test.go index 6ff8535..653ab90 100644 --- a/internal/mcp/tenant_isolation_test.go +++ b/internal/mcp/tenant_isolation_test.go @@ -431,7 +431,11 @@ func TestMCP_TenantIsolation_AllGraphRAGTools(t *testing.T) { "service": ownService, "time_range": "1h", }) - assertNoLeak(t, "root_cause_analysis", body, "", leakMarkers) + // RankedCause carries Service + Operation, so the caller's + // own service name MUST appear; an empty result here would + // silently regress the tool to a vacuous "[]" response that + // trivially "passes" leak checks (review feedback fix). + assertNoLeak(t, "root_cause_analysis", body, ownService, leakMarkers) }) t.Run(caller.name+"/correlated_signals", func(t *testing.T) { @@ -506,57 +510,118 @@ func TestMCP_TenantIsolation_AllGraphRAGTools(t *testing.T) { } // TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant proves that two -// tenants writing identical log bodies do not collide on the same Drain -// cluster id surfaced by CorrelatedSignals. Drain itself is currently a -// shared miner, but the LogClusterNodes are stored on per-tenant -// SignalStores so the cluster id surfaces tenant-side and a tenant cannot -// observe another tenant's cluster row. +// tenants writing identical log bodies under an *identical* service name +// do not share a single shared LogClusterNode. Drain itself is currently +// a shared miner — without per-tenant SignalStore partitioning the same +// (template, service) pair would collapse to one cluster row visible to +// both tenants. The test inspects the actual LogClusterNodes returned by +// CorrelatedSignals (not just the response text) and asserts each tenant +// only ever sees rows tagged with its own marker. func TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant(t *testing.T) { - ts, g, repo, vIdx := setupTenantIsolationServer(t) + ts, g, _, _ := setupTenantIsolationServer(t) now := time.Now().Add(-time.Minute) - // Identical log body for both tenants — collision-by-design. + // Identical service AND identical log template across tenants — Drain + // is a shared miner so the (service, templateID) cluster key would + // collide if SignalStore weren't tenant-partitioned. The body marker + // is the only per-tenant differentiator. + const sharedService = "shared-orders" + const sharedTrace = "trace-shared" + const sharedSpan = "span-shared" + for _, tenant := range []string{"acme", "beta"} { - seedTenant(t, g, repo, vIdx, tenant, now) + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + OperationName: "/checkout", + Status: "STATUS_CODE_ERROR", + StartTime: now, + EndTime: now.Add(time.Millisecond), + Duration: 1000, + }) + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + Severity: "ERROR", + Body: tenant + "-marker upstream connection refused", + Timestamp: now.Add(time.Millisecond), + }) } - waitForServiceMaps(t, g, []string{"acme", "beta"}) + ctxA := storage.WithTenantContext(context.Background(), "acme") + ctxB := storage.WithTenantContext(context.Background(), "beta") + + // Wait for both tenants' SignalStores to surface the cluster row. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + a := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + b := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if a != nil && b != nil && len(a.ErrorLogs) >= 1 && len(b.ErrorLogs) >= 1 { + break + } + time.Sleep(20 * time.Millisecond) + } + + sigA := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + sigB := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if sigA == nil || len(sigA.ErrorLogs) == 0 { + t.Fatalf("acme CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + if sigB == nil || len(sigB.ErrorLogs) == 0 { + t.Fatalf("beta CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + + // Per-tenant cluster row content must carry only that tenant's marker. + // We probe both Template and SampleLog because Drain stores the + // templated form on Template and the original body on SampleLog, and + // both should be uncontaminated. + checkClusters := func(name string, clusters []graphrag.LogClusterNode, ownMarker, foreignMarker string) []string { + t.Helper() + var ids []string + for _, lc := range clusters { + ids = append(ids, lc.ID) + joined := lc.Template + "\n" + lc.SampleLog + if !strings.Contains(joined, ownMarker) { + t.Errorf("[%s] cluster %q missing own marker %q (template=%q sample=%q)", name, lc.ID, ownMarker, lc.Template, lc.SampleLog) + } + if strings.Contains(joined, foreignMarker) { + t.Errorf("[%s] cluster %q LEAKED foreign marker %q (template=%q sample=%q)", name, lc.ID, foreignMarker, lc.Template, lc.SampleLog) + } + } + return ids + } + idsA := checkClusters("acme", sigA.ErrorLogs, "acme-marker", "beta-marker") + idsB := checkClusters("beta", sigB.ErrorLogs, "beta-marker", "acme-marker") + + // The cluster IDs themselves can be identical across tenants (Drain ID + // is service-scoped, not tenant-scoped) — that is precisely WHY the + // SignalStore partition matters: without it, the same key would point + // at one shared row. Surface this fact in the test record so a future + // refactor that makes IDs tenant-stamped doesn't accidentally weaken + // the assertion above. + t.Logf("drain cluster IDs: acme=%v beta=%v", idsA, idsB) + + // End-to-end probe: the same isolation must hold via the MCP HTTP + // surface, not just the in-process API. for _, scoped := range []string{"acme", "beta"} { _, body := callTool(t, ts, scoped, "correlated_signals", map[string]any{ - "service": scoped + "-orders", + "service": sharedService, "time_range": "1h", }) - // Caller's marker must appear, the other tenant's must not. other := "beta" if scoped == "beta" { other = "acme" } if !strings.Contains(body, scoped+"-marker") { - t.Errorf("%s correlated_signals missing own marker, body=%s", scoped, truncate(body)) + t.Errorf("%s correlated_signals (HTTP) missing own marker, body=%s", scoped, truncate(body)) } if strings.Contains(body, other+"-marker") { - t.Errorf("%s correlated_signals leaked %s marker, body=%s", scoped, other, truncate(body)) + t.Errorf("%s correlated_signals (HTTP) leaked %s marker, body=%s", scoped, other, truncate(body)) } } - - // Sanity: prove the test setup actually shares state between tenants - // at the storage layer (so the isolation we're asserting above is - // non-trivial). Same trace_id should land in two distinct rows because - // Span.TenantID is part of the unique identity for these inserts. - // We don't persist spans here directly (we go through OnSpanIngested - // which is in-memory only), so we just assert the in-memory invariant. - ctxA := storage.WithTenantContext(context.Background(), "acme") - ctxB := storage.WithTenantContext(context.Background(), "beta") - mapA := g.ServiceMap(ctxA, 0) - mapB := g.ServiceMap(ctxB, 0) - if got, want := len(mapA), 1; got != want { - t.Fatalf("acme ServiceMap len=%d want=%d (%+v)", got, want, mapA) - } - if got, want := len(mapB), 1; got != want { - t.Fatalf("beta ServiceMap len=%d want=%d (%+v)", got, want, mapB) - } - if mapA[0].Service.Name == mapB[0].Service.Name { - t.Fatalf("ServiceMap shows same service name for both tenants — partition broken: %v vs %v", mapA[0].Service, mapB[0].Service) - } } diff --git a/internal/mcp/tools_ran20_test.go b/internal/mcp/tools_ran20_test.go new file mode 100644 index 0000000..7477ae5 --- /dev/null +++ b/internal/mcp/tools_ran20_test.go @@ -0,0 +1,79 @@ +package mcp + +import ( + "context" + "strings" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestFindSimilarLogs_TenantIsolation is the RAN-20 acceptance bar for the MCP +// surface. Two tenants with unique marker strings in their log bodies query +// find_similar_logs; each tenant's response must never contain the other's +// markers. +func TestFindSimilarLogs_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout acme-secret-charge-id-abc") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused acme-only-marker-xyz") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired globex-secret-session-123") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal globex-only-marker-qqq") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "payment gateway", "limit": float64(50)} + + // Acme + acmeRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "acme"), args) + if acmeRes.IsError { + t.Fatalf("acme call errored: %+v", acmeRes) + } + acmeBody := concatContent(acmeRes.Content) + for _, forbidden := range []string{"globex-secret-session-123", "globex-only-marker-qqq", `"LogID": 201`, `"LogID": 202`} { + if strings.Contains(acmeBody, forbidden) { + t.Fatalf("acme leaked globex content %q in body:\n%s", forbidden, acmeBody) + } + } + if !strings.Contains(acmeBody, "acme-secret-charge-id-abc") && !strings.Contains(acmeBody, "acme-only-marker-xyz") { + t.Fatalf("acme did not receive its own rows:\n%s", acmeBody) + } + + // Globex + gRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "globex"), args) + if gRes.IsError { + t.Fatalf("globex call errored: %+v", gRes) + } + gBody := concatContent(gRes.Content) + for _, forbidden := range []string{"acme-secret-charge-id-abc", "acme-only-marker-xyz", `"LogID": 101`, `"LogID": 102`} { + if strings.Contains(gBody, forbidden) { + t.Fatalf("globex leaked acme content %q in body:\n%s", forbidden, gBody) + } + } +} + +// TestFindSimilarLogs_NoTenantFallsBackToDefault proves that a context with no +// tenant value is coerced to the server default — it must NOT bleed into +// another tenant's rows. +func TestFindSimilarLogs_NoTenantFallsBackToDefault(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "acme secret body only") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "secret body"} + + res := srv.toolFindSimilarLogs(context.Background(), args) + if res.IsError { + t.Fatalf("unexpected error: %+v", res) + } + if strings.Contains(concatContent(res.Content), "acme secret body only") { + t.Fatalf("no-tenant call leaked acme content:\n%s", concatContent(res.Content)) + } +} + +func concatContent(items []ContentItem) string { + var b strings.Builder + for _, c := range items { + b.WriteString(c.Text) + } + return b.String() +} diff --git a/internal/storage/log_repo.go b/internal/storage/log_repo.go index dfaae97..e57be6b 100644 --- a/internal/storage/log_repo.go +++ b/internal/storage/log_repo.go @@ -143,6 +143,33 @@ func (r *Repository) UpdateLogInsight(ctx context.Context, logID uint, insight s return nil } +// ListRecentHighSeverityLogsAllTenants returns recent logs of the given +// severity across EVERY tenant, each row carrying its own TenantID. This is an +// administrative read used exclusively by the vector index's startup +// hydration path, which fans rows out to per-tenant shards. It is not exposed +// on any tenant-scoped API surface — tenant isolation for read paths must +// otherwise be preserved via the context-driven WHERE clause. +func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error) { + if limit <= 0 { + limit = 5000 + } + q := r.db.WithContext(ctx).Model(&Log{}) + if severity != "" { + q = q.Where("severity = ?", severity) + } + if !since.IsZero() { + q = q.Where("timestamp >= ?", since) + } + if !until.IsZero() { + q = q.Where("timestamp <= ?", until) + } + var logs []Log + if err := q.Order("timestamp desc").Limit(limit).Find(&logs).Error; err != nil { + return nil, fmt.Errorf("failed to list recent logs all tenants: %w", err) + } + return logs, nil +} + // PurgeLogs deletes logs older than the given timestamp in a single statement. // Suitable for SQLite; for Postgres at large retention volumes prefer PurgeLogsBatched. func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error) { diff --git a/internal/vectordb/index.go b/internal/vectordb/index.go index 8f325f7..99b804a 100644 --- a/internal/vectordb/index.go +++ b/internal/vectordb/index.go @@ -82,12 +82,35 @@ func (idx *Index) Add(logID uint, tenant, serviceName, severity, body string) { idx.mu.Lock() defer idx.mu.Unlock() - // FIFO eviction — copy to new slice to release old backing array memory + // Tenant-aware FIFO eviction. When at cap, remove up to maxSize/10 of the + // oldest entries belonging to the inserting tenant so a noisy tenant + // cannot push another tenant's warm rows out of the index (availability + // isolation — the confidentiality invariant is enforced separately by + // doc.Tenant filtering in Search). The new backing slice also releases + // the old array memory on the next GC cycle. if len(idx.docs) >= idx.maxSize { - keep := idx.docs[idx.maxSize/10:] - newDocs := make([]LogVector, len(keep), idx.maxSize) - copy(newDocs, keep) - idx.docs = newDocs + toDrop := idx.maxSize / 10 + if toDrop < 1 { + toDrop = 1 + } + kept := make([]LogVector, 0, idx.maxSize) + droppedSame := 0 + for _, d := range idx.docs { + if droppedSame < toDrop && d.Tenant == tenant { + droppedSame++ + continue + } + kept = append(kept, d) + } + // Edge case: the inserting tenant has no prior entries while the + // index is at cap with other tenants' rows. Drop one globally-oldest + // entry so the new tenant can take its first slot. This is the only + // path where a tenant's entry can be evicted by another tenant, and + // it costs at most one row per brand-new tenant. + if droppedSame == 0 && len(kept) > 0 { + kept = kept[1:] + } + idx.docs = kept idx.dirty = true } diff --git a/internal/vectordb/index_test.go b/internal/vectordb/index_test.go new file mode 100644 index 0000000..9b9186c --- /dev/null +++ b/internal/vectordb/index_test.go @@ -0,0 +1,136 @@ +package vectordb + +import ( + "strconv" + "sync" + "testing" +) + +// TestTenantIsolation_Search is the RAN-20 confidentiality bar: a query on +// tenant A never returns a document indexed under tenant B, even when the +// vocabularies collide on the query terms. +func TestTenantIsolation_Search(t *testing.T) { + idx := New(1_000) + + idx.Add(1, "acme", "checkout", "ERROR", "payment gateway timeout upstream") + idx.Add(2, "acme", "checkout", "ERROR", "payment gateway refused charge") + idx.Add(10, "globex", "auth", "ERROR", "payment gateway token expired") + idx.Add(11, "globex", "auth", "ERROR", "payment gateway 500 internal error") + + acmeHits := idx.Search("acme", "payment gateway timeout", 10) + if len(acmeHits) == 0 { + t.Fatalf("acme search returned zero hits despite matching docs") + } + for _, h := range acmeHits { + if h.Tenant != "acme" || h.LogID >= 10 { + t.Fatalf("acme search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } + + globexHits := idx.Search("globex", "payment gateway token", 10) + if len(globexHits) == 0 { + t.Fatalf("globex search returned zero hits despite matching docs") + } + for _, h := range globexHits { + if h.Tenant != "globex" || h.LogID < 10 { + t.Fatalf("globex search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } +} + +// TestUnknownTenantReturnsEmpty proves a tenant with no indexed docs returns +// nothing even when other tenants have matching content. +func TestUnknownTenantReturnsEmpty(t *testing.T) { + idx := New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + if got := idx.Search("initech", "database connection", 10); len(got) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(got)) + } +} + +// TestEmptyTenantCoercedToDefault verifies Add and Search coerce an empty +// tenant to the platform default so untenanted callers stay isolated from +// real tenants. +func TestEmptyTenantCoercedToDefault(t *testing.T) { + idx := New(100) + idx.Add(1, "", "svc", "ERROR", "network unreachable upstream host") + + if hits := idx.Search("", "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with empty tenant: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search(defaultTenantID, "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with default tenant id: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search("acme", "network unreachable", 10); len(hits) != 0 { + t.Fatalf("acme saw %d cross-tenant hits for default-tenant doc", len(hits)) + } +} + +// TestFIFOEvictionFairness is TechLead's requested assertion: a tenant that +// writes near-cap volume cannot evict another tenant's documents from the +// shared index. Under a naive global-FIFO policy tenant B's flood would +// remove tenant A's older entries and A would silently "lose" its warm +// rows — a confidentiality-safe but availability-breaking failure mode. +func TestFIFOEvictionFairness(t *testing.T) { + const cap = 200 + idx := New(cap) + + // Tenant A writes a small set of distinctive markers. + for i := 0; i < 5; i++ { + idx.Add(uint(1+i), "acme", "checkout", "ERROR", "acme-canary-marker alpha beta gamma "+strconv.Itoa(i)) + } + + // Tenant B floods the index well past the cap — enough to trigger + // multiple eviction cycles. + for i := 0; i < cap*4; i++ { + idx.Add(uint(10_000+i), "globex", "svc", "ERROR", "globex chatter filling the index "+strconv.Itoa(i)) + } + + // Every one of acme's canary rows must still be findable. + hits := idx.Search("acme", "acme-canary-marker alpha beta gamma", 20) + if len(hits) < 5 { + t.Fatalf("eviction unfairness: acme canaries evicted by globex flood. want >=5 hits, got %d", len(hits)) + } + seen := map[uint]bool{} + for _, h := range hits { + if h.Tenant != "acme" { + t.Fatalf("cross-tenant leak during eviction test: id=%d tenant=%q", h.LogID, h.Tenant) + } + seen[h.LogID] = true + } + for id := uint(1); id <= 5; id++ { + if !seen[id] { + t.Fatalf("acme canary id=%d missing after globex flood", id) + } + } +} + +// TestConcurrentTenantAddSearch pins down race-detector cleanliness and +// cross-tenant isolation under concurrent readers/writers. +func TestConcurrentTenantAddSearch(t *testing.T) { + idx := New(5_000) + var wg sync.WaitGroup + + for _, tenant := range []string{"acme", "globex"} { + wg.Add(2) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + idx.Add(uint(i), ten, "svc", "ERROR", ten+" error kafka partition "+strconv.Itoa(i)) + } + }(tenant) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + for _, h := range idx.Search(ten, "kafka partition", 5) { + if h.Tenant != ten { + t.Errorf("tenant %s saw cross-tenant hit tenant=%q body=%q", ten, h.Tenant, h.Body) + return + } + } + } + }(tenant) + } + wg.Wait() +} diff --git a/main.go b/main.go index d3d3d2c..83f96cb 100644 --- a/main.go +++ b/main.go @@ -331,15 +331,15 @@ func main() { // Hydrate vector index from recent ERROR/WARN logs on startup (non-blocking). // Uses appCtx so SIGTERM during boot cancels the query before repo.Close(). + // Hydration is cross-tenant by design: each row lands tagged with its own + // TenantID via vectorIdx.Add so isolation is preserved at query time. The + // previous tenant-scoped GetLogsV2 call silently hydrated only the default + // tenant's rows — non-default tenants lost their warm index on every + // restart. bootWG.Add(1) go func() { defer bootWG.Done() - recentLogs, _, err := repo.GetLogsV2(appCtx, storage.LogFilter{ - Severity: "ERROR", - StartTime: time.Now().Add(-24 * time.Hour), - EndTime: time.Now(), - Limit: 5000, - }) + recentLogs, err := repo.ListRecentHighSeverityLogsAllTenants(appCtx, "ERROR", time.Now().Add(-24*time.Hour), time.Now(), 5000) if err == nil { for _, l := range recentLogs { vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) From 48d6ba8b549147808b8c513b347bd44126dba3f0 Mon Sep 17 00:00:00 2001 From: Amit Kumar Date: Sat, 25 Apr 2026 15:14:12 +0000 Subject: [PATCH 5/5] docs(api): clarify handleGetSimilarLogs tenant-scoping comment (RAN-20) Tightens the docstring to spell out that results are scoped to the tenant on r.Context() (set by TenantMiddleware from X-Tenant-ID) and cross-tenant rows are never returned. Behavior unchanged. Co-Authored-By: Paperclip --- internal/api/similar_handler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/api/similar_handler.go b/internal/api/similar_handler.go index ac0fe57..8cd369c 100644 --- a/internal/api/similar_handler.go +++ b/internal/api/similar_handler.go @@ -9,7 +9,9 @@ import ( ) // handleGetSimilarLogs handles GET /api/logs/similar?q=&limit=10 -// Returns logs semantically similar to the query string using TF-IDF cosine similarity. +// Returns logs semantically similar to the query string using TF-IDF cosine +// similarity, scoped to the tenant on r.Context() (set by TenantMiddleware +// from X-Tenant-ID). Cross-tenant rows are never returned. func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) { if s.vectorIdx == nil { http.Error(w, "vector index not initialized", http.StatusServiceUnavailable)