diff --git a/internal/api/similar_handler.go b/internal/api/similar_handler.go index 1668801..ac0fe57 100644 --- a/internal/api/similar_handler.go +++ b/internal/api/similar_handler.go @@ -4,6 +4,8 @@ import ( "encoding/json" "net/http" "strconv" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" ) // handleGetSimilarLogs handles GET /api/logs/similar?q=&limit=10 @@ -30,7 +32,8 @@ func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) { limit = 50 } - results := s.vectorIdx.Search(query, limit) + tenant := storage.TenantFromContext(r.Context()) + results := s.vectorIdx.Search(tenant, query, limit) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ diff --git a/internal/api/similar_handler_test.go b/internal/api/similar_handler_test.go new file mode 100644 index 0000000..69af324 --- /dev/null +++ b/internal/api/similar_handler_test.go @@ -0,0 +1,117 @@ +package api + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/config" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestSimilarHandler_TenantIsolation is the RAN-20 acceptance bar for the HTTP +// surface. Two tenants with distinct corpora query /api/logs/similar; each +// sees ZERO rows belonging to the other tenant. +func TestSimilarHandler_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout charging customer") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused charge insufficient funds") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired for session") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal error while authenticating") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + acmeIDs := map[float64]bool{101: true, 102: true} + globexIDs := map[float64]bool{201: true, 202: true} + + q := url.Values{} + q.Set("q", "payment gateway") + q.Set("limit", "50") + path := "/api/logs/similar?" + q.Encode() + + // Tenant A + aRec := httptest.NewRecorder() + aReq := httptest.NewRequest(http.MethodGet, path, nil) + aReq.Header.Set(TenantHeader, "acme") + handler.ServeHTTP(aRec, aReq) + if aRec.Code != http.StatusOK { + t.Fatalf("acme: want 200, got %d body=%q", aRec.Code, aRec.Body.String()) + } + acme := decodeResults(t, aRec) + if len(acme) == 0 { + t.Fatalf("acme got zero hits despite matching corpus") + } + for _, r := range acme { + if !acmeIDs[r.ID] { + t.Fatalf("acme leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } + + // Tenant B + gRec := httptest.NewRecorder() + gReq := httptest.NewRequest(http.MethodGet, path, nil) + gReq.Header.Set(TenantHeader, "globex") + handler.ServeHTTP(gRec, gReq) + if gRec.Code != http.StatusOK { + t.Fatalf("globex: want 200, got %d", gRec.Code) + } + globex := decodeResults(t, gRec) + if len(globex) == 0 { + t.Fatalf("globex got zero hits despite matching corpus") + } + for _, r := range globex { + if !globexIDs[r.ID] { + t.Fatalf("globex leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body) + } + } +} + +// TestSimilarHandler_UnknownTenantReturnsEmpty confirms a request bearing an +// unknown tenant header returns zero results — the handler must not silently +// fall back to another tenant's rows. +func TestSimilarHandler_UnknownTenantReturnsEmpty(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + srv := &Server{vectorIdx: idx} + mux := http.NewServeMux() + mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs) + handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux) + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/logs/similar?q=database+connection", nil) + req.Header.Set(TenantHeader, "initech") + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("want 200, got %d", rec.Code) + } + if r := decodeResults(t, rec); len(r) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(r)) + } +} + +type similarResult struct { + ID float64 `json:"LogID"` + Tenant string `json:"Tenant"` + ServiceName string `json:"ServiceName"` + Severity string `json:"Severity"` + Body string `json:"Body"` + Score float64 `json:"Score"` +} + +func decodeResults(t *testing.T, rec *httptest.ResponseRecorder) []similarResult { + t.Helper() + var env struct { + Results []similarResult `json:"results"` + } + if err := json.Unmarshal(rec.Body.Bytes(), &env); err != nil { + t.Fatalf("decode response: %v (body=%q)", err, rec.Body.String()) + } + return env.Results +} diff --git a/internal/graphrag/builder.go b/internal/graphrag/builder.go index 5653463..0de3519 100644 --- a/internal/graphrag/builder.go +++ b/internal/graphrag/builder.go @@ -154,6 +154,19 @@ func (g *GraphRAG) DroppedMetricsCount() int64 { return g.droppedMetrics.Load() // tests to assert cooldown behavior without requiring a live repo. func (g *GraphRAG) InvestigationInsertCount() int64 { return g.invInserts.Load() } +// RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant. +// Mirrors PersistInvestigation's "tenant accepted explicitly" shape so +// out-of-band anomaly producers (synthetic detectors, integration tests, +// future external anomaly feeds) can land directly on the right tenant +// slice without going through the metric/error detection loops. Empty +// tenant collapses to storage.DefaultTenantID. +func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode) { + if tenant == "" { + tenant = storage.DefaultTenantID + } + g.storesForTenant(tenant).anomalies.AddAnomaly(anomaly) +} + // recordEventDrop increments the per-signal atomic counter and — when // a telemetry registry is wired — the Prometheus counter vec. func (g *GraphRAG) recordEventDrop(signal string) { diff --git a/internal/graphrag/clustering.go b/internal/graphrag/clustering.go index d1735bb..574a6ec 100644 --- a/internal/graphrag/clustering.go +++ b/internal/graphrag/clustering.go @@ -9,6 +9,8 @@ import ( "context" "fmt" "time" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" ) // clusterLog runs the log body through Drain and upserts a LogClusterNode @@ -78,14 +80,11 @@ func (g *GraphRAG) SimilarErrors(ctx context.Context, clusterID string, k int) [ if query == "" && len(cluster.TemplateTokens) > 0 { query = joinTokens(cluster.TemplateTokens) } - // TODO(RAN-20): vectordb.Index.Search itself is not yet tenant-scoped on - // `main`, so we call the 2-arg signature here. Tenant isolation for the - // SignalStore lookup above is already enforced via storesFor(ctx); the - // vector hits are then narrowed by the EmittedBy edges in this tenant's - // SignalStore on lines below, so cross-tenant hits cannot surface even - // while the underlying vector index is shared. - _ = ctx - results := g.vectorIdx.Search(query, k*2) // over-fetch to filter + // vectordb.Index.Search takes the tenant string directly; resolve it + // from ctx via the same storage helper used by storesFor so both sides + // agree on coercion rules (empty → DefaultTenantID). + tenant := storage.TenantFromContext(ctx) + results := g.vectorIdx.Search(tenant, query, k*2) // over-fetch to filter // Map results back to log clusters. seen := map[string]bool{clusterID: true} diff --git a/internal/mcp/tenant_isolation_test.go b/internal/mcp/tenant_isolation_test.go new file mode 100644 index 0000000..653ab90 --- /dev/null +++ b/internal/mcp/tenant_isolation_test.go @@ -0,0 +1,627 @@ +// Package mcp tests the merge-gate invariant for RAN-19/RAN-39: every +// GraphRAG-backed MCP tool (and the legacy svcGraph-backed tools rewired +// onto GraphRAG) must scope its response to the tenant carried by the +// X-Tenant-ID header — overlapping data ingested for two tenants under +// the same service_name, trace_id, span_id, log template, and snapshot +// time must never leak across tenant boundaries. +package mcp + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/RandomCodeSpace/otelcontext/internal/graphrag" + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// tenants exercised by the test. The third row uses an empty header to +// prove that absence-of-header collapses to storage.DefaultTenantID, which +// is also a real ingest target so we get a meaningful response (not just +// vacuous emptiness). +var isolationCallers = []struct { + name string + header string + scoped string + otherSeeded []string +}{ + {name: "acme", header: "acme", scoped: "acme", otherSeeded: []string{"beta", storage.DefaultTenantID}}, + {name: "beta", header: "beta", scoped: "beta", otherSeeded: []string{"acme", storage.DefaultTenantID}}, + {name: "no_header_default", header: "", scoped: storage.DefaultTenantID, otherSeeded: []string{"acme", "beta"}}, +} + +// allTenants is the set of tenants we actually seed. Used by leak scans +// regardless of which caller is currently being asserted. +var allTenants = []string{"acme", "beta", storage.DefaultTenantID} + +// markersFor builds the (own, others) marker pair for a given caller. +// Markers are tenant-stamped strings that appear inside service names, +// operation names, log bodies, and anomaly evidence — so a textual scan +// of the JSON response is sufficient to detect a cross-tenant leak. +func markersFor(scoped string, others []string) (own []string, leak []string) { + own = []string{ + scoped + "-orders", + scoped + "-marker", + scoped + "-op", + } + for _, t := range others { + leak = append(leak, + t+"-orders", + t+"-marker", + t+"-op", + t+"-anomaly-marker", + ) + } + return own, leak +} + +// setupTenantIsolationServer wires an in-process MCP server against an +// in-memory SQLite repo and a started GraphRAG. The background refresh, +// snapshot, and anomaly loops are stretched to "never" inside the test +// window so the only state that lands in the stores is the data the test +// seeds explicitly — making leak assertions deterministic. +func setupTenantIsolationServer(t *testing.T) (*httptest.Server, *graphrag.GraphRAG, *storage.Repository, *vectordb.Index) { + t.Helper() + + db, err := storage.NewDatabase("sqlite", ":memory:") + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + if err := storage.AutoMigrateModels(db, "sqlite"); err != nil { + t.Fatalf("AutoMigrateModels: %v", err) + } + if err := graphrag.AutoMigrateGraphRAG(db); err != nil { + t.Fatalf("AutoMigrateGraphRAG: %v", err) + } + repo := storage.NewRepositoryFromDB(db, "sqlite") + + vIdx := vectordb.New(1000) + + cfg := graphrag.DefaultConfig() + cfg.RefreshEvery = 24 * time.Hour + cfg.SnapshotEvery = 24 * time.Hour + cfg.AnomalyEvery = 24 * time.Hour + cfg.WorkerCount = 4 + + g := graphrag.New(repo, vIdx, nil, nil, cfg) + bgCtx, cancel := context.WithCancel(context.Background()) + go g.Start(bgCtx) + + srv := New(repo, nil, nil, vIdx) + srv.SetGraphRAG(g) + + httpSrv := httptest.NewServer(srv.Handler()) + + t.Cleanup(func() { + httpSrv.Close() + cancel() + g.Stop() + _ = repo.Close() + }) + + return httpSrv, g, repo, vIdx +} + +// seedTenant ingests a small but representative slice of telemetry for +// tenant T: a parent OK span, a child ERROR span, a matching ERROR log, +// a vector-index doc, an injected anomaly, a persisted investigation, +// and a graph snapshot row. All identifiers (trace_id, span_id) collide +// across tenants on purpose — the tenant slice is the only thing keeping +// them apart. +func seedTenant(t *testing.T, g *graphrag.GraphRAG, repo *storage.Repository, vIdx *vectordb.Index, tenant string, ts time.Time) { + t.Helper() + + service := tenant + "-orders" + op := tenant + "-op-checkout" + logBody := tenant + "-marker connection refused upstream" + traceID := "trace-shared" + rootSpanID := "span-root" + childSpanID := "span-child" + + // Root span (OK). + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: rootSpanID, + OperationName: "/checkout", + ServiceName: service, + Status: "STATUS_CODE_OK", + StartTime: ts, + EndTime: ts.Add(2 * time.Millisecond), + Duration: 2000, + }) + + // Child span (ERROR), parented to root → upstream walk lands on the + // per-tenant root span. + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ParentSpanID: rootSpanID, + OperationName: op, + ServiceName: service, + Status: "STATUS_CODE_ERROR", + StartTime: ts.Add(time.Millisecond), + EndTime: ts.Add(2 * time.Millisecond), + Duration: 1000, + }) + + // Log carrying the per-tenant marker — drives Drain clustering and + // CorrelatedSignals; the body is also stored in the vector index. + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: traceID, + SpanID: childSpanID, + ServiceName: service, + Severity: "ERROR", + Body: logBody, + Timestamp: ts.Add(2 * time.Millisecond), + }) + + // Vector index doc — find_similar_logs path is keyed by tenant. + vIdx.Add(0, tenant, service, "ERROR", logBody) + + // Inject a per-tenant anomaly directly so AnomalyTimeline has + // something to return without depending on the anomaly detector + // loop (which is throttled to 24h in this fixture). + g.RegisterAnomaly(tenant, graphrag.AnomalyNode{ + ID: tenant + "-anomaly-1", + Type: graphrag.AnomalyErrorSpike, + Severity: graphrag.SeverityCritical, + Service: service, + Evidence: tenant + "-anomaly-marker error_rate=0.95", + Timestamp: ts.Add(3 * time.Millisecond), + }) + + // Snapshot row — insert directly so we control the tenant_id and ID + // (takeSnapshot is the production loop, but it is package-private). + snap := graphrag.GraphSnapshot{ + TenantID: tenant, + ID: "snap-" + tenant, + CreatedAt: ts, + Nodes: json.RawMessage(`[{"name":"` + service + `","marker":"` + tenant + `-marker"}]`), + Edges: json.RawMessage(`[]`), + ServiceCount: 1, + AvgHealthScore: 0.5, + } + if err := repo.DB().Create(&snap).Error; err != nil { + t.Fatalf("seed snapshot for %q: %v", tenant, err) + } +} + +// waitForServiceMaps polls until every seeded tenant's ServiceMap reflects +// at least one service. Required because OnSpanIngested is async. +func waitForServiceMaps(t *testing.T, g *graphrag.GraphRAG, tenants []string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + ok := true + for _, tn := range tenants { + ctx := storage.WithTenantContext(context.Background(), tn) + if len(g.ServiceMap(ctx, 0)) == 0 { + ok = false + break + } + } + if ok { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("timed out waiting for ServiceMap to reflect ingested spans for %v", tenants) +} + +// seedInvestigations relies on the in-memory state already being warm +// (see waitForServiceMaps). PersistInvestigation reaches into ImpactAnalysis +// internally, which reads from the per-tenant ServiceStore. +func seedInvestigations(t *testing.T, g *graphrag.GraphRAG, ts time.Time) { + t.Helper() + for _, tenant := range allTenants { + service := tenant + "-orders" + chain := graphrag.ErrorChainResult{ + RootCause: &graphrag.RootCauseInfo{ + Service: service, + Operation: tenant + "-op-checkout", + ErrorMessage: tenant + "-marker connection refused upstream", + SpanID: "span-child", + TraceID: "trace-shared", + }, + SpanChain: []graphrag.SpanNode{{ + ID: "span-child", + TraceID: "trace-shared", + Service: service, + Operation: tenant + "-op-checkout", + IsError: true, + Timestamp: ts, + }}, + TraceID: "trace-shared", + } + g.PersistInvestigation(tenant, service, []graphrag.ErrorChainResult{chain}, nil) + } +} + +// callTool sends a JSON-RPC tools/call request to the test MCP server +// with the given X-Tenant-ID header (omitted when empty) and returns the +// inner ToolCallResult — i.e., the structure the LLM client would see. +// Also returns the concatenated text payload across content items, which +// is what tenant-leak assertions actually scan. +func callTool(t *testing.T, ts *httptest.Server, headerTenant, name string, args map[string]any) (ToolCallResult, string) { + t.Helper() + if args == nil { + args = map[string]any{} + } + rpcReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": map[string]any{ + "name": name, + "arguments": args, + }, + } + body, err := json.Marshal(rpcReq) + if err != nil { + t.Fatalf("marshal rpc: %v", err) + } + req, err := http.NewRequest(http.MethodPost, ts.URL, bytes.NewReader(body)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + if headerTenant != "" { + req.Header.Set("X-Tenant-ID", headerTenant) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("rpc do: %v", err) + } + defer resp.Body.Close() + raw, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("rpc status %d: %s", resp.StatusCode, raw) + } + + var rpcResp struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Result ToolCallResult `json:"result"` + Error *RPCError `json:"error"` + } + if err := json.Unmarshal(raw, &rpcResp); err != nil { + t.Fatalf("unmarshal rpc: %v\nraw: %s", err, raw) + } + if rpcResp.Error != nil { + t.Fatalf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message) + } + + var sb strings.Builder + for _, c := range rpcResp.Result.Content { + sb.WriteString(c.Text) + if c.Resource != nil { + sb.WriteString(c.Resource.Text) + } + } + return rpcResp.Result, sb.String() +} + +// assertNoLeak fails if any of leakMarkers appears in body. ownMarker is +// optional — when non-empty it must appear, proving the tool returned +// real per-tenant data and not just a vacuous empty result. +func assertNoLeak(t *testing.T, label, body, ownMarker string, leakMarkers []string) { + t.Helper() + if ownMarker != "" && !strings.Contains(body, ownMarker) { + t.Errorf("[%s] expected own marker %q in response, body=%s", label, ownMarker, truncate(body)) + } + for _, m := range leakMarkers { + if strings.Contains(body, m) { + t.Errorf("[%s] CROSS-TENANT LEAK: foreign marker %q present in response, body=%s", label, m, truncate(body)) + } + } +} + +func truncate(s string) string { + const max = 800 + if len(s) <= max { + return s + } + return s[:max] + "…(truncated)" +} + +// TestMCP_TenantIsolation_AllGraphRAGTools is the merge gate for RAN-19. +// For every GraphRAG-backed (and GraphRAG-rewired) MCP tool, it issues +// the same call from three callers — X-Tenant-ID: acme, X-Tenant-ID: beta, +// no header — against overlapping seeded data and asserts each response +// contains only the caller-tenant's data and never leaks another tenant's +// service name, log marker, operation, anomaly, or snapshot row. +func TestMCP_TenantIsolation_AllGraphRAGTools(t *testing.T) { + ts, g, repo, vIdx := setupTenantIsolationServer(t) + + now := time.Now().Add(-time.Minute) // a hair in the past so since=now-15m sees us + + for _, tenant := range allTenants { + seedTenant(t, g, repo, vIdx, tenant, now) + } + waitForServiceMaps(t, g, allTenants) + seedInvestigations(t, g, now) + + // Resolve investigation IDs per tenant (PersistInvestigation generates + // them internally; we discover them by querying after the fact, then + // hand them back into get_investigation in the per-caller assertions). + invIDsByTenant := map[string]string{} + for _, tenant := range allTenants { + ctx := storage.WithTenantContext(context.Background(), tenant) + invs, err := g.GetInvestigations(ctx, "", "", "", 10) + if err != nil { + t.Fatalf("GetInvestigations(%s): %v", tenant, err) + } + if len(invs) == 0 { + t.Fatalf("expected at least one persisted investigation for %s, got 0", tenant) + } + invIDsByTenant[tenant] = invs[0].ID + } + + // snapshot lookup time — slightly in the future so "<= at" matches every + // seeded row regardless of microsecond drift. + snapAt := time.Now().Add(time.Minute).UTC().Format(time.RFC3339) + + for _, caller := range isolationCallers { + caller := caller + ownMarkers, leakMarkers := markersFor(caller.scoped, caller.otherSeeded) + // At minimum the response should reference the caller's service + // for queries that are service-shaped. ownMarkers is intentionally + // kept as the canonical "anything tenant-tagged" set in case future + // assertions want it; per-tool checks pick the most relevant one. + ownService := caller.scoped + "-orders" + ownLogMarker := caller.scoped + "-marker" + ownAnomalyMarker := caller.scoped + "-anomaly-marker" + _ = ownMarkers + + // --- in-memory GraphRAG tools --- + + t.Run(caller.name+"/get_service_map", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_map", nil) + assertNoLeak(t, "get_service_map", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_service_health", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_service_health", map[string]any{ + "service_name": ownService, + }) + assertNoLeak(t, "get_service_health", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_error_chains", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_error_chains", map[string]any{ + "service": ownService, + "time_range": "1h", + "limit": 10, + }) + assertNoLeak(t, "get_error_chains", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/trace_graph", func(t *testing.T) { + // trace_id collides across tenants; correct routing must surface + // only the caller's per-tenant operation/service. + _, body := callTool(t, ts, caller.header, "trace_graph", map[string]any{ + "trace_id": "trace-shared", + }) + assertNoLeak(t, "trace_graph", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/impact_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "impact_analysis", map[string]any{ + "service": ownService, + "depth": 3, + }) + assertNoLeak(t, "impact_analysis", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/root_cause_analysis", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "root_cause_analysis", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + // RankedCause carries Service + Operation, so the caller's + // own service name MUST appear; an empty result here would + // silently regress the tool to a vacuous "[]" response that + // trivially "passes" leak checks (review feedback fix). + assertNoLeak(t, "root_cause_analysis", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/correlated_signals", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "correlated_signals", map[string]any{ + "service": ownService, + "time_range": "1h", + }) + // CorrelatedSignals collects logs/metrics for the service, so the + // per-tenant log marker should appear. + assertNoLeak(t, "correlated_signals", body, ownLogMarker, leakMarkers) + }) + + t.Run(caller.name+"/get_anomaly_timeline", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_anomaly_timeline", nil) + assertNoLeak(t, "get_anomaly_timeline", body, ownAnomalyMarker, leakMarkers) + }) + + // --- DB-backed GraphRAG tools --- + + t.Run(caller.name+"/get_investigations", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigations", nil) + assertNoLeak(t, "get_investigations", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_own_tenant", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[caller.scoped], + }) + assertNoLeak(t, "get_investigation/own", body, ownService, leakMarkers) + }) + + t.Run(caller.name+"/get_investigation_by_id_other_tenant_blocks", func(t *testing.T) { + // Asking by another tenant's ID must NOT return that row — id- + // guessing would otherwise leak across tenants. The handler + // surfaces a tool-level error result, which is fine; what + // matters is that the foreign tenant's data does not appear. + otherTenant := caller.otherSeeded[0] + _, body := callTool(t, ts, caller.header, "get_investigation", map[string]any{ + "investigation_id": invIDsByTenant[otherTenant], + }) + assertNoLeak(t, "get_investigation/cross-tenant", body, "", leakMarkers) + }) + + t.Run(caller.name+"/get_graph_snapshot", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_graph_snapshot", map[string]any{ + "time": snapAt, + }) + // Snapshot rows are tagged with the tenant marker so the leak + // scan covers both ID prefixes (snap-acme/snap-beta/snap-default) + // and the inline node markers. + assertNoLeak(t, "get_graph_snapshot", body, "snap-"+caller.scoped, leakMarkers) + }) + + // --- vectordb-backed tool (Drain path is exercised by ingestion above) --- + + t.Run(caller.name+"/find_similar_logs", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "find_similar_logs", map[string]any{ + "query": "connection refused upstream", + "limit": 10, + }) + assertNoLeak(t, "find_similar_logs", body, ownLogMarker, leakMarkers) + }) + + // --- Legacy/rewired surface --- + // get_system_graph is rewired onto GraphRAG by RAN-39, so the same + // per-tenant invariants apply. + t.Run(caller.name+"/get_system_graph", func(t *testing.T) { + _, body := callTool(t, ts, caller.header, "get_system_graph", nil) + assertNoLeak(t, "get_system_graph", body, ownService, leakMarkers) + }) + } +} + +// TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant proves that two +// tenants writing identical log bodies under an *identical* service name +// do not share a single shared LogClusterNode. Drain itself is currently +// a shared miner — without per-tenant SignalStore partitioning the same +// (template, service) pair would collapse to one cluster row visible to +// both tenants. The test inspects the actual LogClusterNodes returned by +// CorrelatedSignals (not just the response text) and asserts each tenant +// only ever sees rows tagged with its own marker. +func TestMCP_TenantIsolation_DrainClusterIDsStayPerTenant(t *testing.T) { + ts, g, _, _ := setupTenantIsolationServer(t) + now := time.Now().Add(-time.Minute) + + // Identical service AND identical log template across tenants — Drain + // is a shared miner so the (service, templateID) cluster key would + // collide if SignalStore weren't tenant-partitioned. The body marker + // is the only per-tenant differentiator. + const sharedService = "shared-orders" + const sharedTrace = "trace-shared" + const sharedSpan = "span-shared" + + for _, tenant := range []string{"acme", "beta"} { + g.OnSpanIngested(storage.Span{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + OperationName: "/checkout", + Status: "STATUS_CODE_ERROR", + StartTime: now, + EndTime: now.Add(time.Millisecond), + Duration: 1000, + }) + g.OnLogIngested(storage.Log{ + TenantID: tenant, + TraceID: sharedTrace, + SpanID: sharedSpan, + ServiceName: sharedService, + Severity: "ERROR", + Body: tenant + "-marker upstream connection refused", + Timestamp: now.Add(time.Millisecond), + }) + } + + ctxA := storage.WithTenantContext(context.Background(), "acme") + ctxB := storage.WithTenantContext(context.Background(), "beta") + + // Wait for both tenants' SignalStores to surface the cluster row. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + a := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + b := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if a != nil && b != nil && len(a.ErrorLogs) >= 1 && len(b.ErrorLogs) >= 1 { + break + } + time.Sleep(20 * time.Millisecond) + } + + sigA := g.CorrelatedSignals(ctxA, sharedService, now.Add(-time.Hour)) + sigB := g.CorrelatedSignals(ctxB, sharedService, now.Add(-time.Hour)) + if sigA == nil || len(sigA.ErrorLogs) == 0 { + t.Fatalf("acme CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + if sigB == nil || len(sigB.ErrorLogs) == 0 { + t.Fatalf("beta CorrelatedSignals returned no ErrorLogs — Drain/SignalStore did not see the seeded log") + } + + // Per-tenant cluster row content must carry only that tenant's marker. + // We probe both Template and SampleLog because Drain stores the + // templated form on Template and the original body on SampleLog, and + // both should be uncontaminated. + checkClusters := func(name string, clusters []graphrag.LogClusterNode, ownMarker, foreignMarker string) []string { + t.Helper() + var ids []string + for _, lc := range clusters { + ids = append(ids, lc.ID) + joined := lc.Template + "\n" + lc.SampleLog + if !strings.Contains(joined, ownMarker) { + t.Errorf("[%s] cluster %q missing own marker %q (template=%q sample=%q)", name, lc.ID, ownMarker, lc.Template, lc.SampleLog) + } + if strings.Contains(joined, foreignMarker) { + t.Errorf("[%s] cluster %q LEAKED foreign marker %q (template=%q sample=%q)", name, lc.ID, foreignMarker, lc.Template, lc.SampleLog) + } + } + return ids + } + idsA := checkClusters("acme", sigA.ErrorLogs, "acme-marker", "beta-marker") + idsB := checkClusters("beta", sigB.ErrorLogs, "beta-marker", "acme-marker") + + // The cluster IDs themselves can be identical across tenants (Drain ID + // is service-scoped, not tenant-scoped) — that is precisely WHY the + // SignalStore partition matters: without it, the same key would point + // at one shared row. Surface this fact in the test record so a future + // refactor that makes IDs tenant-stamped doesn't accidentally weaken + // the assertion above. + t.Logf("drain cluster IDs: acme=%v beta=%v", idsA, idsB) + + // End-to-end probe: the same isolation must hold via the MCP HTTP + // surface, not just the in-process API. + for _, scoped := range []string{"acme", "beta"} { + _, body := callTool(t, ts, scoped, "correlated_signals", map[string]any{ + "service": sharedService, + "time_range": "1h", + }) + other := "beta" + if scoped == "beta" { + other = "acme" + } + if !strings.Contains(body, scoped+"-marker") { + t.Errorf("%s correlated_signals (HTTP) missing own marker, body=%s", scoped, truncate(body)) + } + if strings.Contains(body, other+"-marker") { + t.Errorf("%s correlated_signals (HTTP) leaked %s marker, body=%s", scoped, other, truncate(body)) + } + } +} + diff --git a/internal/mcp/tools.go b/internal/mcp/tools.go index 7d5b8af..96b1fe1 100644 --- a/internal/mcp/tools.go +++ b/internal/mcp/tools.go @@ -286,9 +286,9 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a }() switch name { case "get_system_graph": - return s.toolGetSystemGraph(args) + return s.toolGetSystemGraph(ctx, args) case "get_service_health": - return s.toolGetServiceHealth(args) + return s.toolGetServiceHealth(ctx, args) case "search_logs": return s.toolSearchLogs(ctx, args) case "tail_logs": @@ -334,7 +334,28 @@ func (s *Server) toolHandler(ctx context.Context, name string, args map[string]a // --- Tool implementations --- -func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { +// toolGetSystemGraph returns a tenant-scoped service topology snapshot. +// +// When GraphRAG is wired (the default in production) the response is built +// from its per-tenant ServiceMap and AllServiceEdges, so two tenants with +// overlapping service names cannot see each other's nodes or edges. The +// legacy *graph.Graph remains as a fallback for boot windows when GraphRAG +// is still warming up; that fallback is cross-tenant by construction and +// is the documented legacy code path called out in RAN-39. +func (s *Server) toolGetSystemGraph(ctx context.Context, _ map[string]any) ToolCallResult { + if s.graphRAG != nil { + entries := s.graphRAG.ServiceMap(mcpCtx(ctx), 0) + edges := s.graphRAG.AllServiceEdges(mcpCtx(ctx)) + payload := map[string]any{ + "services": entries, + "edges": edges, + } + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal system graph: %v", err)) + } + return textResult(string(data)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } @@ -346,11 +367,26 @@ func (s *Server) toolGetSystemGraph(_ map[string]any) ToolCallResult { return textResult(string(data)) } -func (s *Server) toolGetServiceHealth(args map[string]any) ToolCallResult { +// toolGetServiceHealth returns the ServiceMap entry for svcName scoped to +// the tenant on ctx. Falls back to the legacy svcGraph snapshot when +// GraphRAG is not yet wired. +func (s *Server) toolGetServiceHealth(ctx context.Context, args map[string]any) ToolCallResult { svcName, _ := args["service_name"].(string) if svcName == "" { return errorResult("service_name is required") } + if s.graphRAG != nil { + for _, entry := range s.graphRAG.ServiceMap(mcpCtx(ctx), 0) { + if entry.Service != nil && entry.Service.Name == svcName { + data, err := json.MarshalIndent(entry, "", " ") + if err != nil { + return errorResult(fmt.Sprintf("failed to marshal service health: %v", err)) + } + return textResult(string(data)) + } + } + return textResult(fmt.Sprintf("service %q not found in the current tenant window", svcName)) + } if s.svcGraph == nil { return errorResult("service graph not yet initialized") } @@ -590,11 +626,8 @@ func (s *Server) toolFindSimilarLogs(ctx context.Context, args map[string]any) T if s.vectorIdx == nil { return errorResult("vector index not yet initialized") } - // TODO(RAN-20): vectordb.Index.Search itself is not yet tenant-scoped on - // `main`. The tenant filter for log similarity will move into the - // vector index in the RAN-20 follow-up; until then this call is shared. - _ = ctx - results := s.vectorIdx.Search(query, limit) + tenant := storage.TenantFromContext(mcpCtx(ctx)) + results := s.vectorIdx.Search(tenant, query, limit) data, err := json.MarshalIndent(results, "", " ") if err != nil { return errorResult(fmt.Sprintf("failed to marshal similar logs: %v", err)) diff --git a/internal/mcp/tools_ran20_test.go b/internal/mcp/tools_ran20_test.go new file mode 100644 index 0000000..7477ae5 --- /dev/null +++ b/internal/mcp/tools_ran20_test.go @@ -0,0 +1,79 @@ +package mcp + +import ( + "context" + "strings" + "testing" + + "github.com/RandomCodeSpace/otelcontext/internal/storage" + "github.com/RandomCodeSpace/otelcontext/internal/vectordb" +) + +// TestFindSimilarLogs_TenantIsolation is the RAN-20 acceptance bar for the MCP +// surface. Two tenants with unique marker strings in their log bodies query +// find_similar_logs; each tenant's response must never contain the other's +// markers. +func TestFindSimilarLogs_TenantIsolation(t *testing.T) { + idx := vectordb.New(1_000) + idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout acme-secret-charge-id-abc") + idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused acme-only-marker-xyz") + idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired globex-secret-session-123") + idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal globex-only-marker-qqq") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "payment gateway", "limit": float64(50)} + + // Acme + acmeRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "acme"), args) + if acmeRes.IsError { + t.Fatalf("acme call errored: %+v", acmeRes) + } + acmeBody := concatContent(acmeRes.Content) + for _, forbidden := range []string{"globex-secret-session-123", "globex-only-marker-qqq", `"LogID": 201`, `"LogID": 202`} { + if strings.Contains(acmeBody, forbidden) { + t.Fatalf("acme leaked globex content %q in body:\n%s", forbidden, acmeBody) + } + } + if !strings.Contains(acmeBody, "acme-secret-charge-id-abc") && !strings.Contains(acmeBody, "acme-only-marker-xyz") { + t.Fatalf("acme did not receive its own rows:\n%s", acmeBody) + } + + // Globex + gRes := srv.toolFindSimilarLogs(storage.WithTenantContext(context.Background(), "globex"), args) + if gRes.IsError { + t.Fatalf("globex call errored: %+v", gRes) + } + gBody := concatContent(gRes.Content) + for _, forbidden := range []string{"acme-secret-charge-id-abc", "acme-only-marker-xyz", `"LogID": 101`, `"LogID": 102`} { + if strings.Contains(gBody, forbidden) { + t.Fatalf("globex leaked acme content %q in body:\n%s", forbidden, gBody) + } + } +} + +// TestFindSimilarLogs_NoTenantFallsBackToDefault proves that a context with no +// tenant value is coerced to the server default — it must NOT bleed into +// another tenant's rows. +func TestFindSimilarLogs_NoTenantFallsBackToDefault(t *testing.T) { + idx := vectordb.New(100) + idx.Add(1, "acme", "svc", "ERROR", "acme secret body only") + + srv := &Server{vectorIdx: idx, defaultTenant: storage.DefaultTenantID} + args := map[string]any{"query": "secret body"} + + res := srv.toolFindSimilarLogs(context.Background(), args) + if res.IsError { + t.Fatalf("unexpected error: %+v", res) + } + if strings.Contains(concatContent(res.Content), "acme secret body only") { + t.Fatalf("no-tenant call leaked acme content:\n%s", concatContent(res.Content)) + } +} + +func concatContent(items []ContentItem) string { + var b strings.Builder + for _, c := range items { + b.WriteString(c.Text) + } + return b.String() +} diff --git a/internal/storage/log_repo.go b/internal/storage/log_repo.go index dfaae97..e57be6b 100644 --- a/internal/storage/log_repo.go +++ b/internal/storage/log_repo.go @@ -143,6 +143,33 @@ func (r *Repository) UpdateLogInsight(ctx context.Context, logID uint, insight s return nil } +// ListRecentHighSeverityLogsAllTenants returns recent logs of the given +// severity across EVERY tenant, each row carrying its own TenantID. This is an +// administrative read used exclusively by the vector index's startup +// hydration path, which fans rows out to per-tenant shards. It is not exposed +// on any tenant-scoped API surface — tenant isolation for read paths must +// otherwise be preserved via the context-driven WHERE clause. +func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, severity string, since, until time.Time, limit int) ([]Log, error) { + if limit <= 0 { + limit = 5000 + } + q := r.db.WithContext(ctx).Model(&Log{}) + if severity != "" { + q = q.Where("severity = ?", severity) + } + if !since.IsZero() { + q = q.Where("timestamp >= ?", since) + } + if !until.IsZero() { + q = q.Where("timestamp <= ?", until) + } + var logs []Log + if err := q.Order("timestamp desc").Limit(limit).Find(&logs).Error; err != nil { + return nil, fmt.Errorf("failed to list recent logs all tenants: %w", err) + } + return logs, nil +} + // PurgeLogs deletes logs older than the given timestamp in a single statement. // Suitable for SQLite; for Postgres at large retention volumes prefer PurgeLogsBatched. func (r *Repository) PurgeLogs(olderThan time.Time) (int64, error) { diff --git a/internal/vectordb/index.go b/internal/vectordb/index.go index 1333d57..99b804a 100644 --- a/internal/vectordb/index.go +++ b/internal/vectordb/index.go @@ -11,9 +11,20 @@ import ( "unicode" ) +// defaultTenantID is the tenant assigned when the caller passes an empty +// tenant string. Mirrors storage.DefaultTenantID; duplicated here to avoid +// pulling internal/storage into vectordb's import graph. +const defaultTenantID = "default" + // LogVector represents an indexed log entry. +// +// Tenant scopes the document so Search can return only the caller's tenant +// rows. The TF-IDF table is shared across tenants — global IDF still gives +// the right rarity signal — but the per-document tenant tag is enforced at +// query time so two tenants with overlapping log bodies stay isolated. type LogVector struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -23,6 +34,7 @@ type LogVector struct { // SearchResult is a single similarity hit. type SearchResult struct { LogID uint + Tenant string ServiceName string Severity string Body string @@ -50,8 +62,10 @@ func New(maxSize int) *Index { } } -// Add adds a log to the index. Thread-safe. -func (idx *Index) Add(logID uint, serviceName, severity, body string) { +// Add adds a log to the index. Thread-safe. Tenant is recorded with the +// document so Search can filter by it; an empty tenant collapses to +// the platform default at the boundary, matching storage.TenantFromContext. +func (idx *Index) Add(logID uint, tenant, serviceName, severity, body string) { if !shouldIndex(severity) { return } @@ -61,20 +75,48 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { } tf := computeTF(tokens) + if tenant == "" { + tenant = defaultTenantID + } + idx.mu.Lock() defer idx.mu.Unlock() - // FIFO eviction — copy to new slice to release old backing array memory + // Tenant-aware FIFO eviction. When at cap, remove up to maxSize/10 of the + // oldest entries belonging to the inserting tenant so a noisy tenant + // cannot push another tenant's warm rows out of the index (availability + // isolation — the confidentiality invariant is enforced separately by + // doc.Tenant filtering in Search). The new backing slice also releases + // the old array memory on the next GC cycle. if len(idx.docs) >= idx.maxSize { - keep := idx.docs[idx.maxSize/10:] - newDocs := make([]LogVector, len(keep), idx.maxSize) - copy(newDocs, keep) - idx.docs = newDocs + toDrop := idx.maxSize / 10 + if toDrop < 1 { + toDrop = 1 + } + kept := make([]LogVector, 0, idx.maxSize) + droppedSame := 0 + for _, d := range idx.docs { + if droppedSame < toDrop && d.Tenant == tenant { + droppedSame++ + continue + } + kept = append(kept, d) + } + // Edge case: the inserting tenant has no prior entries while the + // index is at cap with other tenants' rows. Drop one globally-oldest + // entry so the new tenant can take its first slot. This is the only + // path where a tenant's entry can be evicted by another tenant, and + // it costs at most one row per brand-new tenant. + if droppedSame == 0 && len(kept) > 0 { + kept = kept[1:] + } + idx.docs = kept idx.dirty = true } idx.docs = append(idx.docs, LogVector{ LogID: logID, + Tenant: tenant, ServiceName: serviceName, Severity: severity, Body: body, @@ -83,11 +125,17 @@ func (idx *Index) Add(logID uint, serviceName, severity, body string) { idx.dirty = true } -// Search finds the top-k logs most similar to the query string. -func (idx *Index) Search(query string, k int) []SearchResult { +// Search finds the top-k logs most similar to the query string within +// tenant. Documents from other tenants are excluded — the IDF table stays +// global so rarity is computed against the whole corpus, but result rows +// are filtered to the caller's tenant. +func (idx *Index) Search(tenant, query string, k int) []SearchResult { if k <= 0 { k = 10 } + if tenant == "" { + tenant = defaultTenantID + } tokens := tokenize(query) if len(tokens) == 0 { return nil @@ -124,6 +172,9 @@ func (idx *Index) Search(query string, k int) []SearchResult { } results := make([]scored, 0, len(docs)) for _, doc := range docs { + if doc.Tenant != tenant { + continue + } docVec := make(map[string]float64, len(doc.vec)) for term, tf := range doc.vec { docVec[term] = tf * idfSnap[term] @@ -145,6 +196,7 @@ func (idx *Index) Search(query string, k int) []SearchResult { for i, r := range results { out[i] = SearchResult{ LogID: r.doc.LogID, + Tenant: r.doc.Tenant, ServiceName: r.doc.ServiceName, Severity: r.doc.Severity, Body: r.doc.Body, diff --git a/internal/vectordb/index_test.go b/internal/vectordb/index_test.go new file mode 100644 index 0000000..9b9186c --- /dev/null +++ b/internal/vectordb/index_test.go @@ -0,0 +1,136 @@ +package vectordb + +import ( + "strconv" + "sync" + "testing" +) + +// TestTenantIsolation_Search is the RAN-20 confidentiality bar: a query on +// tenant A never returns a document indexed under tenant B, even when the +// vocabularies collide on the query terms. +func TestTenantIsolation_Search(t *testing.T) { + idx := New(1_000) + + idx.Add(1, "acme", "checkout", "ERROR", "payment gateway timeout upstream") + idx.Add(2, "acme", "checkout", "ERROR", "payment gateway refused charge") + idx.Add(10, "globex", "auth", "ERROR", "payment gateway token expired") + idx.Add(11, "globex", "auth", "ERROR", "payment gateway 500 internal error") + + acmeHits := idx.Search("acme", "payment gateway timeout", 10) + if len(acmeHits) == 0 { + t.Fatalf("acme search returned zero hits despite matching docs") + } + for _, h := range acmeHits { + if h.Tenant != "acme" || h.LogID >= 10 { + t.Fatalf("acme search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } + + globexHits := idx.Search("globex", "payment gateway token", 10) + if len(globexHits) == 0 { + t.Fatalf("globex search returned zero hits despite matching docs") + } + for _, h := range globexHits { + if h.Tenant != "globex" || h.LogID < 10 { + t.Fatalf("globex search leaked id=%d tenant=%q body=%q", h.LogID, h.Tenant, h.Body) + } + } +} + +// TestUnknownTenantReturnsEmpty proves a tenant with no indexed docs returns +// nothing even when other tenants have matching content. +func TestUnknownTenantReturnsEmpty(t *testing.T) { + idx := New(100) + idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream") + + if got := idx.Search("initech", "database connection", 10); len(got) != 0 { + t.Fatalf("unknown tenant saw %d cross-tenant hits", len(got)) + } +} + +// TestEmptyTenantCoercedToDefault verifies Add and Search coerce an empty +// tenant to the platform default so untenanted callers stay isolated from +// real tenants. +func TestEmptyTenantCoercedToDefault(t *testing.T) { + idx := New(100) + idx.Add(1, "", "svc", "ERROR", "network unreachable upstream host") + + if hits := idx.Search("", "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with empty tenant: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search(defaultTenantID, "network unreachable", 10); len(hits) != 1 { + t.Fatalf("search with default tenant id: want 1 hit, got %d", len(hits)) + } + if hits := idx.Search("acme", "network unreachable", 10); len(hits) != 0 { + t.Fatalf("acme saw %d cross-tenant hits for default-tenant doc", len(hits)) + } +} + +// TestFIFOEvictionFairness is TechLead's requested assertion: a tenant that +// writes near-cap volume cannot evict another tenant's documents from the +// shared index. Under a naive global-FIFO policy tenant B's flood would +// remove tenant A's older entries and A would silently "lose" its warm +// rows — a confidentiality-safe but availability-breaking failure mode. +func TestFIFOEvictionFairness(t *testing.T) { + const cap = 200 + idx := New(cap) + + // Tenant A writes a small set of distinctive markers. + for i := 0; i < 5; i++ { + idx.Add(uint(1+i), "acme", "checkout", "ERROR", "acme-canary-marker alpha beta gamma "+strconv.Itoa(i)) + } + + // Tenant B floods the index well past the cap — enough to trigger + // multiple eviction cycles. + for i := 0; i < cap*4; i++ { + idx.Add(uint(10_000+i), "globex", "svc", "ERROR", "globex chatter filling the index "+strconv.Itoa(i)) + } + + // Every one of acme's canary rows must still be findable. + hits := idx.Search("acme", "acme-canary-marker alpha beta gamma", 20) + if len(hits) < 5 { + t.Fatalf("eviction unfairness: acme canaries evicted by globex flood. want >=5 hits, got %d", len(hits)) + } + seen := map[uint]bool{} + for _, h := range hits { + if h.Tenant != "acme" { + t.Fatalf("cross-tenant leak during eviction test: id=%d tenant=%q", h.LogID, h.Tenant) + } + seen[h.LogID] = true + } + for id := uint(1); id <= 5; id++ { + if !seen[id] { + t.Fatalf("acme canary id=%d missing after globex flood", id) + } + } +} + +// TestConcurrentTenantAddSearch pins down race-detector cleanliness and +// cross-tenant isolation under concurrent readers/writers. +func TestConcurrentTenantAddSearch(t *testing.T) { + idx := New(5_000) + var wg sync.WaitGroup + + for _, tenant := range []string{"acme", "globex"} { + wg.Add(2) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + idx.Add(uint(i), ten, "svc", "ERROR", ten+" error kafka partition "+strconv.Itoa(i)) + } + }(tenant) + go func(ten string) { + defer wg.Done() + for i := 0; i < 500; i++ { + for _, h := range idx.Search(ten, "kafka partition", 5) { + if h.Tenant != ten { + t.Errorf("tenant %s saw cross-tenant hit tenant=%q body=%q", ten, h.Tenant, h.Body) + return + } + } + } + }(tenant) + } + wg.Wait() +} diff --git a/main.go b/main.go index c94be26..83f96cb 100644 --- a/main.go +++ b/main.go @@ -331,18 +331,18 @@ func main() { // Hydrate vector index from recent ERROR/WARN logs on startup (non-blocking). // Uses appCtx so SIGTERM during boot cancels the query before repo.Close(). + // Hydration is cross-tenant by design: each row lands tagged with its own + // TenantID via vectorIdx.Add so isolation is preserved at query time. The + // previous tenant-scoped GetLogsV2 call silently hydrated only the default + // tenant's rows — non-default tenants lost their warm index on every + // restart. bootWG.Add(1) go func() { defer bootWG.Done() - recentLogs, _, err := repo.GetLogsV2(appCtx, storage.LogFilter{ - Severity: "ERROR", - StartTime: time.Now().Add(-24 * time.Hour), - EndTime: time.Now(), - Limit: 5000, - }) + recentLogs, err := repo.ListRecentHighSeverityLogsAllTenants(appCtx, "ERROR", time.Now().Add(-24*time.Hour), time.Now(), 5000) if err == nil { for _, l := range recentLogs { - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) } slog.Info("🔍 Vector index hydrated from recent ERROR logs", "count", len(recentLogs)) } @@ -412,7 +412,7 @@ func main() { Timestamp: l.Timestamp, }) aiService.EnqueueLog(l) - vectorIdx.Add(l.ID, l.ServiceName, l.Severity, l.Body) + vectorIdx.Add(l.ID, l.TenantID, l.ServiceName, l.Severity, l.Body) eventHub.NotifyRefresh() if time.Since(start) > 100*time.Millisecond { slog.Warn("Slow broadcast/enqueue", "duration", time.Since(start))