Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/api/similar_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"encoding/json"
"net/http"
"strconv"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
)

// handleGetSimilarLogs handles GET /api/logs/similar?q=<text>&limit=10
// Returns logs semantically similar to the query string using TF-IDF cosine similarity.
// Returns logs semantically similar to the query string using TF-IDF cosine
// similarity, scoped to the tenant on r.Context() (set by TenantMiddleware
// from X-Tenant-ID). Cross-tenant rows are never returned.
func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) {
if s.vectorIdx == nil {
http.Error(w, "vector index not initialized", http.StatusServiceUnavailable)
Expand All @@ -30,7 +34,8 @@ func (s *Server) handleGetSimilarLogs(w http.ResponseWriter, r *http.Request) {
limit = 50
}

results := s.vectorIdx.Search(query, limit)
tenant := storage.TenantFromContext(r.Context())
results := s.vectorIdx.Search(tenant, query, limit)

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
Expand Down
117 changes: 117 additions & 0 deletions internal/api/similar_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package api

import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/RandomCodeSpace/otelcontext/internal/config"
"github.com/RandomCodeSpace/otelcontext/internal/vectordb"
)

// TestSimilarHandler_TenantIsolation is the RAN-20 acceptance bar for the HTTP
// surface. Two tenants with distinct corpora query /api/logs/similar; each
// sees ZERO rows belonging to the other tenant.
func TestSimilarHandler_TenantIsolation(t *testing.T) {
idx := vectordb.New(1_000)
idx.Add(101, "acme", "checkout", "ERROR", "payment gateway timeout charging customer")
idx.Add(102, "acme", "checkout", "ERROR", "payment gateway refused charge insufficient funds")
idx.Add(201, "globex", "auth", "ERROR", "payment gateway token expired for session")
idx.Add(202, "globex", "auth", "ERROR", "payment gateway 500 internal error while authenticating")

srv := &Server{vectorIdx: idx}
mux := http.NewServeMux()
mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs)
handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux)

acmeIDs := map[float64]bool{101: true, 102: true}
globexIDs := map[float64]bool{201: true, 202: true}

q := url.Values{}
q.Set("q", "payment gateway")
q.Set("limit", "50")
path := "/api/logs/similar?" + q.Encode()

// Tenant A
aRec := httptest.NewRecorder()
aReq := httptest.NewRequest(http.MethodGet, path, nil)
aReq.Header.Set(TenantHeader, "acme")
handler.ServeHTTP(aRec, aReq)
if aRec.Code != http.StatusOK {
t.Fatalf("acme: want 200, got %d body=%q", aRec.Code, aRec.Body.String())
}
acme := decodeResults(t, aRec)
if len(acme) == 0 {
t.Fatalf("acme got zero hits despite matching corpus")
}
for _, r := range acme {
if !acmeIDs[r.ID] {
t.Fatalf("acme leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body)
}
}

// Tenant B
gRec := httptest.NewRecorder()
gReq := httptest.NewRequest(http.MethodGet, path, nil)
gReq.Header.Set(TenantHeader, "globex")
handler.ServeHTTP(gRec, gReq)
if gRec.Code != http.StatusOK {
t.Fatalf("globex: want 200, got %d", gRec.Code)
}
globex := decodeResults(t, gRec)
if len(globex) == 0 {
t.Fatalf("globex got zero hits despite matching corpus")
}
for _, r := range globex {
if !globexIDs[r.ID] {
t.Fatalf("globex leaked cross-tenant id=%v tenant=%q body=%q", r.ID, r.Tenant, r.Body)
}
}
}

