diff --git a/internal/api/similar_handler.go b/internal/api/similar_handler.go index 1668801..8cd369c 100644 --- a/internal/api/similar_handler.go +++ b/internal/api/similar_handler.go @@ -4,10 +4,14 @@ import ( "encoding/json" "net/http" "strconv" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" ) // handleGetSimilarLogs handles GET /api/logs/similar?q=&limit=10 -// Returns logs semantically similar to the query string using TF-IDF cosine similarity. +// Returns logs semantically similar to the query string using TF-IDF cosine +// similarity, scoped to the tenant on r.Context() (set by TenantMiddleware +// from X-Tenant-ID). Cross-tenant rows are never returned. func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) { if s.vectorIdx == nil { http.Error(w, "vector index not initialized", http.StatusServiceUnavailable) @@ -30,7 +34,8 @@ func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) { limit = 50 } - results := s.vectorIdx.Search(query, limit) + tenant := storage.TenantFromContext(r.Context()) + results := s.vectorIdx.Search(tenant, query, limit) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ diff --git a/internal/api/similar_handler_test.go b/internal/api/similar_handler_test.go new file mode 100644 index 0000000..69af324 --- /dev/null +++ b/internal/api/similar_handler_test.go @@ -0,0 +1,117 @@ +package api + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/config" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestSimilarHandler_TenantIsolation is the RAN-20 acceptance bar for the HTTP +// surface. Two tenants with distinct corpora query /api/logs/similar; each +// sees ZERO rows belonging to the other tenant. +func TestSimilarHandler_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout charging customer") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused charge insufficient funds") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired for session") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal error while authenticating") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + acmeIDs := map[float64]bool{101: true, 102: true} + globexIDs := map[float64]bool{201: true, 202: true} + + q := url.Values{} + q.Set("q", "payment gateway") + q.Set("limit", "50") + path := "/api/logs/similar?" + q.Encode() + + // Tenant A + aRec := httptest.NewRecorder() + aReq := httptest.NewRequest(http.MethodGet, path, nil) + aReq.Header.Set(TenantHeader, "acme") + handler.ServeHTTP(aRec, aReq) + if aRec.Code != http.StatusOK { + t.Fatalf("acme: want 200, got %d body=%q", aRec.Code, aRec.Body.String()) + } + acme := decodeResults(t, aRec) + if len(acme) == 0 { + t.Fatalf("acme got zero hits despite matching corpus") + } + for _, r := range acme { + if !acmeIDs[r.ID] { + t.Fatalf("acme leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } + + // Tenant B + gRec := httptest.NewRecorder() + gReq := httptest.NewRequest(http.MethodGet, path, nil) + gReq.Header.Set(TenantHeader, "globex") + handler.ServeHTTP(gRec, gReq) + if gRec.Code != http.StatusOK { + t.Fatalf("globex: want 200, got %d", gRec.Code) + } + globex := decodeResults(t, gRec) + if len(globex) == 0 { + t.Fatalf("globex got zero hits despite matching corpus") + } + for _, r := range globex { + if !globexIDs[r.ID] { + t.Fatalf("globex leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } +} + +// TestSimilarHandler_UnknownTenantReturnsEmpty confirms a request bearing an +// unknown tenant header returns zero results — the handler must not silently +// fall back to another tenant's rows. +func TestSimilarHandler_UnknownTenantReturnsEmpty(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/logs/similar?q=database+connection", nil) + req.Header.Set(TenantHeader, "initech") + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("want 200, got %d", rec.Code) + } + if r := decodeResults(t, rec); len(r) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(r)) + } +} + +type similarResult struct { + ID float64 `json:"LogID"` + Tenant string `json:"Tenant"` + ServiceName string `json:"ServiceName"` + Severity string `json:"Severity"` + Body string `json:"Body"` + Score float64 `json:"Score"` +} + +func decodeResults(t *testing.T, rec *httptest.ResponseRecorder) []similarResult { + t.Helper() + var env struct { + Results []similarResult `json:"results"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &env); err != nil { + t.Fatalf("decode response: %v (body=%q)", err, rec.Body.String()) + } + return env.Results +} diff --git a/internal/graphrag/builder.go b/internal/graphrag/builder.go index 143842b..0de3519 100644 --- a/internal/graphrag/builder.go +++ b/internal/graphrag/builder.go @@ -154,6 +154,19 @@ func (g *GraphRAG) DroppedMetricsCount() int64 { return g.droppedMetrics.Load() // tests to assert cooldown behavior without requiring a live repo. func (g *GraphRAG) InvestigationInsertCount() int64 { return g.invInserts.Load() } +// RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. +// Mirrors PersistInvestigation's "tenant accepted explicitly" shape so +// out-of-band anomaly producers (synthetic detectors, integration tests, +// future external anomaly feeds) can land directly on the right tenant +// slice without going through the metric/error detection loops. Empty +// tenant collapses to storage.DefaultTenantID. +func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode) { + if tenant == "" { + tenant = storage.DefaultTenantID + } + g.storesForTenant(tenant).anomalies.AddAnomaly(anomaly) +} + // recordEventDrop increments the per-signal atomic counter and — when // a telemetry registry is wired — the Prometheus counter vec. func (g *GraphRAG) recordEventDrop(signal string) { @@ -238,8 +251,13 @@ func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggr // Restore persisted Drain templates so log clustering survives restarts. // A missing table (fresh install) or transient DB error is non-fatal — // ingestion will rebuild templates from scratch. + // + // The Drain miner is currently a single shared instance, so we treat its + // learned templates as belonging to DefaultTenantID. The persistence layer + // is already keyed by (tenant_id, id) so a future per-tenant Drain miner + // can load each tenant's slice without colliding cluster IDs. if repo != nil && repo.DB() != nil { - if tpls, err := LoadDrainTemplates(repo.DB()); err != nil { + if tpls, err := LoadDrainTemplates(repo.DB(), storage.DefaultTenantID); err != nil { slog.Info("GraphRAG: drain template restore skipped", "reason", err) } else if len(tpls) > 0 { g.drain.LoadTemplates(tpls) @@ -294,7 +312,7 @@ func (g *GraphRAG) Stop() { // Best-effort final Drain template persistence — losing the most recent // updates on an unclean shutdown would force rebuilding from scratch. if g.repo != nil && g.repo.DB() != nil && g.drain != nil { - if err := SaveDrainTemplates(g.repo.DB(), g.drain.Templates()); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, g.drain.Templates()); err != nil { slog.Warn("GraphRAG: final drain template save failed", "error", err) } } diff --git a/internal/graphrag/drain.go b/internal/graphrag/drain.go index c357ee3..90ad1ac 100644 --- a/internal/graphrag/drain.go +++ b/internal/graphrag/drain.go @@ -33,6 +33,7 @@ import ( "sync" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -504,13 +505,20 @@ func (d *Drain) Len() int { // --- Persistence --- // SaveDrainTemplates upserts the given templates into the drain_templates -// table. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which -// is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY -// UPDATE for MySQL). -func SaveDrainTemplates(db *gorm.DB, templates []Template) error { +// table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's +// clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/ +// PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL). +// +// The conflict target is the composite (tenant_id, id) primary key so the +// same Drain template hash can coexist across tenants — a future per-tenant +// Drain miner can rely on this to keep cluster IDs stable per tenant. +func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error { if db == nil || len(templates) == 0 { return nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } rows := make([]DrainTemplateRow, 0, len(templates)) for _, t := range templates { tokensJSON, err := json.Marshal(t.Tokens) @@ -518,6 +526,7 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { return fmt.Errorf("marshal drain tokens id=%d: %w", t.ID, err) } rows = append(rows, DrainTemplateRow{ + TenantID: tenant, ID: int64(t.ID), //nolint:gosec // intentional bit-reinterpret of FNV-64 hash for DB portability Tokens: string(tokensJSON), Count: t.Count, @@ -527,22 +536,25 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error { }) } return db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "id"}}, + Columns: []clause.Column{{Name: "tenant_id"}, {Name: "id"}}, DoUpdates: clause.AssignmentColumns([]string{ "tokens", "count", "last_seen", "sample", }), }).CreateInBatches(&rows, 500).Error } -// LoadDrainTemplates reads all persisted Drain templates from the DB and -// returns them in a format ready to pass to Drain.LoadTemplates. Returns an -// empty slice (and nil error) if the table is empty. -func LoadDrainTemplates(db *gorm.DB) ([]Template, error) { +// LoadDrainTemplates reads persisted Drain templates for the supplied tenant +// and returns them in a format ready to pass to Drain.LoadTemplates. Returns +// an empty slice (and nil error) if no rows match. +func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error) { if db == nil { return nil, nil } + if tenant == "" { + tenant = storage.DefaultTenantID + } var rows []DrainTemplateRow - if err := db.Find(&rows).Error; err != nil { + if err := db.Where("tenant_id = ?", tenant).Find(&rows).Error; err != nil { return nil, err } out := make([]Template, 0, len(rows)) diff --git a/internal/graphrag/drain_persist_test.go b/internal/graphrag/drain_persist_test.go index dd573c5..49a8633 100644 --- a/internal/graphrag/drain_persist_test.go +++ b/internal/graphrag/drain_persist_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/RandomCodeSpace/otelcontext/internal/storage" "github.com/glebarez/sqlite" "gorm.io/gorm" ) @@ -50,12 +51,12 @@ func TestDrainPersistence_RoundTrip(t *testing.T) { } // Persist. - if err := SaveDrainTemplates(db, d1.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d1.Templates()); err != nil { t.Fatalf("SaveDrainTemplates: %v", err) } // Reload. - loaded, err := LoadDrainTemplates(db) + loaded, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates: %v", err) } @@ -91,7 +92,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t0) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("first save: %v", err) } @@ -108,7 +109,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { for i := 0; i < 10; i++ { d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t1) } - if err := SaveDrainTemplates(db, d.Templates()); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil { t.Fatalf("second save: %v", err) } @@ -139,7 +140,7 @@ func TestDrainPersistence_Upsert(t *testing.T) { // returns an empty slice and nil error (fresh-install path). func TestDrainPersistence_EmptyDB(t *testing.T) { db := newTestDrainDB(t) - tpls, err := LoadDrainTemplates(db) + tpls, err := LoadDrainTemplates(db, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates on empty table: %v", err) } @@ -148,7 +149,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Calling with nil DB is also safe. - tpls, err = LoadDrainTemplates(nil) + tpls, err = LoadDrainTemplates(nil, storage.DefaultTenantID) if err != nil { t.Fatalf("LoadDrainTemplates(nil): %v", err) } @@ -157,7 +158,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) { } // Saving an empty slice is a no-op (no error, no rows). - if err := SaveDrainTemplates(db, nil); err != nil { + if err := SaveDrainTemplates(db, storage.DefaultTenantID, nil); err != nil { t.Fatalf("SaveDrainTemplates(nil): %v", err) } var cnt int64 diff --git a/internal/graphrag/investigation.go b/internal/graphrag/investigation.go index e3dbd31..7aa45d8 100644 --- a/internal/graphrag/investigation.go +++ b/internal/graphrag/investigation.go @@ -10,7 +10,6 @@ import ( "time" "github.com/RandomCodeSpace/otelcontext/internal/storage" - "gorm.io/gorm" ) // investigationCooldown suppresses repeated PersistInvestigation calls with @@ -43,13 +42,14 @@ func (c *investigationCooldown) allow(key string, now time.Time) bool { } // cooldownKey builds a case- and whitespace-insensitive key from the tuple -// (trigger_service, root_service, root_operation). Service names emitted -// from different instrumentations occasionally differ in casing or have -// trailing whitespace; canonicalizing here prevents those variants from -// bypassing the cooldown guard. -func cooldownKey(triggerService, rootService, rootOperation string) string { +// (tenant, trigger_service, root_service, root_operation). Tenant scopes the +// guard so an error in tenant-A doesn't suppress the same error pattern in +// tenant-B. Service names emitted from different instrumentations occasionally +// differ in casing or have trailing whitespace; canonicalizing here prevents +// those variants from bypassing the cooldown guard. +func cooldownKey(tenant, triggerService, rootService, rootOperation string) string { norm := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) } - return norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) + return norm(tenant) + "|" + norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation) } // prune drops entries older than cutoff to bound map size. Called from @@ -65,9 +65,14 @@ func (c *investigationCooldown) prune(cutoff time.Time) { } // Investigation is a persisted record of an automated error investigation. +// +// TenantID scopes the row to its originating tenant. The composite +// (tenant_id, created_at) index supports the recency-ordered "investigations +// for tenant X" query that GetInvestigations runs on every read. type Investigation struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"` Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved Severity string `gorm:"size:20" json:"severity"` // critical, warning, info TriggerService string `gorm:"size:255;index" json:"trigger_service"` @@ -88,20 +93,17 @@ func (Investigation) TableName() string { return "investigations" } -// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models. -func AutoMigrateGraphRAG(db *gorm.DB) error { - return db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}) -} - // PersistInvestigation saves an investigation record from an error chain // analysis. Tenant is accepted explicitly so the caller (the per-tenant -// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice. The -// `investigations` table itself does not yet carry a tenant_id column — -// Subtask B (RAN-38) owns that migration. +// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so +// the persisted row carries its originating tenant_id. func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode) { if len(chains) == 0 { return } + if tenant == "" { + tenant = storage.DefaultTenantID + } firstChain := chains[0] if firstChain.RootCause == nil { @@ -111,10 +113,12 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] now := time.Now() // Cooldown: suppress repeat investigations for the same - // (trigger_service, root_service, root_operation) inside a sliding window. - // Keys are canonicalized (lower + trim) so "Orders" and "orders " share a - // bucket — otherwise trivial casing differences would bypass the guard. - key := cooldownKey(triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) + // (tenant, trigger_service, root_service, root_operation) inside a sliding + // window. Keys are canonicalized (lower + trim) so "Orders" and "orders " + // share a bucket — otherwise trivial casing differences would bypass the + // guard. Tenant scoping prevents an error in one tenant from suppressing + // the same pattern in another. + key := cooldownKey(tenant, triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation) if g.invCooldown != nil && !g.invCooldown.allow(key, now) { return } @@ -180,6 +184,7 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] } inv := Investigation{ + TenantID: tenant, ID: id, CreatedAt: now, Status: "detected", @@ -202,15 +207,17 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains [] return } if err := g.repo.DB().Create(&inv).Error; err != nil { - slog.Error("Failed to persist investigation", "error", err) + slog.Error("Failed to persist investigation", "tenant", tenant, "error", err) return } - slog.Info("Investigation persisted", "id", id, "service", triggerService, "severity", severity) + slog.Info("Investigation persisted", "id", id, "tenant", tenant, "service", triggerService, "severity", severity) } -// GetInvestigations queries persisted investigations. -func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error) { +// GetInvestigations queries persisted investigations scoped to the tenant +// carried by ctx. The composite (tenant_id, created_at) index supports the +// recency-ordered scan. +func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error) { if limit <= 0 { limit = 20 } @@ -218,7 +225,12 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int limit = 100 } - db := g.repo.DB().Model(&Investigation{}).Order("created_at DESC").Limit(limit) + tenant := storage.TenantFromContext(ctx) + db := g.repo.DB(). + Model(&Investigation{}). + Where("tenant_id = ?", tenant). + Order("created_at DESC"). + Limit(limit) if service != "" { db = db.Where("trigger_service = ? OR root_service = ?", service, service) } @@ -236,10 +248,13 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int return investigations, nil } -// GetInvestigation retrieves a single investigation by ID. -func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error) { +// GetInvestigation retrieves a single investigation by ID, scoped to the +// tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups +// prevents id-guessing from leaking another tenant's row. +func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error) { + tenant := storage.TenantFromContext(ctx) var inv Investigation - if err := g.repo.DB().Where("id = ?", id).First(&inv).Error; err != nil { + if err := g.repo.DB().Where("tenant_id = ? AND id = ?", tenant, id).First(&inv).Error; err != nil { return nil, err } return &inv, nil diff --git a/internal/graphrag/investigation_cooldown_test.go b/internal/graphrag/investigation_cooldown_test.go index 1e36eeb..fdfce6f 100644 --- a/internal/graphrag/investigation_cooldown_test.go +++ b/internal/graphrag/investigation_cooldown_test.go @@ -46,18 +46,30 @@ func TestPersistInvestigation_Cooldown(t *testing.T) { } // TestCooldownKey_Canonical verifies the key normalizes case and trims -// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket. +// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket +// within a tenant. func TestCooldownKey_Canonical(t *testing.T) { - cases := [][3]string{ - {"orders", "orders", "op"}, - {"Orders", "ORDERS", "op"}, - {" orders ", "orders", " op "}, - {"ORDERS", "Orders ", "OP"}, + cases := [][4]string{ + {"acme", "orders", "orders", "op"}, + {"Acme", "Orders", "ORDERS", "op"}, + {" acme ", " orders ", "orders", " op "}, + {"ACME", "ORDERS", "Orders ", "OP"}, } - want := cooldownKey(cases[0][0], cases[0][1], cases[0][2]) + want := cooldownKey(cases[0][0], cases[0][1], cases[0][2], cases[0][3]) for _, c := range cases[1:] { - if got := cooldownKey(c[0], c[1], c[2]); got != want { + if got := cooldownKey(c[0], c[1], c[2], c[3]); got != want { t.Errorf("cooldownKey%v = %q, want %q", c, got, want) } } } + +// TestCooldownKey_TenantIsolated asserts that two tenants emitting the same +// (trigger, root, op) tuple produce distinct cooldown keys, so an error in +// tenant-A doesn't suppress the same pattern in tenant-B. +func TestCooldownKey_TenantIsolated(t *testing.T) { + a := cooldownKey("tenant-a", "orders", "orders", "op") + b := cooldownKey("tenant-b", "orders", "orders", "op") + if a == b { + t.Fatalf("tenant scoping missing: tenant-a and tenant-b share key %q", a) + } +} diff --git a/internal/graphrag/migrate.go b/internal/graphrag/migrate.go new file mode 100644 index 0000000..133593c --- /dev/null +++ b/internal/graphrag/migrate.go @@ -0,0 +1,213 @@ +package graphrag + +// GraphRAG persistence migrations. +// +// AutoMigrateGraphRAG runs the standard GORM AutoMigrate over the three +// persisted GraphRAG models, then performs two idempotent post-migration +// passes that prepare older databases for tenant-scoped reads: +// +// 1. backfillTenantIDs — sets tenant_id = DefaultTenantID on rows that pre-date +// the column being added (or that somehow ended up empty). New rows always +// receive the column default at insert time. +// +// 2. ensureDrainTemplatesCompositePK — promotes drain_templates' single-column +// primary key (id) to the composite (tenant_id, id) when an older schema +// is detected. The composite PK matters because the same Drain template +// hash can legitimately recur across tenants once the in-memory miner is +// partitioned per tenant; without it the second tenant's row would collide +// on insert. +// +// Both passes are safe to call repeatedly. SQLite and PostgreSQL are +// supported explicitly; other dialects skip the PK promotion and log so an +// operator can apply the equivalent DDL by hand. + +import ( + "errors" + "fmt" + "log/slog" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "gorm.io/gorm" +) + +// graphRAGTables are the three persisted tables that carry tenant_id after +// RAN-38. Order matches AutoMigrate order so log lines line up. +var graphRAGTables = []string{"investigations", "graph_snapshots", "drain_templates"} + +// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models and +// applies tenant backfill + drain_templates composite-PK promotion. Safe to +// call repeatedly. +func AutoMigrateGraphRAG(db *gorm.DB) error { + if db == nil { + return nil + } + if err := db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{}); err != nil { + return fmt.Errorf("graphrag automigrate: %w", err) + } + if err := backfillTenantIDs(db); err != nil { + return fmt.Errorf("graphrag tenant backfill: %w", err) + } + if err := ensureDrainTemplatesCompositePK(db); err != nil { + // Non-fatal: new writes still carry tenant_id; only existing rows + // retain the legacy single-column PK. Surfaced as a warn so an + // operator can intervene on dialects we don't auto-migrate. + slog.Warn("graphrag: drain_templates composite PK promotion skipped", "error", err) + } + return nil +} + +// backfillTenantIDs sets tenant_id = DefaultTenantID for any row in the three +// GraphRAG tables that ended up with NULL or empty tenant_id. AutoMigrate +// already supplies a column default for new inserts; this pass covers the +// "added the column to a populated table" path on dialects that don't +// retroactively backfill via DEFAULT (and serves as belt-and-braces on those +// that do). +func backfillTenantIDs(db *gorm.DB) error { + for _, tbl := range graphRAGTables { + // Only touch tables that exist — fresh installs may race with a + // half-migrated DB on first boot. HasTable is dialect-aware and + // returns false rather than erroring on missing tables. + if !db.Migrator().HasTable(tbl) { + continue + } + sql := fmt.Sprintf(`UPDATE %s SET tenant_id = ? WHERE tenant_id IS NULL OR tenant_id = ''`, tbl) //nolint:gosec // table name is from a fixed allow-list + if err := db.Exec(sql, storage.DefaultTenantID).Error; err != nil { + return fmt.Errorf("backfill %s: %w", tbl, err) + } + } + return nil +} + +// ensureDrainTemplatesCompositePK promotes drain_templates' primary key from +// (id) to (tenant_id, id) on existing databases. Fresh installs already get +// the composite PK from AutoMigrate. On dialects this function does not +// understand it returns nil so AutoMigrateGraphRAG can move on. +func ensureDrainTemplatesCompositePK(db *gorm.DB) error { + if db == nil || !db.Migrator().HasTable("drain_templates") { + return nil + } + switch db.Dialector.Name() { + case "sqlite": + return ensureDrainPKSQLite(db) + case "postgres": + return ensureDrainPKPostgres(db) + default: + // MySQL / MSSQL aren't covered by this ticket. Log so an operator + // running those dialects can apply the equivalent DDL by hand. + slog.Info("graphrag: drain_templates PK promotion skipped — unsupported dialect", + "dialect", db.Dialector.Name()) + return nil + } +} + +// ensureDrainPKSQLite uses PRAGMA table_info to detect whether tenant_id is +// already part of the primary key. If not, it rebuilds the table via the +// canonical SQLite recipe (rename → create → copy → drop) inside a +// transaction. CreateTable here mirrors whatever GORM would build for a fresh +// install, so we don't have to hand-maintain a parallel CREATE TABLE. +func ensureDrainPKSQLite(db *gorm.DB) error { + type pragmaCol struct { + Cid int `gorm:"column:cid"` + Name string `gorm:"column:name"` + Type string `gorm:"column:type"` + Notnull int `gorm:"column:notnull"` + DfltValue any `gorm:"column:dflt_value"` + Pk int `gorm:"column:pk"` + } + var cols []pragmaCol + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + return fmt.Errorf("pragma table_info: %w", err) + } + if len(cols) == 0 { + // Table doesn't exist; AutoMigrate would have created it with the + // composite PK already. Nothing to do. + return nil + } + pkCols := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pkCols[c.Name] = true + } + } + if pkCols["tenant_id"] && pkCols["id"] { + return nil // already composite + } + + return db.Transaction(func(tx *gorm.DB) error { + // Drop any leftover scratch from an interrupted earlier attempt. + if err := tx.Exec(`DROP TABLE IF EXISTS drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop scratch: %w", err) + } + if err := tx.Exec(`ALTER TABLE drain_templates RENAME TO drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("rename legacy: %w", err) + } + // Recreate with the current GORM schema (composite PK + indexes). + if err := tx.Migrator().CreateTable(&DrainTemplateRow{}); err != nil { + return fmt.Errorf("create new table: %w", err) + } + // Copy rows over, defaulting tenant_id where the legacy table + // didn't carry one (the column may have been added by AutoMigrate + // before this pass ran). + insert := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT COALESCE(NULLIF(tenant_id, ''), ?), id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err := tx.Exec(insert, storage.DefaultTenantID).Error; err != nil { + // Older schemas may not have the tenant_id column at all — fall + // back to a tenant-less SELECT, defaulting every row. + insertFallback := `INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) +SELECT ?, id, tokens, count, first_seen, last_seen, sample +FROM drain_templates__legacy` + if err2 := tx.Exec(insertFallback, storage.DefaultTenantID).Error; err2 != nil { + return fmt.Errorf("copy rows: %w (fallback: %w)", err, err2) + } + } + if err := tx.Exec(`DROP TABLE drain_templates__legacy`).Error; err != nil { + return fmt.Errorf("drop legacy: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on SQLite") + return nil + }) +} + +// ensureDrainPKPostgres queries pg_index for the current PK column set and, +// when it doesn't already include tenant_id, drops the implicit +// drain_templates_pkey constraint and recreates it as a composite PK on +// (tenant_id, id). DROP/ADD runs inside one statement so the table is never +// without a PK between steps. +func ensureDrainPKPostgres(db *gorm.DB) error { + var pkCols []string + err := db.Raw(` + SELECT a.attname::text + FROM pg_index i + JOIN pg_attribute a + ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = 'drain_templates'::regclass + AND i.indisprimary + `).Scan(&pkCols).Error + if err != nil { + // Most likely the table doesn't exist yet — nothing to migrate. + return nil + } + hasTenant, hasID := false, false + for _, c := range pkCols { + switch c { + case "tenant_id": + hasTenant = true + case "id": + hasID = true + } + } + if hasTenant && hasID { + return nil + } + if !hasID { + // Defensive: an alien schema without `id` in the PK is not something + // this migration should silently overwrite. + return errors.New("drain_templates primary key has unexpected shape; manual migration required") + } + if err := db.Exec(`ALTER TABLE drain_templates DROP CONSTRAINT IF EXISTS drain_templates_pkey, ADD PRIMARY KEY (tenant_id, id)`).Error; err != nil { + return fmt.Errorf("alter pk: %w", err) + } + slog.Info("graphrag: promoted drain_templates PK to (tenant_id, id) on PostgreSQL") + return nil +} diff --git a/internal/graphrag/migrate_test.go b/internal/graphrag/migrate_test.go new file mode 100644 index 0000000..2cc5df8 --- /dev/null +++ b/internal/graphrag/migrate_test.go @@ -0,0 +1,297 @@ +package graphrag + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/glebarez/sqlite" + "gorm.io/gorm" +) + +// newTestGraphRAGDB stands up an in-memory SQLite DB ready for the GraphRAG +// migrations. Local helper so the migrate tests don't depend on storage's +// _test-only fixtures. +func newTestGraphRAGDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + if err != nil { + t.Fatalf("open sqlite: %v", err) + } + return db +} + +// newTestGraphRAGWithDB returns a GraphRAG wired to a real (in-memory SQLite) +// repo so tenant-scoped read tests can exercise the actual GORM query path. +// The returned *gorm.DB is the same handle the GraphRAG uses, suitable for +// seeding rows directly. +func newTestGraphRAGWithDB(t *testing.T) (*GraphRAG, *gorm.DB) { + t.Helper() + db := newTestGraphRAGDB(t) + repo := storage.NewRepositoryFromDB(db, "sqlite") + g := New(repo, nil, nil, nil, DefaultConfig()) + t.Cleanup(func() { g.Stop() }) + return g, db +} + +// TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes asserts that the +// composite indexes declared on the three persisted GraphRAG models are +// actually materialised on SQLite. Mirrors +// storage.TestAutoMigrate_CreatesTenantCompositeIndexes for the GraphRAG side. +func TestAutoMigrateGraphRAG_CreatesTenantCompositeIndexes(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + expected := []struct { + table string + index string + }{ + {"investigations", "idx_investigations_tenant_created"}, + {"graph_snapshots", "idx_graph_snapshots_tenant_created"}, + } + for _, tc := range expected { + var count int + if err := db.Raw( + "SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND tbl_name=? AND name=?", + tc.table, tc.index, + ).Scan(&count).Error; err != nil { + t.Fatalf("sqlite_master query for %s.%s: %v", tc.table, tc.index, err) + } + if count != 1 { + t.Errorf("expected composite index %s on table %s (count=%d)", tc.index, tc.table, count) + } + } +} + +// TestAutoMigrateGraphRAG_DrainTemplatesCompositePK asserts the PK on +// drain_templates is the composite (tenant_id, id) on a fresh install. +// Without this the same Drain template hash from two tenants would collide +// once a per-tenant Drain miner ships. +func TestAutoMigrateGraphRAG_DrainTemplatesCompositePK(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + type col struct { + Name string `gorm:"column:name"` + Pk int `gorm:"column:pk"` + } + var cols []col + if err := db.Raw(`PRAGMA table_info('drain_templates')`).Scan(&cols).Error; err != nil { + t.Fatalf("pragma table_info: %v", err) + } + pk := map[string]bool{} + for _, c := range cols { + if c.Pk > 0 { + pk[c.Name] = true + } + } + if !pk["tenant_id"] || !pk["id"] { + t.Fatalf("drain_templates PK should be composite (tenant_id, id); got %+v", pk) + } +} + +// TestAutoMigrateGraphRAG_IsIdempotent calls AutoMigrateGraphRAG repeatedly +// to assert the backfill UPDATEs and PK-promotion path are safe to re-run. +func TestAutoMigrateGraphRAG_IsIdempotent(t *testing.T) { + db := newTestGraphRAGDB(t) + for i := 0; i < 3; i++ { + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG pass %d: %v", i, err) + } + } +} + +// TestAutoMigrateGraphRAG_BackfillsLegacyRows pre-populates the three +// GraphRAG tables with rows that have empty tenant_id and asserts the backfill +// pass fills DefaultTenantID on every one. Mimics the upgrade path for an +// existing install whose first boot brings in the tenant_id column. +func TestAutoMigrateGraphRAG_BackfillsLegacyRows(t *testing.T) { + db := newTestGraphRAGDB(t) + // Run once so the tables exist with the new schema. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("first migrate: %v", err) + } + // Insert rows with empty tenant_id directly via raw SQL — Investigation, + // GraphSnapshot and DrainTemplateRow's GORM defaults would otherwise fill + // the column on insert. + now := time.Now().UTC() + if err := db.Exec(`INSERT INTO investigations (tenant_id, id, created_at, status, severity, trigger_service, trigger_operation, error_message, root_service, root_operation, causal_chain, trace_ids, error_logs, anomalous_metrics, affected_services, span_chain) VALUES ('', 'inv_legacy', ?, 'detected', 'warning', 'svc', 'op', 'boom', 'svc', 'op', '[]', '[]', '[]', '[]', '[]', '[]')`, now).Error; err != nil { + t.Fatalf("seed legacy investigation: %v", err) + } + if err := db.Exec(`INSERT INTO graph_snapshots (tenant_id, id, created_at, nodes, edges, service_count, total_calls, avg_health_score) VALUES ('', 'snap_legacy', ?, '[]', '[]', 0, 0, 0)`, now).Error; err != nil { + t.Fatalf("seed legacy snapshot: %v", err) + } + // Drain rows: tenant_id is part of the PK so we must give it *something* + // — empty string is allowed by SQLite. The backfill is expected to fix it. + if err := db.Exec(`INSERT INTO drain_templates (tenant_id, id, tokens, count, first_seen, last_seen, sample) VALUES ('', 1, '["a","b"]', 1, ?, ?, 'sample')`, now, now).Error; err != nil { + t.Fatalf("seed legacy drain row: %v", err) + } + + // Re-run migration; backfill should populate every empty tenant_id. + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("second migrate: %v", err) + } + + for _, tbl := range graphRAGTables { + var stragglers int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id IS NULL OR tenant_id = ''`).Scan(&stragglers).Error; err != nil { + t.Fatalf("count empty tenant in %s: %v", tbl, err) + } + if stragglers != 0 { + t.Errorf("%s: %d rows still have empty tenant_id after backfill", tbl, stragglers) + } + var defaults int + if err := db.Raw(`SELECT COUNT(*) FROM `+tbl+` WHERE tenant_id = ?`, storage.DefaultTenantID).Scan(&defaults).Error; err != nil { + t.Fatalf("count default tenant in %s: %v", tbl, err) + } + if defaults < 1 { + t.Errorf("%s: backfill produced no DefaultTenantID rows", tbl) + } + } +} + +// TestSaveLoadDrainTemplates_TenantIsolation asserts that Save with one tenant +// and Load with another returns nothing — the per-tenant scoping is enforced +// at the DB layer, not just at the in-memory miner. +func TestSaveLoadDrainTemplates_TenantIsolation(t *testing.T) { + db := newTestGraphRAGDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + tplsA := []Template{{ID: 0xA1, Tokens: []string{"a", "b"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + tplsB := []Template{{ID: 0xB2, Tokens: []string{"c", "d"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + // Identical hash across tenants is the interesting case — confirms the + // composite PK lets the same template ID coexist in two tenants. + tplsBSameID := []Template{{ID: 0xA1, Tokens: []string{"x", "y"}, Count: 1, FirstSeen: time.Now(), LastSeen: time.Now()}} + + if err := SaveDrainTemplates(db, "tenant-a", tplsA); err != nil { + t.Fatalf("save tenant-a: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsB); err != nil { + t.Fatalf("save tenant-b: %v", err) + } + if err := SaveDrainTemplates(db, "tenant-b", tplsBSameID); err != nil { + t.Fatalf("save tenant-b colliding id: %v", err) + } + + loadedA, err := LoadDrainTemplates(db, "tenant-a") + if err != nil { + t.Fatalf("load tenant-a: %v", err) + } + if len(loadedA) != 1 || loadedA[0].ID != 0xA1 || loadedA[0].Tokens[0] != "a" { + t.Errorf("tenant-a load mismatch: %+v", loadedA) + } + + loadedB, err := LoadDrainTemplates(db, "tenant-b") + if err != nil { + t.Fatalf("load tenant-b: %v", err) + } + if len(loadedB) != 2 { + t.Errorf("tenant-b should have two templates (distinct + colliding-id); got %d", len(loadedB)) + } + + loadedC, err := LoadDrainTemplates(db, "tenant-empty") + if err != nil { + t.Fatalf("load tenant-empty: %v", err) + } + if len(loadedC) != 0 { + t.Errorf("tenant-empty should have no templates; got %d", len(loadedC)) + } +} + +// TestGraphRAG_GetInvestigations_TenantScoped seeds investigations under two +// tenants and asserts each tenant only sees its own rows via ctx scoping. +func TestGraphRAG_GetInvestigations_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now() + rows := []Investigation{ + {TenantID: "acme", ID: "inv-acme-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + {TenantID: "acme", ID: "inv-acme-2", CreatedAt: now.Add(time.Second), Status: "detected", Severity: "warning", TriggerService: "payments"}, + {TenantID: "globex", ID: "inv-globex-1", CreatedAt: now, Status: "detected", Severity: "critical", TriggerService: "orders"}, + } + for _, r := range rows { + if err := db.Create(&r).Error; err != nil { + t.Fatalf("seed %s: %v", r.ID, err) + } + } + + acmeCtx := storage.WithTenantContext(context.Background(), "acme") + globexCtx := storage.WithTenantContext(context.Background(), "globex") + + acme, err := g.GetInvestigations(acmeCtx, "", "", "", 100) + if err != nil { + t.Fatalf("acme list: %v", err) + } + if len(acme) != 2 { + t.Errorf("acme should see 2 rows; got %d", len(acme)) + } + for _, r := range acme { + if r.TenantID != "acme" { + t.Errorf("acme list leaked %s row: %s", r.TenantID, r.ID) + } + } + + globex, err := g.GetInvestigations(globexCtx, "", "", "", 100) + if err != nil { + t.Fatalf("globex list: %v", err) + } + if len(globex) != 1 || globex[0].ID != "inv-globex-1" { + t.Errorf("globex should see only inv-globex-1; got %+v", globex) + } + + // Cross-tenant ID lookup must miss — id-guessing should not leak. + if _, err := g.GetInvestigation(acmeCtx, "inv-globex-1"); err == nil { + t.Errorf("acme ctx should NOT find globex investigation") + } + got, err := g.GetInvestigation(globexCtx, "inv-globex-1") + if err != nil { + t.Fatalf("globex own lookup: %v", err) + } + if got.TenantID != "globex" { + t.Errorf("expected globex row; got tenant=%q", got.TenantID) + } +} + +// TestGraphRAG_GetGraphSnapshot_TenantScoped seeds two snapshots (one per +// tenant) at the same instant and asserts each tenant only retrieves its own. +func TestGraphRAG_GetGraphSnapshot_TenantScoped(t *testing.T) { + g, db := newTestGraphRAGWithDB(t) + if err := AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("migrate: %v", err) + } + now := time.Now().UTC() + for _, tenant := range []string{"acme", "globex"} { + snap := GraphSnapshot{ + TenantID: tenant, + ID: "snap_" + tenant, + CreatedAt: now, + Nodes: []byte(`[]`), + Edges: []byte(`[]`), + ServiceCount: 1, + AvgHealthScore: 1, + } + if err := db.Create(&snap).Error; err != nil { + t.Fatalf("seed %s: %v", tenant, err) + } + } + for _, tenant := range []string{"acme", "globex"} { + ctx := storage.WithTenantContext(context.Background(), tenant) + snap, err := g.GetGraphSnapshot(ctx, now.Add(time.Second)) + if err != nil { + t.Fatalf("get %s: %v", tenant, err) + } + if snap.TenantID != tenant { + t.Errorf("ctx %s returned snapshot for tenant %q", tenant, snap.TenantID) + } + if !strings.HasSuffix(snap.ID, tenant) { + t.Errorf("ctx %s returned snapshot id %s", tenant, snap.ID) + } + } +} diff --git a/internal/graphrag/refresh.go b/internal/graphrag/refresh.go index 43a941e..b16d8bc 100644 --- a/internal/graphrag/refresh.go +++ b/internal/graphrag/refresh.go @@ -77,7 +77,7 @@ func (g *GraphRAG) persistDrainTemplates() { if len(tpls) == 0 { return } - if err := SaveDrainTemplates(g.repo.DB(), tpls); err != nil { + if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, tpls); err != nil { slog.Error("Failed to persist drain templates", "error", err) return } diff --git a/internal/graphrag/schema.go b/internal/graphrag/schema.go index 81b0754..fed9e4d 100644 --- a/internal/graphrag/schema.go +++ b/internal/graphrag/schema.go @@ -222,7 +222,13 @@ type RankedCause struct { // standard SQL drivers reject uint64 values with the high bit set, and signed // int64 carries the same 64 bits without loss. Conversion happens in the // persistence helpers. +// +// The primary key is composite (tenant_id, id): the same template tokens can +// legitimately recur across tenants, and we want the cluster ID to stay stable +// per tenant once the in-memory Drain miner is partitioned per-tenant. TenantID +// is declared first so it leads the PK index. type DrainTemplateRow struct { + TenantID string `gorm:"primaryKey;size:64;default:'default';not null" json:"tenant_id"` ID int64 `gorm:"primaryKey;autoIncrement:false" json:"id"` // int64(Template.ID) Tokens string `gorm:"type:text;not null" json:"tokens"` // JSON-encoded []string Count int `json:"count"` diff --git a/internal/graphrag/snapshot.go b/internal/graphrag/snapshot.go index 3a77328..aef160f 100644 --- a/internal/graphrag/snapshot.go +++ b/internal/graphrag/snapshot.go @@ -12,14 +12,14 @@ import ( // GraphSnapshot is a periodic snapshot of the service topology persisted to DB. // -// Tenant identity is not yet a column on this model — Subtask B (RAN-38) adds -// `tenant_id` to the schema along with the persistence-layer filtering. For -// now, snapshots are written one row per tenant per tick and remain queryable -// only across the whole table; callers must not treat pre-Subtask-B rows as -// tenant-scoped. +// TenantID scopes the row to the tenant slice it was captured from. The +// composite (tenant_id, created_at) index supports the +// "most recent snapshot at-or-before T for tenant X" lookup that +// GetGraphSnapshot runs on every read. type GraphSnapshot struct { + TenantID string `gorm:"size:64;default:'default';not null;index:idx_graph_snapshots_tenant_created,priority:1" json:"tenant_id"` ID string `gorm:"primaryKey;size:64" json:"id"` - CreatedAt time.Time `json:"created_at"` + CreatedAt time.Time `gorm:"index:idx_graph_snapshots_tenant_created,priority:2" json:"created_at"` Nodes json.RawMessage `gorm:"type:text" json:"nodes"` Edges json.RawMessage `gorm:"type:text" json:"edges"` ServiceCount int `json:"service_count"` @@ -117,6 +117,7 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store edgesJSON, _ := json.Marshal(snapEdges) snap := GraphSnapshot{ + TenantID: tenant, ID: fmt.Sprintf("snap_%s_%d", tenant, time.Now().UnixNano()), CreatedAt: time.Now(), Nodes: nodesJSON, @@ -155,12 +156,14 @@ func (g *GraphRAG) pruneOldSnapshots() { } } -// GetGraphSnapshot retrieves the snapshot closest to the requested time. -// TODO(RAN-38, Subtask B): scope by tenant once tenant_id lands on the table. -func (g *GraphRAG) GetGraphSnapshot(at time.Time) (*GraphSnapshot, error) { +// GetGraphSnapshot retrieves the snapshot closest to the requested time, +// scoped to the tenant carried by ctx. The composite (tenant_id, created_at) +// index supports the descending lookup. +func (g *GraphRAG) GetGraphSnapshot(ctx context.Context, at time.Time) (*GraphSnapshot, error) { + tenant := storage.TenantFromContext(ctx) var snap GraphSnapshot err := g.repo.DB(). - Where("created_at <= ?", at). + Where("tenant_id = ? AND created_at <= ?", tenant, at). Order("created_at DESC"). First(&snap).Error if err != nil { diff --git a/internal/mcp/tenant_isolation_test.go b/internal/mcp/tenant_isolation_test.go new file mode 100644 index 0000000..653ab90 --- /dev/null +++ b/internal/mcp/tenant_isolation_test.go @@ -0,0 +1,627 @@ +// Package mcp tests the merge-gate invariant for RAN-19/RAN-39: every +// GraphRAG-backed MCP tool (and the legacy svcGraph-backed tools rewired +// onto GraphRAG) must scope its response to the tenant carried by the +// X-Tenant-ID header — overlapping data ingested for two tenants under +// the same service_name, trace_id, span_id, log template, and snapshot +// time must never leak across tenant boundaries. +package mcp + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/graphrag" + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// tenants exercised by the test. The third row uses an empty header to +// prove that absence-of-header collapses to storage.DefaultTenantID, which +// is also a real ingest target so we get a meaningful response (not just +// vacuous emptiness). +var isolationCallers = []struct { + name string + header string + scoped string + otherSeeded []string +}{ + {name: "acme", header: "acme", scoped: "acme", otherSeeded: []string{"beta", storage.DefaultTenantID}}, + {name: "beta", header: "beta", scoped: "beta", otherSeeded: []string{"acme", storage.DefaultTenantID}}, + {name: "no_header_default", header: "", scoped: storage.DefaultTenantID, otherSeeded: []string{"acme", "beta"}}, +} + +// allTenants is the set of tenants we actually seed. Used by leak scans +// regardless of which caller is currently being asserted. +var allTenants = []string{"acme", "beta", storage.DefaultTenantID} + +// markersFor builds the (own, others) marker pair for a given caller. +// Markers are tenant-stamped strings that appear inside service names, +// operation names, log bodies, and anomaly evidence — so a textual scan +// of the JSON response is sufficient to detect a cross-tenant leak. +func markersFor(scoped string, others []string) (own []string, leak []string) { + own = []string{ + scoped + "-orders", + scoped + "-marker", + scoped + "-op", + } + for _, t := range others { + leak = append(leak, + t+"-orders", + t+"-marker", + t+"-op", + t+"-anomaly-marker", + ) + } + return own, leak +} + +// setupTenantIsolationServer wires an in-process MCP server against an +// in-memory SQLite repo and a started GraphRAG. The background refresh, +// snapshot, and anomaly loops are stretched to "never" inside the test +// window so the only state that lands in the stores is the data the test +// seeds explicitly — making leak assertions deterministic. +func setupTenantIsolationServer(t *testing.T) (*httptest.Server, *graphrag.GraphRAG, *storage.Repository, *vectordb.Index) { + t.Helper() + + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if err := graphrag.AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + + vIdx := vectordb.New(1000) + + cfg := graphrag.DefaultConfig() + cfg.RefreshEvery = 24 * time.Hour + cfg.SnapshotEvery = 24 * time.Hour + cfg.AnomalyEvery = 24 * time.Hour + cfg.WorkerCount = 4 + + g := graphrag.New(repo, vIdx, nil, nil, cfg) + bgCtx, cancel := context.WithCancel(context.Background()) + go g.Start(bgCtx) + + srv := New(repo, nil, nil, vIdx) + srv.SetGraphRAG(g) + + httpSrv := httptest.NewServer(srv.Handler()) + + t.Cleanup(func() { + httpSrv.Close() + cancel() + g.Stop() + _ = repo.Close() + }) + + return httpSrv, g, repo, vIdx +} + +// seedTenant ingests a small but representative slice of telemetry for +// tenant T: a parent OK span, a child ERROR span, a matching ERROR log, +// a vector-index doc, an injected anomaly, a persisted investigation, +// and a graph snapshot row. All identifiers (trace_id, span_id) collide +// across tenants on purpose — the tenant slice is the only thing keeping +// them apart. +func seedTenant(t *testing.T, g *graphrag.GraphRAG, repo *storage.Repository, vIdx *vectordb.Index, tenant string, ts time.Time) { + t.Helper() + + service := tenant + "-orders" + op := tenant + "-op-checkout" + logBody := tenant + "-marker connection refused upstream" + traceID := "trace-shared" + rootSpanID := "span-root" + childSpanID := "span-child" + + // Root span (OK). + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: rootSpanID, + OperationName: "/checkout", + ServiceName: service, + Status: "STATUS_CODE_OK", + StartTime: ts, + EndTime: ts.Add(2 * time.Millisecond), + Duration: 2000, + }) + + // Child span (ERROR), parented to root → upstream walk lands on the + // per-tenant root span. + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ParentSpanID: rootSpanID, + OperationName: op, + ServiceName: service, + Status: "STATUS_CODE_ERROR", + StartTime: ts.Add(time.Millisecond), + EndTime: ts.Add(2 * time.Millisecond), + Duration: 1000, + }) + + // Log carrying the per-tenant marker — drives Drain clustering and + // CorrelatedSignals; the body is also stored in the vector index. + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ServiceName: service, + Severity: "ERROR", + Body: logBody, + Timestamp: ts.Add(2 * time.Millisecond), + }) + + // Vector index doc — find_similar_logs path is keyed by tenant. + vIdx.Add(0, tenant, service, "ERROR", logBody) + + // Inject a per-tenant anomaly directly so AnomalyTimeline has + // something to return without depending on the anomaly detector + // loop (which is throttled to 24h in this fixture). + g.RegisterAnomaly(tenant, graphrag.AnomalyNode{ + ID: tenant + "-anomaly-1", + Type: graphrag.AnomalyErrorSpike, + Severity: graphrag.SeverityCritical, + Service: service, + Evidence: tenant + "-anomaly-marker error_rate=0.95", + Timestamp: ts.Add(3 * time.Millisecond), + }) + + // Snapshot row — insert directly so we control the tenant_id and ID + // (takeSnapshot is the production loop, but it is package-private). + snap := graphrag.GraphSnapshot{ + TenantID: tenant, + ID: "snap-" + tenant, + CreatedAt: ts, + Nodes: json.RawMessage(`[{"name":"` + service + `","marker":"` + tenant + `-marker"}]`), + Edges: json.RawMessage(`[]`), + ServiceCount: 1, + AvgHealthScore: 0.5, + } + if err := repo.DB().Create(&snap).Error; err != nil { + t.Fatalf("seed snapshot for %q: %v", tenant, err) + } +} + +// waitForServiceMaps polls until every seeded tenant's ServiceMap reflects +// at least one service. Required because OnSpanIngested is async. +func waitForServiceMaps(t *testing.T, g *graphrag.GraphRAG, tenants []string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + ok := true + for _, tn := range tenants { + ctx := storage.WithTenantContext(context.Background(), tn) + if len(g.ServiceMap(ctx, 0)) == 0 { + ok = false + break + } + } + if ok { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("timed out waiting for ServiceMap to reflect ingested spans for %v", tenants) +} + +// seedInvestigations relies on the in-memory state already being warm +// (see waitForServiceMaps). PersistInvestigation reaches into ImpactAnalysis +// internally, which reads from the per-tenant ServiceStore. +func seedInvestigations(t *testing.T, g *graphrag.GraphRAG, ts time.Time) { + t.Helper() + for _, tenant := range allTenants { + service := tenant + "-orders" + chain := graphrag.ErrorChainResult{ + RootCause: &graphrag.RootCauseInfo{ + Service: service, + Operation: tenant + "-op-checkout", + ErrorMessage: tenant + "-marker connection refused upstream", + SpanID: "span-child", + TraceID: "trace-shared", + }, + SpanChain: []graphrag.SpanNode{{ + ID: "span-child", + TraceID: "trace-shared", + Service: service, + Operation: tenant + "-op-checkout", + IsError: true, + Timestamp: ts, + }}, + TraceID: "trace-shared", + } + g.PersistInvestigation(tenant, service, []graphrag.ErrorChainResult{chain}, nil) + } +} + +// callTool sends a JSON-RPC tools/call request to the test MCP server +// with the given X-Tenant-ID header (omitted when empty) and returns the +// inner ToolCallResult — i.e., the structure the LLM client would see. +// Also returns the concatenated text payload across content items, which +// is what tenant-leak assertions actually scan. +func callTool(t *testing.T, ts *httptest.Server, headerTenant, name string, args map[string]any) (ToolCallResult, string) { + t.Helper() + if args == nil { + args = map[string]any{} + } + rpcReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": map[string]any{ + "name": name, + "arguments": args, + }, + } + body, err := json.Marshal(rpcReq) + if err != nil { + t.Fatalf("marshal rpc: %v", err) + } + req, err := http.NewRequest(http.MethodPost, ts.URL, bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + if headerTenant != "" { + req.Header.Set("X-Tenant-ID", headerTenant) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("rpc do: %v", err) + } + defer resp.Body.Close() + raw, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("rpc status %d: %s", resp.StatusCode, raw) + } + + var rpcResp struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Result ToolCallResult `json:"result"` + Error *RPCError `json:"error"` + } + if err := json.Unmarshal(raw, &rpcResp); err != nil { + t.Fatalf("unmarshal rpc: %v\nraw: %s", err, raw) + } + if rpcResp.Error != nil { + t.Fatalf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var sb strings.Builder + for _, c := range rpcResp.Result.Content { + sb.WriteString(c.Text) + if c.Resource != nil { + sb.WriteString(c.Resource.Text) + } + } + return rpcResp.Result, sb.String() +} + +// assertNoLeak fails if any of leakMarkers appears in body. ownMarker is +// optional — when non-empty it must appear, proving the tool returned +// real per-tenant data and not just a vacuous empty result. +func assertNoLeak(t *testing.T, label, body, ownMarker string, leakMarkers []string) { + t.Helper() + if ownMarker != "" && !strings.Contains(body, ownMarker) { + t.Errorf("[%s] expected own marker %q in response, body=%s", label, ownMarker, truncate(body)) + } + for _, m := range leakMarkers { + if strings.Contains(body, m) { + t.Errorf("[%s] CROSS-TENANT LEAK: foreign marker %q present in response, body=%s", label, m, truncate(body)) + } + } +} + +func truncate(s string) string { + const max = 800 + if len(s) <= max { + return s + } + return s[:max] + "…(truncated)" +} + +// TestMCP_TenantIsolation_AllGraphRAGTools is the merge gate for RAN-19. +// For every GraphRAG-backed (and GraphRAG-rewired) MCP tool, it issues +// the same call from three callers — X-Tenant-ID: acme, X-Tenant-ID: beta, +// no header — against overlapping seeded data and asserts each response +// contains only the caller-tenant's data and never leaks another tenant's +// service name, log marker, operation, anomaly, or snapshot row. +func TestMCP_TenantIsolation_AllGraphRAGTools(t *testing.T) { + ts, g, repo, vIdx := setupTenantIsolationServer(t) + + now := time.Now().Add(-time.Minute) // a hair in the past so since=now-15m sees us + + for _, tenant := range allTenants { + seedTenant(t, g, repo, vIdx, tenant, now) + } + waitForServiceMaps(t, g, allTenants) + seedInvestigations(t, g, now) + + // Resolve investigation IDs per tenant (PersistInvestigation generates + // them internally; we discover them by querying after the fact, then + // hand them back into get_investigation in the per-caller assertions). + invIDsByTenant := map[string]string{} + for _, tenant := range allTenants { + ctx := storage.WithTenantContext(context.Background(), tenant) + invs, err := g.GetInvestigations(ctx, "", "", "", 10) + if err != nil { + t.Fatalf("GetInvestigations(%s): %v", tenant, err) + } + if len(invs) == 0 { + t.Fatalf("expected at least one persisted investigation for %s, got 0", tenant) + } + invIDsByTenant[tenant] = invs[0].ID + } + + // snapshot lookup time — slightly in the future so "<= at" matches every + // seeded row regardless of microsecond drift. + snapAt := time.Now().Add(time.Minute).UTC().Format(time.RFC3339) + + for _, caller := range isolationCallers { + caller := caller + ownMarkers, leakMarkers := markersFor(caller.scoped, caller.otherSeeded) + // At minimum the response should reference the caller's service + // for queries that are service-shaped. ownMarkers is intentionally + // kept as the canonical "anything tenant-tagged" set in case future + // assertions want it; per-tool checks pick the most relevant one. + ownService := caller.scoped + "-orders" + ownLogMarker := caller.scoped + "-marker" + ownAnomalyMarker := caller.scoped + "-anomaly-marker" + _ = ownMarkers + + // --- in-memory GraphRAG tools --- + + t.Run(caller.name+"/get_service_map", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_map", nil) + assertNoLeak(t, "get_service_map", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_service_health", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_health", map[string]any{ + "service_name": ownService, + }) + assertNoLeak(t, "get_service_health", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_error_chains", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_error_chains", map[string]any{ + "service": ownService, + "time_range": "1h", + "limit": 10, + }) + assertNoLeak(t, "get_error_chains", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/trace_graph", func(t *testing.T) { + // trace_id collides across tenants; correct routing must surface + // only the caller's per-tenant operation/service. + _, body := callTool(t, ts, caller.header, "trace_graph", map[string]any{ + "trace_id": "trace-shared", + }) + assertNoLeak(t, "trace_graph", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/impact_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "impact_analysis", map[string]any{ + "service": ownService, + "depth": 3, + }) + assertNoLeak(t, "impact_analysis", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/root_cause_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "root_cause_analysis", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + // RankedCause carries Service + Operation, so the caller's + // own service name MUST appear; an empty result here would + // silently regress the tool to a vacuous "[]" response that + // trivially "passes" leak checks (review feedback fix). + assertNoLeak(t, "root_cause_analysis", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/correlated_signals", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "correlated_signals", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + // CorrelatedSignals collects logs/metrics for the service, so the + // per-tenant log marker should appear. + assertNoLeak(t, "correlated_signals", body, ownLogMarker, leakMarkers) + }) + + t.Run(caller.name+"/get_anomaly_timeline", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_anomaly_timeline", nil) + assertNoLeak(t, "get_anomaly_timeline", body, ownAnomalyMarker, leakMarkers) + }) + + // --- DB-backed GraphRAG tools --- + + t.Run(caller.name+"/get_investigations", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigations", nil) + assertNoLeak(t, "get_investigations", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_own_tenant", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[caller.scoped], + }) + assertNoLeak(t, "get_investigation/own", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_other_tenant_blocks", func(t *testing.T) { + // Asking by another tenant's ID must NOT return that row — id- + // guessing would otherwise leak across tenants. The handler + // surfaces a tool-level error result, which is fine; what + // matters is that the foreign tenant's data does not appear. + otherTenant := caller.otherSeeded[0] + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[otherTenant], + }) + assertNoLeak(t, "get_investigation/cross-tenant", body, "", leakMarkers) + }) + + t.Run(caller.name+"/get_graph_snapshot", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_graph_snapshot", map[string]any{ + "time": snapAt, + }) + // Snapshot rows are tagged with the tenant marker so the leak + // scan covers both ID prefixes (snap-acme/snap-beta/snap-default) + // and the inline node markers. + assertNoLeak(t, "get_graph_snapshot", body, "snap-"+caller.scoped, leakMarkers) + }) + + // --- vectordb-backed tool (Drain path is exercised by ingestion above) --- + + t.Run(caller.name+"/find_similar_logs", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "find_similar_logs", map[string]any{ + "query": "connection refused upstream", + "limit": 10, + }) + assertNoLeak(t, "find_similar_logs", body, ownLogMarker, leakMarkers) + }) + + // --- Legacy/rewired surface --- + // get_system_graph is rewired onto GraphRAG by RAN-39, so the same + // per-tenant invariants apply. + t.Run(caller.name+"/get_system_graph", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_system_graph", nil) + assertNoLeak(t, "get_system_graph", body, ownService, leakMarkers) + }) + } +} + +// TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant proves that two +// tenants writing identical log bodies under an *identical* service name +// do not share a single shared LogClusterNode. Drain itself is currently +// a shared miner — without per-tenant SignalStore partitioning the same +// (template, service) pair would collapse to one cluster row visible to +// both tenants. The test inspects the actual LogClusterNodes returned by +// CorrelatedSignals (not just the response text) and asserts each tenant +// only ever sees rows tagged with its own marker. +func TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant(t *testing.T) { + ts, g, _, _ := setupTenantIsolationServer(t) + now := time.Now().Add(-time.Minute) + + // Identical service AND identical log template across tenants — Drain + // is a shared miner so the (service, templateID) cluster key would + // collide if SignalStore weren't tenant-partitioned. The body marker + // is the only per-tenant differentiator. + const sharedService = "shared-orders" + const sharedTrace = "trace-shared" + const sharedSpan = "span-shared" + + for _, tenant := range []string{"acme", "beta"} { + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + OperationName: "/checkout", + Status: "STATUS_CODE_ERROR", + StartTime: now, + EndTime: now.Add(time.Millisecond), + Duration: 1000, + }) + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + Severity: "ERROR", + Body: tenant + "-marker upstream connection refused", + Timestamp: now.Add(time.Millisecond), + }) + } + + ctxA := storage.WithTenantContext(context.Background(), "acme") + ctxB := storage.WithTenantContext(context.Background(), "beta") + + // Wait for both tenants' SignalStores to surface the cluster row. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + a := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + b := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if a != nil && b != nil && len(a.ErrorLogs) >= 1 && len(b.ErrorLogs) >= 1 { + break + } + time.Sleep(20 * time.Millisecond) + } + + sigA := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + sigB := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if sigA == nil || len(sigA.ErrorLogs) == 0 { + t.Fatalf("acme CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + if sigB == nil || len(sigB.ErrorLogs) == 0 { + t.Fatalf("beta CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + + // Per-tenant cluster row content must carry only that tenant's marker. + // We probe both Template and SampleLog because Drain stores the + // templated form on Template and the original body on SampleLog, and + // both should be uncontaminated. + checkClusters := func(name string, clusters []graphrag.LogClusterNode, ownMarker, foreignMarker string) []string { + t.Helper() + var ids []string + for _, lc := range clusters { + ids = append(ids, lc.ID) + joined := lc.Template + "\n" + lc.SampleLog + if !strings.Contains(joined, ownMarker) { + t.Errorf("[%s] cluster %q missing own marker %q (template=%q sample=%q)", name, lc.ID, ownMarker, lc.Template, lc.SampleLog) + } + if strings.Contains(joined, foreignMarker) { + t.Errorf("[%s] cluster %q LEAKED foreign marker %q (template=%q sample=%q)", name, lc.ID, foreignMarker, lc.Template, lc.SampleLog) + } + } + return ids + } + idsA := checkClusters("acme", sigA.ErrorLogs, "acme-marker", "beta-marker") + idsB := checkClusters("beta", sigB.ErrorLogs, "beta-marker", "acme-marker") + + // The cluster IDs themselves can be identical across tenants (Drain ID + // is service-scoped, not tenant-scoped) — that is precisely WHY the + // SignalStore partition matters: without it, the same key would point + // at one shared row. Surface this fact in the test record so a future + // refactor that makes IDs tenant-stamped doesn't accidentally weaken + // the assertion above. + t.Logf("drain cluster IDs: acme=%v beta=%v", idsA, idsB) + + // End-to-end probe: the same isolation must hold via the MCP HTTP + // surface, not just the in-process API. + for _, scoped := range []string{"acme", "beta"} { + _, body := callTool(t, ts, scoped, "correlated_signals", map[string]any{ + "service": sharedService, + "time_range": "1h", + }) + other := "beta" + if scoped == "beta" { + other = "acme" + } + if !strings.Contains(body, scoped+"-marker") { + t.Errorf("%s correlated_signals (HTTP) missing own marker, body=%s", scoped, truncate(body)) + } + if strings.Contains(body, other+"-marker") { + t.Errorf("%s correlated_signals (HTTP) leaked %s marker, body=%s", scoped, other, truncate(body)) + } + } +} + diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 91df67b..96b1fe1 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -286,9 +286,9 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a }() switch name { case "get_system_graph": - return s.toolGetSystemGraph(args) + return s.toolGetSystemGraph(ctx, args) case "get_service_health": - return s.toolGetServiceHealth(args) + return s.toolGetServiceHealth(ctx, args) case "search_logs": return s.toolSearchLogs(ctx, args) case "tail_logs": @@ -320,11 +320,11 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a case "correlated_signals": return s.toolCorrelatedSignals(ctx, args) case "get_investigations": - return s.toolGetInvestigations(args) + return s.toolGetInvestigations(ctx, args) case "get_investigation": - return s.toolGetInvestigationByID(args) + return s.toolGetInvestigationByID(ctx, args) case "get_graph_snapshot": - return s.toolGetGraphSnapshot(args) + return s.toolGetGraphSnapshot(ctx, args) case "get_anomaly_timeline": return s.toolGetAnomalyTimeline(ctx, args) default: @@ -334,7 +334,28 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a // --- Tool implementations --- -func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { +// toolGetSystemGraph returns a tenant-scoped service topology snapshot. +// +// When GraphRAG is wired (the default in production) the response is built +// from its per-tenant ServiceMap and AllServiceEdges, so two tenants with +// overlapping service names cannot see each other's nodes or edges. The +// legacy *graph.Graph remains as a fallback for boot windows when GraphRAG +// is still warming up; that fallback is cross-tenant by construction and +// is the documented legacy code path called out in RAN-39. +func (s *Server) toolGetSystemGraph(ctx context.Context, _ map[string]any) ToolCallResult { + if s.graphRAG != nil { + entries := s.graphRAG.ServiceMap(mcpCtx(ctx), 0) + edges := s.graphRAG.AllServiceEdges(mcpCtx(ctx)) + payload := map[string]any{ + "services": entries, + "edges": edges, + } + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal system graph: %v", err)) + } + return textResult(string(data)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } @@ -346,11 +367,26 @@ func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetServiceHealth(args map[string]any) ToolCallResult { +// toolGetServiceHealth returns the ServiceMap entry for svcName scoped to +// the tenant on ctx. Falls back to the legacy svcGraph snapshot when +// GraphRAG is not yet wired. +func (s *Server) toolGetServiceHealth(ctx context.Context, args map[string]any) ToolCallResult { svcName, _ := args["service_name"].(string) if svcName == "" { return errorResult("service_name is required") } + if s.graphRAG != nil { + for _, entry := range s.graphRAG.ServiceMap(mcpCtx(ctx), 0) { + if entry.Service != nil && entry.Service.Name == svcName { + data, err := json.MarshalIndent(entry, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal service health: %v", err)) + } + return textResult(string(data)) + } + } + return textResult(fmt.Sprintf("service %q not found in the current tenant window", svcName)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } @@ -749,7 +785,7 @@ func (s *Server) toolCorrelatedSignals(ctx context.Context, args map[string]any) return textResult(string(data)) } -func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigations(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -758,7 +794,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { status, _ := args["status"].(string) limit := argInt(args, "limit", 20) - investigations, err := s.graphRAG.GetInvestigations(service, severity, status, limit) + investigations, err := s.graphRAG.GetInvestigations(ctx, service, severity, status, limit) if err != nil { return errorResult(fmt.Sprintf("failed to query investigations: %v", err)) } @@ -769,7 +805,7 @@ func (s *Server) toolGetInvestigations(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { +func (s *Server) toolGetInvestigationByID(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -777,7 +813,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { if id == "" { return errorResult("investigation_id is required") } - inv, err := s.graphRAG.GetInvestigation(id) + inv, err := s.graphRAG.GetInvestigation(ctx, id) if err != nil { return errorResult(fmt.Sprintf("investigation not found: %v", err)) } @@ -788,7 +824,7 @@ func (s *Server) toolGetInvestigationByID(args map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { +func (s *Server) toolGetGraphSnapshot(ctx context.Context, args map[string]any) ToolCallResult { if s.graphRAG == nil { return errorResult("GraphRAG not initialized") } @@ -797,7 +833,7 @@ func (s *Server) toolGetGraphSnapshot(args map[string]any) ToolCallResult { if at.IsZero() { at = time.Now() } - snap, err := s.graphRAG.GetGraphSnapshot(at) + snap, err := s.graphRAG.GetGraphSnapshot(ctx, at) if err != nil { return errorResult(fmt.Sprintf("no snapshot found: %v", err)) } diff --git a/internal/mcp/tools_ran20_test.go b/internal/mcp/tools_ran20_test.go new file mode 100644 index 0000000..7477ae5 --- /dev/null +++ b/internal/mcp/tools_ran20_test.go @@ -0,0 +1,79 @@ +package mcp + +import ( + "context" + "strings" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestFindSimilarLogs_TenantIsolation is the RAN-20 acceptance bar for the MCP +// surface. Two tenants with unique marker strings in their log bodies query +// find_similar_logs; each tenant's response must never contain the other's +// markers. +func TestFindSimilarLogs_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout acme-secret-charge-id-abc") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused acme-only-marker-xyz") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired globex-secret-session-123") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal globex-only-marker-qqq") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "payment gateway", "limit": float64(50)} + + // Acme + acmeRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "acme"), args) + if acmeRes.IsError { + t.Fatalf("acme call errored: %+v", acmeRes) + } + acmeBody := concatContent(acmeRes.Content) + for _, forbidden := range []string{"globex-secret-session-123", "globex-only-marker-qqq", `"LogID": 201`, `"LogID": 202`} { + if strings.Contains(acmeBody, forbidden) { + t.Fatalf("acme leaked globex content %q in body:\n%s", forbidden, acmeBody) + } + } + if !strings.Contains(acmeBody, "acme-secret-charge-id-abc") && !strings.Contains(acmeBody, "acme-only-marker-xyz") { + t.Fatalf("acme did not receive its own rows:\n%s", acmeBody) + } + + // Globex + gRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "globex"), args) + if gRes.IsError { + t.Fatalf("globex call errored: %+v", gRes) + } + gBody := concatContent(gRes.Content) + for _, forbidden := range []string{"acme-secret-charge-id-abc", "acme-only-marker-xyz", `"LogID": 101`, `"LogID": 102`} { + if strings.Contains(gBody, forbidden) { + t.Fatalf("globex leaked acme content %q in body:\n%s", forbidden, gBody) + } + } +} + +// TestFindSimilarLogs_NoTenantFallsBackToDefault proves that a context with no +// tenant value is coerced to the server default — it must NOT bleed into +// another tenant's rows. +func TestFindSimilarLogs_NoTenantFallsBackToDefault(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "acme secret body only") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "secret body"} + + res := srv.toolFindSimilarLogs(context.Background(), args) + if res.IsError { + t.Fatalf("unexpected error: %+v", res) + } + if strings.Contains(concatContent(res.Content), "acme secret body only") { + t.Fatalf("no-tenant call leaked acme content:\n%s", concatContent(res.Content)) + } +} + +func concatContent(items []ContentItem) string { + var b strings.Builder + for _, c := range items { + b.WriteString(c.Text) + } + return b.String() +} diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 6d2aa7c..733e142 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -178,6 +178,15 @@ func AutoMigrateModels(db *gorm.DB, driver string) error { return fmt.Errorf("failed to migrate database: %w", err) } + // RAN-21: retire the pre-composite standalone unique index on traces.trace_id. + // AutoMigrate never drops indexes that no longer appear on struct tags, so on + // pre-existing databases the old uniqueIndex would persist and still block + // cross-tenant trace_id reuse. This is idempotent across drivers and a no-op + // on fresh databases. + if err := dropLegacyTraceIDUniqueIndex(db, driver); err != nil { + log.Printf("⚠️ legacy trace_id unique index drop failed: %v", err) + } + // Drop foreign keys that AutoMigrate may have created (MySQL) if driver == "mysql" { db.Exec("ALTER TABLE spans DROP FOREIGN KEY fk_traces_spans") diff --git a/internal/storage/log_repo.go b/internal/storage/log_repo.go index dfaae97..e57be6b 100644 --- a/internal/storage/log_repo.go +++ b/internal/storage/log_repo.go @@ -143,6 +143,33 @@ func (r *Repository) UpdateLogInsight(ctx context.Context, logID uint, insight s return nil } +// ListRecentHighSeverityLogsAllTenants returns recent logs of the given +// severity across EVERY tenant, each row carrying its own TenantID. This is an +// administrative read used exclusively by the vector index's startup +// hydration path, which fans rows out to per-tenant shards. It is not exposed +// on any tenant-scoped API surface — tenant isolation for read paths must +// otherwise be preserved via the context-driven WHERE clause. +func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error) { + if limit <= 0 { + limit = 5000 + } + q := r.db.WithContext(ctx).Model(&Log{}) + if severity != "" { + q = q.Where("severity = ?", severity) + } + if !since.IsZero() { + q = q.Where("timestamp >= ?", since) + } + if !until.IsZero() { + q = q.Where("timestamp <= ?", until) + } + var logs []Log + if err := q.Order("timestamp desc").Limit(limit).Find(&logs).Error; err != nil { + return nil, fmt.Errorf("failed to list recent logs all tenants: %w", err) + } + return logs, nil +} + // PurgeLogs deletes logs older than the given timestamp in a single statement. // Suitable for SQLite; for Postgres at large retention volumes prefer PurgeLogsBatched. func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error) { diff --git a/internal/storage/migrate_traces.go b/internal/storage/migrate_traces.go new file mode 100644 index 0000000..07558d2 --- /dev/null +++ b/internal/storage/migrate_traces.go @@ -0,0 +1,181 @@ +package storage + +import ( + "fmt" + "log" + "strings" + + "gorm.io/gorm" +) + +// dropLegacyTraceIDUniqueIndex removes any pre-RAN-21 standalone UNIQUE index +// that covers only traces.trace_id. From RAN-21 onward uniqueness is the +// composite idx_traces_tenant_trace_id on (tenant_id, trace_id); a surviving +// standalone unique index would silently block cross-tenant trace_id reuse. +// +// Discovery is structure-based (not name-based) because the legacy name varied +// across drivers and GORM versions. The composite index — which lists two +// columns — is never matched and therefore never dropped. +// +// Fresh databases never contain the legacy index, so this is a safe no-op on +// first boot. Invoked once per AutoMigrateModels call and is idempotent. +func dropLegacyTraceIDUniqueIndex(db *gorm.DB, driver string) error { + if db == nil { + return nil + } + driver = strings.ToLower(driver) + names, err := findLegacyTraceIDUniqueIndexes(db, driver) + if err != nil { + return err + } + for _, name := range names { + if name == "" { + continue + } + if err := dropIndexOnTraces(db, driver, name); err != nil { + return fmt.Errorf("drop legacy trace_id unique index %q: %w", name, err) + } + log.Printf("🧹 Dropped legacy single-column unique index on traces.trace_id: %s", name) + } + return nil +} + +// findLegacyTraceIDUniqueIndexes returns every UNIQUE index on the traces table +// whose single indexed column is trace_id. The composite RAN-21 index +// (tenant_id, trace_id) is excluded because it covers two columns. +func findLegacyTraceIDUniqueIndexes(db *gorm.DB, driver string) ([]string, error) { + switch driver { + case "sqlite", "": + // Enumerate every unique index on traces, then inspect its column list + // via PRAGMA index_info. SQLite auto-creates indexes for UNIQUE table + // constraints with names prefixed "sqlite_autoindex_" — those also + // surface here and are handled identically. Aliased to is_unique + // because SQLite treats "unique" as a reserved keyword even as an + // output alias. + type idxRow struct { + Name string `gorm:"column:name"` + IsUnique int `gorm:"column:is_unique"` + } + var idxs []idxRow + if err := db.Raw(`SELECT name, "unique" AS is_unique FROM pragma_index_list('traces')`).Scan(&idxs).Error; err != nil { + return nil, fmt.Errorf("pragma_index_list(traces): %w", err) + } + type colRow struct { + Name string `gorm:"column:name"` + } + var out []string + for _, ix := range idxs { + if ix.IsUnique != 1 { + continue + } + var cols []colRow + if err := db.Raw(fmt.Sprintf("SELECT name FROM pragma_index_info('%s')", ix.Name)).Scan(&cols).Error; err != nil { + return nil, fmt.Errorf("pragma_index_info(%s): %w", ix.Name, err) + } + if len(cols) == 1 && cols[0].Name == "trace_id" { + out = append(out, ix.Name) + } + } + return out, nil + + case "postgres", "postgresql": + // pg_index.indkey is an int2vector of attnums; join against + // pg_attribute to resolve column names. Filter to UNIQUE, non-primary + // indexes on the traces table covering exactly one column = trace_id. + var rows []indexNameRow + const q = ` +SELECT c.relname AS name +FROM pg_index i +JOIN pg_class c ON c.oid = i.indexrelid +JOIN pg_class t ON t.oid = i.indrelid +JOIN pg_namespace n ON n.oid = t.relnamespace +WHERE t.relname = 'traces' + AND n.nspname = ANY (current_schemas(false)) + AND i.indisunique + AND NOT i.indisprimary + AND i.indnatts = 1 + AND ( + SELECT attname FROM pg_attribute + WHERE attrelid = t.oid AND attnum = i.indkey[0] + ) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("pg_index lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "mysql": + // information_schema.STATISTICS has one row per (index, seq_in_index). + // Group by index, count columns, and keep indexes where the sole + // column is trace_id and NON_UNIQUE=0. + var rows []indexNameRow + const q = ` +SELECT INDEX_NAME AS name +FROM information_schema.STATISTICS +WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = 'traces' + AND NON_UNIQUE = 0 + AND INDEX_NAME <> 'PRIMARY' +GROUP BY INDEX_NAME +HAVING COUNT(*) = 1 + AND MAX(COLUMN_NAME) = 'trace_id'` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("information_schema.STATISTICS lookup: %w", err) + } + return flattenIndexNames(rows), nil + + case "sqlserver", "mssql": + // sys.indexes + sys.index_columns: one row per indexed column; keep + // unique, non-primary-key indexes on dbo.traces whose only column is + // trace_id. + var rows []indexNameRow + const q = ` +SELECT i.name AS name +FROM sys.indexes i +JOIN sys.objects t ON t.object_id = i.object_id +WHERE t.name = 'traces' + AND i.is_unique = 1 + AND i.is_primary_key = 0 + AND ( + SELECT COUNT(*) FROM sys.index_columns ic + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id + ) = 1 + AND EXISTS ( + SELECT 1 FROM sys.index_columns ic + JOIN sys.columns c ON c.object_id = ic.object_id AND c.column_id = ic.column_id + WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id AND c.name = 'trace_id' + )` + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return nil, fmt.Errorf("sys.indexes lookup: %w", err) + } + return flattenIndexNames(rows), nil + } + return nil, nil +} + +// indexNameRow is a single-column scan target shared by the non-SQLite branches. +// Keeping it package-level (rather than redeclared per-branch) lets +// flattenIndexNames project through a single concrete type. +type indexNameRow struct { + Name string `gorm:"column:name"` +} + +func flattenIndexNames(rows []indexNameRow) []string { + out := make([]string, 0, len(rows)) + for _, r := range rows { + out = append(out, r.Name) + } + return out +} + +// dropIndexOnTraces removes an index by name, per-driver. GORM's Migrator +// handles SQLite/Postgres/MySQL cleanly; for SQL Server we must qualify the +// DROP with the table name. +func dropIndexOnTraces(db *gorm.DB, driver, name string) error { + switch driver { + case "sqlserver", "mssql": + // T-SQL: DROP INDEX name ON table + return db.Exec(fmt.Sprintf("DROP INDEX [%s] ON [traces]", name)).Error + default: + return db.Migrator().DropIndex(&Trace{}, name) + } +} diff --git a/internal/storage/models.go b/internal/storage/models.go index 8585c61..b00c22e 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -84,11 +84,14 @@ const DefaultTenantID = "default" // Index strategy: single-column tenant_id is redundant — every tenant-scoped // query joins tenant_id with another filter (timestamp, service_name). The // leftmost column of a composite index satisfies single-column tenant lookups, -// so we only declare composites. +// so we only declare composites. TraceID uniqueness is scoped to (tenant_id, +// trace_id): distinct tenants may legitimately ingest identical trace_ids +// (RAN-21). The old standalone `uniqueIndex` on trace_id is dropped at +// migration time by dropLegacyTraceIDUniqueIndex. type Trace struct { ID uint `gorm:"primaryKey" json:"id"` - TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1" json:"tenant_id"` - TraceID string `gorm:"uniqueIndex;size:32;not null" json:"trace_id"` + TenantID string `gorm:"size:64;default:'default';not null;index:idx_traces_tenant_ts,priority:1;index:idx_traces_tenant_service,priority:1;uniqueIndex:idx_traces_tenant_trace_id,priority:1" json:"tenant_id"` + TraceID string `gorm:"size:32;not null;uniqueIndex:idx_traces_tenant_trace_id,priority:2" json:"trace_id"` ServiceName string `gorm:"size:255;index:idx_traces_tenant_service,priority:2" json:"service_name"` Duration int64 `gorm:"index" json:"duration"` // Microseconds DurationMs float64 `gorm:"-" json:"duration_ms"` diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go index 65a541c..05eaf61 100644 --- a/internal/storage/trace_repo.go +++ b/internal/storage/trace_repo.go @@ -56,6 +56,9 @@ func (r *Repository) BatchCreateSpans(spans []Span) error { } // BatchCreateTraces inserts traces, skipping duplicates. +// Duplicate is defined per the composite uniqueIndex idx_traces_tenant_trace_id +// on (tenant_id, trace_id): a trace_id clash within the same tenant is ignored, +// while the same trace_id under a different tenant inserts cleanly. func (r *Repository) BatchCreateTraces(traces []Trace) error { if len(traces) == 0 { return nil @@ -67,6 +70,8 @@ func (r *Repository) BatchCreateTraces(traces []Trace) error { } // CreateTrace inserts a new trace, skipping if it already exists. +// Uniqueness is per idx_traces_tenant_trace_id (tenant_id, trace_id), so the +// same trace_id across tenants is allowed. func (r *Repository) CreateTrace(trace Trace) error { if strings.ToLower(r.driver) == "mysql" { return r.db.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(&trace).Error @@ -75,8 +80,9 @@ func (r *Repository) CreateTrace(trace Trace) error { } // GetTrace returns a trace by ID with its spans and logs, scoped to the tenant on ctx. -// The Preloaded Spans and Logs are additionally filtered so a trace ID collision -// across tenants cannot leak cross-tenant children. +// Trace uniqueness is composite (tenant_id, trace_id), so the same trace_id can +// legitimately exist in multiple tenants; the Preloaded Spans and Logs are +// filtered by tenant_id as defense-in-depth against cross-tenant child leakage. func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error) { tenant := TenantFromContext(ctx) var trace Trace diff --git a/internal/storage/trace_tenant_test.go b/internal/storage/trace_tenant_test.go new file mode 100644 index 0000000..103603a --- /dev/null +++ b/internal/storage/trace_tenant_test.go @@ -0,0 +1,232 @@ +package storage + +import ( + "sort" + "testing" + "time" +) + +// TestCreateTrace_SameTraceIDAcrossTenants_Succeeds proves that identical +// trace_ids under distinct tenants coexist after RAN-21. Before the fix the +// second insert silently collapsed into a no-op via OnConflict.DoNothing +// because the old standalone uniqueIndex on trace_id matched regardless of +// tenant. +func TestCreateTrace_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "shared-trace-id-0001" + + acme := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + beta := Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now} + + if err := repo.CreateTrace(acme); err != nil { + t.Fatalf("CreateTrace(acme): %v", err) + } + if err := repo.CreateTrace(beta); err != nil { + t.Fatalf("CreateTrace(beta): %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d trace rows for shared trace_id, got %d", want, got) + } + seenTenants := map[string]string{} + for _, r := range rows { + seenTenants[r.TenantID] = r.ServiceName + } + if seenTenants["acme"] != "svc-a" || seenTenants["beta"] != "svc-b" { + t.Fatalf("tenant→service mismatch: %+v", seenTenants) + } + + acmeCtx := WithTenantContext(t.Context(), "acme") + betaCtx := WithTenantContext(t.Context(), "beta") + got, err := repo.GetTrace(acmeCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(acme): %v", err) + } + if got.ServiceName != "svc-a" || got.TenantID != "acme" { + t.Fatalf("GetTrace(acme) returned wrong row: %+v", got) + } + got, err = repo.GetTrace(betaCtx, traceID) + if err != nil { + t.Fatalf("GetTrace(beta): %v", err) + } + if got.ServiceName != "svc-b" || got.TenantID != "beta" { + t.Fatalf("GetTrace(beta) returned wrong row: %+v", got) + } +} + +// TestCreateTrace_SameTraceIDSameTenant_IsIgnored proves the intra-tenant +// dedupe behaviour survives the composite-uniqueness change. Ingestion retries +// under the same tenant must still be absorbed by OnConflict.DoNothing. +func TestCreateTrace_SameTraceIDSameTenant_IsIgnored(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "same-tenant-trace-0001" + + first := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now} + dup := Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a-renamed", Status: "OK", Timestamp: now.Add(time.Second)} + + if err := repo.CreateTrace(first); err != nil { + t.Fatalf("CreateTrace first: %v", err) + } + if err := repo.CreateTrace(dup); err != nil { + t.Fatalf("CreateTrace dup: %v", err) + } + + var rows []Trace + if err := repo.db.Where("tenant_id = ? AND trace_id = ?", "acme", traceID).Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 1; got != want { + t.Fatalf("expected intra-tenant dedupe to keep %d row, got %d", want, got) + } + if rows[0].ServiceName != "svc-a" { + t.Fatalf("expected first-write-wins semantics, got %q", rows[0].ServiceName) + } +} + +// TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds exercises the batch +// path, which uses the same OnConflict.DoNothing / INSERT IGNORE plumbing. +func TestBatchCreateTraces_SameTraceIDAcrossTenants_Succeeds(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "batch-shared-0001" + + batch := []Trace{ + {TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}, + {TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}, + } + if err := repo.BatchCreateTraces(batch); err != nil { + t.Fatalf("BatchCreateTraces: %v", err) + } + + var rows []Trace + if err := repo.db.Where("trace_id = ?", traceID).Order("tenant_id ASC").Find(&rows).Error; err != nil { + t.Fatalf("Find: %v", err) + } + if got, want := len(rows), 2; got != want { + t.Fatalf("expected %d rows, got %d", want, got) + } + if rows[0].TenantID != "acme" || rows[1].TenantID != "beta" { + t.Fatalf("unexpected tenant ordering: %v", []string{rows[0].TenantID, rows[1].TenantID}) + } +} + +// TestGetTrace_IsolatesChildrenAcrossTenants confirms that when the same +// trace_id exists in two tenants, GetTrace returns each tenant's own spans +// and logs — no cross-tenant children leak through the Preload filter. +func TestGetTrace_IsolatesChildrenAcrossTenants(t *testing.T) { + repo := newTestRepo(t) + now := time.Now().UTC() + traceID := "preload-shared-0001" + + // Both tenants share trace_id but have distinct child payloads. + if err := repo.db.Create(&Trace{TenantID: "acme", TraceID: traceID, ServiceName: "svc-a", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme trace: %v", err) + } + if err := repo.db.Create(&Trace{TenantID: "beta", TraceID: traceID, ServiceName: "svc-b", Status: "OK", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta trace: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", OperationName: "op-a", ServiceName: "svc-a", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create acme span: %v", err) + } + if err := repo.db.Create(&Span{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", OperationName: "op-b", ServiceName: "svc-b", StartTime: now, EndTime: now}).Error; err != nil { + t.Fatalf("Create beta span: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "acme", TraceID: traceID, SpanID: "acme-span", Severity: "INFO", Body: "acme log", ServiceName: "svc-a", Timestamp: now}).Error; err != nil { + t.Fatalf("Create acme log: %v", err) + } + if err := repo.db.Create(&Log{TenantID: "beta", TraceID: traceID, SpanID: "beta-span", Severity: "INFO", Body: "beta log", ServiceName: "svc-b", Timestamp: now}).Error; err != nil { + t.Fatalf("Create beta log: %v", err) + } + + cases := []struct { + tenant string + wantSvc string + wantOp string + wantLog string + }{ + {"acme", "svc-a", "op-a", "acme log"}, + {"beta", "svc-b", "op-b", "beta log"}, + } + for _, tc := range cases { + ctx := WithTenantContext(t.Context(), tc.tenant) + tr, err := repo.GetTrace(ctx, traceID) + if err != nil { + t.Fatalf("GetTrace(%s): %v", tc.tenant, err) + } + if tr.ServiceName != tc.wantSvc { + t.Fatalf("%s: wrong trace row: svc=%q", tc.tenant, tr.ServiceName) + } + if len(tr.Spans) != 1 || tr.Spans[0].OperationName != tc.wantOp { + names := []string{} + for _, s := range tr.Spans { + names = append(names, s.OperationName) + } + sort.Strings(names) + t.Fatalf("%s: span leak — got ops %v, want [%s]", tc.tenant, names, tc.wantOp) + } + if len(tr.Logs) != 1 || tr.Logs[0].Body != tc.wantLog { + bodies := []string{} + for _, l := range tr.Logs { + bodies = append(bodies, l.Body) + } + sort.Strings(bodies) + t.Fatalf("%s: log leak — got bodies %v, want [%s]", tc.tenant, bodies, tc.wantLog) + } + } +} + +// TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex proves the idempotent +// migration hook retires a pre-RAN-21 standalone unique index on +// traces.trace_id. Simulates an upgrade by creating the legacy index +// out-of-band and rerunning AutoMigrateModels. +func TestAutoMigrateModels_DropsLegacyTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + + // Install a legacy-shaped standalone unique index on traces.trace_id. + const legacyIdx = "idx_legacy_traces_trace_id" + if err := repo.db.Exec("CREATE UNIQUE INDEX " + legacyIdx + " ON traces(trace_id)").Error; err != nil { + t.Fatalf("create legacy index: %v", err) + } + if !repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("precondition: legacy index should exist") + } + + // Rerun migration — the hook must drop the legacy index. + if err := AutoMigrateModels(repo.db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if repo.db.Migrator().HasIndex(&Trace{}, legacyIdx) { + t.Fatal("legacy standalone unique index on traces.trace_id was not dropped") + } + + // Cross-tenant reuse must now succeed end-to-end. + now := time.Now().UTC() + if err := repo.CreateTrace(Trace{TenantID: "acme", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace acme: %v", err) + } + if err := repo.CreateTrace(Trace{TenantID: "beta", TraceID: "after-drop", ServiceName: "svc", Timestamp: now}); err != nil { + t.Fatalf("CreateTrace beta: %v", err) + } + var n int64 + if err := repo.db.Model(&Trace{}).Where("trace_id = ?", "after-drop").Count(&n).Error; err != nil { + t.Fatalf("count: %v", err) + } + if n != 2 { + t.Fatalf("expected 2 rows across tenants, got %d", n) + } +} + +// TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex asserts the new +// composite index landed with the expected name after a fresh migration. +func TestAutoMigrateModels_CreatesCompositeTraceIDUniqueIndex(t *testing.T) { + repo := newTestRepo(t) + if !repo.db.Migrator().HasIndex(&Trace{}, "idx_traces_tenant_trace_id") { + t.Fatal("expected composite uniqueIndex idx_traces_tenant_trace_id on traces") + } +} diff --git a/internal/vectordb/index.go b/internal/vectordb/index.go index 1333d57..99b804a 100644 --- a/internal/vectordb/index.go +++ b/internal/vectordb/index.go @@ -11,9 +11,20 @@ import ( "unicode" ) +// defaultTenantID is the tenant assigned when the caller passes an empty +// tenant string. Mirrors storage.DefaultTenantID; duplicated here to avoid +// pulling internal/storage into vectordb's import graph. +const defaultTenantID = "default" + // LogVector represents an indexed log entry. +// +// Tenant scopes the document so Search can return only the caller's tenant +// rows. The TF-IDF table is shared across tenants — global IDF still gives +// the right rarity signal — but the per-document tenant tag is enforced at +// query time so two tenants with overlapping log bodies stay isolated. type LogVector struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -23,6 +34,7 @@ type LogVector struct { // SearchResult is a single similarity hit. type SearchResult struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -50,8 +62,10 @@ func New(maxSize int) *Index { } } -// Add adds a log to the index. Thread-safe. -func (idx *Index) Add(logID uint, serviceName, severity, body string) { +// Add adds a log to the index. Thread-safe. Tenant is recorded with the +// document so Search can filter by it; an empty tenant collapses to +// the platform default at the boundary, matching storage.TenantFromContext. +func (idx *Index) Add(logID uint, tenant, serviceName, severity, body string) { if !shouldIndex(severity) { return } @@ -61,20 +75,48 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { } tf := computeTF(tokens) + if tenant == "" { + tenant = defaultTenantID + } + idx.mu.Lock() defer idx.mu.Unlock() - // FIFO eviction — copy to new slice to release old backing array memory + // Tenant-aware FIFO eviction. When at cap, remove up to maxSize/10 of the + // oldest entries belonging to the inserting tenant so a noisy tenant + // cannot push another tenant's warm rows out of the index (availability + // isolation — the confidentiality invariant is enforced separately by + // doc.Tenant filtering in Search). The new backing slice also releases + // the old array memory on the next GC cycle. if len(idx.docs) >= idx.maxSize { - keep := idx.docs[idx.maxSize/10:] - newDocs := make([]LogVector, len(keep), idx.maxSize) - copy(newDocs, keep) - idx.docs = newDocs + toDrop := idx.maxSize / 10 + if toDrop < 1 { + toDrop = 1 + } + kept := make([]LogVector, 0, idx.maxSize) + droppedSame := 0 + for _, d := range idx.docs { + if droppedSame < toDrop && d.Tenant == tenant { + droppedSame++ + continue + } + kept = append(kept, d) + } + // Edge case: the inserting tenant has no prior entries while the + // index is at cap with other tenants' rows. Drop one globally-oldest + // entry so the new tenant can take its first slot. This is the only + // path where a tenant's entry can be evicted by another tenant, and + // it costs at most one row per brand-new tenant. + if droppedSame == 0 && len(kept) > 0 { + kept = kept[1:] + } + idx.docs = kept idx.dirty = true } idx.docs = append(idx.docs, LogVector{ LogID: logID, + Tenant: tenant, ServiceName: serviceName, Severity: severity, Body: body, @@ -83,11 +125,17 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { idx.dirty = true } -// Search finds the top-k logs most similar to the query string. -func (idx *Index) Search(query string, k int) []SearchResult { +// Search finds the top-k logs most similar to the query string within +// tenant. Documents from other tenants are excluded — the IDF table stays +// global so rarity is computed against the whole corpus, but result rows +// are filtered to the caller's tenant. +func (idx *Index) Search(tenant, query string, k int) []SearchResult { if k <= 0 { k = 10 } + if tenant == "" { + tenant = defaultTenantID + } tokens := tokenize(query) if len(tokens) == 0 { return nil @@ -124,6 +172,9 @@ func (idx *Index) Search(query string, k int) []SearchResult { } results := make([]scored, 0, len(docs)) for _, doc := range docs { + if doc.Tenant != tenant { + continue + } docVec := make(map[string]float64, len(doc.vec)) for term, tf := range doc.vec { docVec[term] = tf * idfSnap[term] @@ -145,6 +196,7 @@ func (idx *Index) Search(query string, k int) []SearchResult { for i, r := range results { out[i] = SearchResult{ LogID: r.doc.LogID, + Tenant: r.doc.Tenant, ServiceName: r.doc.ServiceName, Severity: r.doc.Severity, Body: r.doc.Body, diff --git a/internal/vectordb/index_test.go b/internal/vectordb/index_test.go new file mode 100644 index 0000000..9b9186c --- /dev/null +++ b/internal/vectordb/index_test.go @@ -0,0 +1,136 @@ +package vectordb + +import ( + "strconv" + "sync" + "testing" +) + +// TestTenantIsolation_Search is the RAN-20 confidentiality bar: a query on +// tenant A never returns a document indexed under tenant B, even when the +// vocabularies collide on the query terms. +func TestTenantIsolation_Search(t *testing.T) { + idx := New(1_000) + + idx.Add(1, "acme", "checkout", "ERROR", "payment gateway timeout upstream") + idx.Add(2, "acme", "checkout", "ERROR", "payment gateway refused charge") + idx.Add(10, "globex", "auth", "ERROR", "payment gateway token expired") + idx.Add(11, "globex", "auth", "ERROR", "payment gateway 500 internal error") + + acmeHits := idx.Search("acme", "payment gateway timeout", 10) + if len(acmeHits) == 0 { + t.Fatalf("acme search returned zero hits despite matching docs") + } + for _, h := range acmeHits { + if h.Tenant != "acme" || h.LogID >= 10 { + t.Fatalf("acme search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } + + globexHits := idx.Search("globex", "payment gateway token", 10) + if len(globexHits) == 0 { + t.Fatalf("globex search returned zero hits despite matching docs") + } + for _, h := range globexHits { + if h.Tenant != "globex" || h.LogID < 10 { + t.Fatalf("globex search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } +} + +// TestUnknownTenantReturnsEmpty proves a tenant with no indexed docs returns +// nothing even when other tenants have matching content. +func TestUnknownTenantReturnsEmpty(t *testing.T) { + idx := New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + if got := idx.Search("initech", "database connection", 10); len(got) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(got)) + } +} + +// TestEmptyTenantCoercedToDefault verifies Add and Search coerce an empty +// tenant to the platform default so untenanted callers stay isolated from +// real tenants. +func TestEmptyTenantCoercedToDefault(t *testing.T) { + idx := New(100) + idx.Add(1, "", "svc", "ERROR", "network unreachable upstream host") + + if hits := idx.Search("", "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with empty tenant: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search(defaultTenantID, "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with default tenant id: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search("acme", "network unreachable", 10); len(hits) != 0 { + t.Fatalf("acme saw %d cross-tenant hits for default-tenant doc", len(hits)) + } +} + +// TestFIFOEvictionFairness is TechLead's requested assertion: a tenant that +// writes near-cap volume cannot evict another tenant's documents from the +// shared index. Under a naive global-FIFO policy tenant B's flood would +// remove tenant A's older entries and A would silently "lose" its warm +// rows — a confidentiality-safe but availability-breaking failure mode. +func TestFIFOEvictionFairness(t *testing.T) { + const cap = 200 + idx := New(cap) + + // Tenant A writes a small set of distinctive markers. + for i := 0; i < 5; i++ { + idx.Add(uint(1+i), "acme", "checkout", "ERROR", "acme-canary-marker alpha beta gamma "+strconv.Itoa(i)) + } + + // Tenant B floods the index well past the cap — enough to trigger + // multiple eviction cycles. + for i := 0; i < cap*4; i++ { + idx.Add(uint(10_000+i), "globex", "svc", "ERROR", "globex chatter filling the index "+strconv.Itoa(i)) + } + + // Every one of acme's canary rows must still be findable. + hits := idx.Search("acme", "acme-canary-marker alpha beta gamma", 20) + if len(hits) < 5 { + t.Fatalf("eviction unfairness: acme canaries evicted by globex flood. want >=5 hits, got %d", len(hits)) + } + seen := map[uint]bool{} + for _, h := range hits { + if h.Tenant != "acme" { + t.Fatalf("cross-tenant leak during eviction test: id=%d tenant=%q", h.LogID, h.Tenant) + } + seen[h.LogID] = true + } + for id := uint(1); id <= 5; id++ { + if !seen[id] { + t.Fatalf("acme canary id=%d missing after globex flood", id) + } + } +} + +// TestConcurrentTenantAddSearch pins down race-detector cleanliness and +// cross-tenant isolation under concurrent readers/writers. +func TestConcurrentTenantAddSearch(t *testing.T) { + idx := New(5_000) + var wg sync.WaitGroup + + for _, tenant := range []string{"acme", "globex"} { + wg.Add(2) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + idx.Add(uint(i), ten, "svc", "ERROR", ten+" error kafka partition "+strconv.Itoa(i)) + } + }(tenant) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + for _, h := range idx.Search(ten, "kafka partition", 5) { + if h.Tenant != ten { + t.Errorf("tenant %s saw cross-tenant hit tenant=%q body=%q", ten, h.Tenant, h.Body) + return + } + } + } + }(tenant) + } + wg.Wait() +} diff --git a/main.go b/main.go index c94be26..83f96cb 100644 --- a/main.go +++ b/main.go @@ -331,18 +331,18 @@ func main() { // Hydrate vector index from recent ERROR/WARN logs on startup (non-blocking). // Uses appCtx so SIGTERM during boot cancels the query before repo.Close(). + // Hydration is cross-tenant by design: each row lands tagged with its own + // TenantID via vectorIdx.Add so isolation is preserved at query time. The + // previous tenant-scoped GetLogsV2 call silently hydrated only the default + // tenant's rows — non-default tenants lost their warm index on every + // restart. bootWG.Add(1) go func() { defer bootWG.Done() - recentLogs, _, err := repo.GetLogsV2(appCtx, storage.LogFilter{ - Severity: "ERROR", - StartTime: time.Now().Add(-24 * time.Hour), - EndTime: time.Now(), - Limit: 5000, - }) + recentLogs, err := repo.ListRecentHighSeverityLogsAllTenants(appCtx, "ERROR", time.Now().Add(-24*time.Hour), time.Now(), 5000) if err == nil { for _, l := range recentLogs { - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) } slog.Info("🔍 Vector index hydrated from recent ERROR logs", "count", len(recentLogs)) } @@ -412,7 +412,7 @@ func main() { Timestamp: l.Timestamp, }) aiService.EnqueueLog(l) - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) eventHub.NotifyRefresh() if time.Since(start) > 100*time.Millisecond { slog.Warn("Slow broadcast/enqueue", "duration", time.Since(start))