Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/api/similar_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"net/http"
"strconv"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
)

// handleGetSimilarLogs handles GET /api/logs/similar?q=<text>&limit=10
Expand All @@ -30,7 +32,8 @@ func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) {
limit = 50
}

results := s.vectorIdx.Search(query, limit)
tenant := storage.TenantFromContext(r.Context())
results := s.vectorIdx.Search(tenant, query, limit)

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
Expand Down
117 changes: 117 additions & 0 deletions internal/api/similar_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package api

import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/RandomCodeSpace/otelcontext/internal/config"
"github.com/RandomCodeSpace/otelcontext/internal/vectordb"
)

// TestSimilarHandler_TenantIsolation is the RAN-20 acceptance bar for the HTTP
// surface. Two tenants with distinct corpora query /api/logs/similar; each
// sees ZERO rows belonging to the other tenant.
func TestSimilarHandler_TenantIsolation(t *testing.T) {
idx := vectordb.New(1_000)
idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout charging customer")
idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused charge insufficient funds")
idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired for session")
idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal error while authenticating")

srv := &Server{vectorIdx: idx}
mux := http.NewServeMux()
mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs)
handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux)

acmeIDs := map[float64]bool{101: true, 102: true}
globexIDs := map[float64]bool{201: true, 202: true}

q := url.Values{}
q.Set("q", "payment gateway")
q.Set("limit", "50")
path := "/api/logs/similar?" + q.Encode()

// Tenant A
aRec := httptest.NewRecorder()
aReq := httptest.NewRequest(http.MethodGet, path, nil)
aReq.Header.Set(TenantHeader, "acme")
handler.ServeHTTP(aRec, aReq)
if aRec.Code != http.StatusOK {
t.Fatalf("acme: want 200, got %d body=%q", aRec.Code, aRec.Body.String())
}
acme := decodeResults(t, aRec)
if len(acme) == 0 {
t.Fatalf("acme got zero hits despite matching corpus")
}
for _, r := range acme {
if !acmeIDs[r.ID] {
t.Fatalf("acme leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body)
}
}

// Tenant B
gRec := httptest.NewRecorder()
gReq := httptest.NewRequest(http.MethodGet, path, nil)
gReq.Header.Set(TenantHeader, "globex")
handler.ServeHTTP(gRec, gReq)
if gRec.Code != http.StatusOK {
t.Fatalf("globex: want 200, got %d", gRec.Code)
}
globex := decodeResults(t, gRec)
if len(globex) == 0 {
t.Fatalf("globex got zero hits despite matching corpus")
}
for _, r := range globex {
if !globexIDs[r.ID] {
t.Fatalf("globex leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body)
}
}
}

// TestSimilarHandler_UnknownTenantReturnsEmpty confirms a request bearing an
// unknown tenant header returns zero results — the handler must not silently
// fall back to another tenant's rows.
func TestSimilarHandler_UnknownTenantReturnsEmpty(t *testing.T) {
idx := vectordb.New(100)
idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream")

srv := &Server{vectorIdx: idx}
mux := http.NewServeMux()
mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs)
handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux)

rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/logs/similar?q=database+connection", nil)
req.Header.Set(TenantHeader, "initech")
handler.ServeHTTP(rec, req)

if rec.Code != http.StatusOK {
t.Fatalf("want 200, got %d", rec.Code)
}
if r := decodeResults(t, rec); len(r) != 0 {
t.Fatalf("unknown tenant saw %d cross-tenant hits", len(r))
}
}

type similarResult struct {
ID float64 `json:"LogID"`
Tenant string `json:"Tenant"`
ServiceName string `json:"ServiceName"`
Severity string `json:"Severity"`
Body string `json:"Body"`
Score float64 `json:"Score"`
}

func decodeResults(t *testing.T, rec *httptest.ResponseRecorder) []similarResult {
t.Helper()
var env struct {
Results []similarResult `json:"results"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &env); err != nil {
t.Fatalf("decode response: %v (body=%q)", err, rec.Body.String())
}
return env.Results
}
13 changes: 13 additions & 0 deletions internal/graphrag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ func (g *GraphRAG) DroppedMetricsCount() int64 { return g.droppedMetrics.Load()
// tests to assert cooldown behavior without requiring a live repo.
func (g *GraphRAG) InvestigationInsertCount() int64 { return g.invInserts.Load() }

// RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant.
// Mirrors PersistInvestigation's "tenant accepted explicitly" shape so
// out-of-band anomaly producers (synthetic detectors, integration tests,
// future external anomaly feeds) can land directly on the right tenant
// slice without going through the metric/error detection loops. Empty
// tenant collapses to storage.DefaultTenantID.
func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode) {
if tenant == "" {
tenant = storage.DefaultTenantID
}
g.storesForTenant(tenant).anomalies.AddAnomaly(anomaly)
}

// recordEventDrop increments the per-signal atomic counter and — when
// a telemetry registry is wired — the Prometheus counter vec.
func (g *GraphRAG) recordEventDrop(signal string) {
Expand Down
15 changes: 7 additions & 8 deletions internal/graphrag/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"fmt"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
)

// clusterLog runs the log body through Drain and upserts a LogClusterNode
Expand Down Expand Up @@ -78,14 +80,11 @@ func (g *GraphRAG) SimilarErrors(ctx context.Context, clusterID string, k int) [
if query == "" && len(cluster.TemplateTokens) > 0 {
query = joinTokens(cluster.TemplateTokens)
}
// TODO(RAN-20): vectordb.Index.Search itself is not yet tenant-scoped on
// `main`, so we call the 2-arg signature here. Tenant isolation for the
// SignalStore lookup above is already enforced via storesFor(ctx); the
// vector hits are then narrowed by the EmittedBy edges in this tenant's
// SignalStore on lines below, so cross-tenant hits cannot surface even
// while the underlying vector index is shared.
_ = ctx
results := g.vectorIdx.Search(query, k*2) // over-fetch to filter
// vectordb.Index.Search takes the tenant string directly; resolve it
// from ctx via the same storage helper used by storesFor so both sides
// agree on coercion rules (empty → DefaultTenantID).
tenant := storage.TenantFromContext(ctx)
results := g.vectorIdx.Search(tenant, query, k*2) // over-fetch to filter

// Map results back to log clusters.
seen := map[string]bool{clusterID: true}
Expand Down
Loading
Loading