// TestSimilarHandler_UnknownTenantReturnsEmpty confirms a request bearing an
// unknown tenant header returns zero results — the handler must not silently
// fall back to another tenant's rows.
func TestSimilarHandler_UnknownTenantReturnsEmpty(t *testing.T) {
idx := vectordb.New(100)
idx.Add(1, "acme", "svc", "ERROR", "database connection refused upstream")

srv := &Server{vectorIdx: idx}
mux := http.NewServeMux()
mux.HandleFunc("GET /api/logs/similar", srv.handleGetSimilarLogs)
handler := TenantMiddleware(&config.Config{DefaultTenant: "default"})(mux)

rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/logs/similar?q=database+connection", nil)
req.Header.Set(TenantHeader, "initech")
handler.ServeHTTP(rec, req)

if rec.Code != http.StatusOK {
t.Fatalf("want 200, got %d", rec.Code)
}
if r := decodeResults(t, rec); len(r) != 0 {
t.Fatalf("unknown tenant saw %d cross-tenant hits", len(r))
}
}

type similarResult struct {
ID float64 `json:"LogID"`
Tenant string `json:"Tenant"`
ServiceName string `json:"ServiceName"`
Severity string `json:"Severity"`
Body string `json:"Body"`
Score float64 `json:"Score"`
}

func decodeResults(t *testing.T, rec *httptest.ResponseRecorder) []similarResult {
t.Helper()
var env struct {
Results []similarResult `json:"results"`
}
if err := json.Unmarshal(rec.Body.Bytes(), &env); err != nil {
t.Fatalf("decode response: %v (body=%q)", err, rec.Body.String())
}
return env.Results
}
22 changes: 20 additions & 2 deletions internal/graphrag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ func (g *GraphRAG) DroppedMetricsCount() int64 { return g.droppedMetrics.Load()
// tests to assert cooldown behavior without requiring a live repo.
func (g *GraphRAG) InvestigationInsertCount() int64 { return g.invInserts.Load() }

// RegisterAnomaly inserts an anomaly into the AnomalyStore for tenant.
// Mirrors PersistInvestigation's "tenant accepted explicitly" shape so
// out-of-band anomaly producers (synthetic detectors, integration tests,
// future external anomaly feeds) can land directly on the right tenant
// slice without going through the metric/error detection loops. Empty
// tenant collapses to storage.DefaultTenantID.
func (g *GraphRAG) RegisterAnomaly(tenant string, anomaly AnomalyNode) {
if tenant == "" {
tenant = storage.DefaultTenantID
}
g.storesForTenant(tenant).anomalies.AddAnomaly(anomaly)
}

// recordEventDrop increments the per-signal atomic counter and — when
// a telemetry registry is wired — the Prometheus counter vec.
func (g *GraphRAG) recordEventDrop(signal string) {
Expand Down Expand Up @@ -238,8 +251,13 @@ func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggr
// Restore persisted Drain templates so log clustering survives restarts.
// A missing table (fresh install) or transient DB error is non-fatal —
// ingestion will rebuild templates from scratch.
//
// The Drain miner is currently a single shared instance, so we treat its
// learned templates as belonging to DefaultTenantID. The persistence layer
// is already keyed by (tenant_id, id) so a future per-tenant Drain miner
// can load each tenant's slice without colliding cluster IDs.
if repo != nil && repo.DB() != nil {
if tpls, err := LoadDrainTemplates(repo.DB()); err != nil {
if tpls, err := LoadDrainTemplates(repo.DB(), storage.DefaultTenantID); err != nil {
slog.Info("GraphRAG: drain template restore skipped", "reason", err)
} else if len(tpls) > 0 {
g.drain.LoadTemplates(tpls)
Expand Down Expand Up @@ -294,7 +312,7 @@ func (g *GraphRAG) Stop() {
// Best-effort final Drain template persistence — losing the most recent
// updates on an unclean shutdown would force rebuilding from scratch.
if g.repo != nil && g.repo.DB() != nil && g.drain != nil {
if err := SaveDrainTemplates(g.repo.DB(), g.drain.Templates()); err != nil {
if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, g.drain.Templates()); err != nil {
slog.Warn("GraphRAG: final drain template save failed", "error", err)
}
}
Expand Down
32 changes: 22 additions & 10 deletions internal/graphrag/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sync"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand Down Expand Up @@ -504,20 +505,28 @@ func (d *Drain) Len() int {
// --- Persistence ---

// SaveDrainTemplates upserts the given templates into the drain_templates
// table. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which
// is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY
// UPDATE for MySQL).
func SaveDrainTemplates(db *gorm.DB, templates []Template) error {
// table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's
// clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/
// PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL).
//
// The conflict target is the composite (tenant_id, id) primary key so the
// same Drain template hash can coexist across tenants — a future per-tenant
// Drain miner can rely on this to keep cluster IDs stable per tenant.
func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error {
if db == nil || len(templates) == 0 {
return nil
}
if tenant == "" {
tenant = storage.DefaultTenantID
}
rows := make([]DrainTemplateRow, 0, len(templates))
for _, t := range templates {
tokensJSON, err := json.Marshal(t.Tokens)
if err != nil {
return fmt.Errorf("marshal drain tokens id=%d: %w", t.ID, err)
}
rows = append(rows, DrainTemplateRow{
TenantID: tenant,
ID: int64(t.ID), //nolint:gosec // intentional bit-reinterpret of FNV-64 hash for DB portability
Tokens: string(tokensJSON),
Count: t.Count,
Expand All @@ -527,22 +536,25 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error {
})
}
return db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
Columns: []clause.Column{{Name: "tenant_id"}, {Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{
"tokens", "count", "last_seen", "sample",
}),
}).CreateInBatches(&rows, 500).Error
}

// LoadDrainTemplates reads all persisted Drain templates from the DB and
// returns them in a format ready to pass to Drain.LoadTemplates. Returns an
// empty slice (and nil error) if the table is empty.
func LoadDrainTemplates(db *gorm.DB) ([]Template, error) {
// LoadDrainTemplates reads persisted Drain templates for the supplied tenant
// and returns them in a format ready to pass to Drain.LoadTemplates. Returns
// an empty slice (and nil error) if no rows match.
func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error) {
if db == nil {
return nil, nil
}
if tenant == "" {
tenant = storage.DefaultTenantID
}
var rows []DrainTemplateRow
if err := db.Find(&rows).Error; err != nil {
if err := db.Where("tenant_id = ?", tenant).Find(&rows).Error; err != nil {
return nil, err
}
out := make([]Template, 0, len(rows))
Expand Down
15 changes: 8 additions & 7 deletions internal/graphrag/drain_persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -50,12 +51,12 @@ func TestDrainPersistence_RoundTrip(t *testing.T) {
}

// Persist.
if err := SaveDrainTemplates(db, d1.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d1.Templates()); err != nil {
t.Fatalf("SaveDrainTemplates: %v", err)
}

// Reload.
loaded, err := LoadDrainTemplates(db)
loaded, err := LoadDrainTemplates(db, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates: %v", err)
}
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
for i := 0; i < 10; i++ {
d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t0)
}
if err := SaveDrainTemplates(db, d.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil {
t.Fatalf("first save: %v", err)
}

Expand All @@ -108,7 +109,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
for i := 0; i < 10; i++ {
d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t1)
}
if err := SaveDrainTemplates(db, d.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil {
t.Fatalf("second save: %v", err)
}

Expand Down Expand Up @@ -139,7 +140,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
// returns an empty slice and nil error (fresh-install path).
func TestDrainPersistence_EmptyDB(t *testing.T) {
db := newTestDrainDB(t)
tpls, err := LoadDrainTemplates(db)
tpls, err := LoadDrainTemplates(db, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates on empty table: %v", err)
}
Expand All @@ -148,7 +149,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) {
}

// Calling with nil DB is also safe.
tpls, err = LoadDrainTemplates(nil)
tpls, err = LoadDrainTemplates(nil, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates(nil): %v", err)
}
Expand All @@ -157,7 +158,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) {
}

// Saving an empty slice is a no-op (no error, no rows).
if err := SaveDrainTemplates(db, nil); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, nil); err != nil {
t.Fatalf("SaveDrainTemplates(nil): %v", err)
}
var cnt int64
Expand Down
Loading