Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions internal/graphrag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,13 @@ func New(repo *storage.Repository, vectorIdx *vectordb.Index, tsdbAgg *tsdb.Aggr
// Restore persisted Drain templates so log clustering survives restarts.
// A missing table (fresh install) or transient DB error is non-fatal —
// ingestion will rebuild templates from scratch.
//
// The Drain miner is currently a single shared instance, so we treat its
// learned templates as belonging to DefaultTenantID. The persistence layer
// is already keyed by (tenant_id, id) so a future per-tenant Drain miner
// can load each tenant's slice without colliding cluster IDs.
if repo != nil && repo.DB() != nil {
if tpls, err := LoadDrainTemplates(repo.DB()); err != nil {
if tpls, err := LoadDrainTemplates(repo.DB(), storage.DefaultTenantID); err != nil {
slog.Info("GraphRAG: drain template restore skipped", "reason", err)
} else if len(tpls) > 0 {
g.drain.LoadTemplates(tpls)
Expand Down Expand Up @@ -294,7 +299,7 @@ func (g *GraphRAG) Stop() {
// Best-effort final Drain template persistence — losing the most recent
// updates on an unclean shutdown would force rebuilding from scratch.
if g.repo != nil && g.repo.DB() != nil && g.drain != nil {
if err := SaveDrainTemplates(g.repo.DB(), g.drain.Templates()); err != nil {
if err := SaveDrainTemplates(g.repo.DB(), storage.DefaultTenantID, g.drain.Templates()); err != nil {
slog.Warn("GraphRAG: final drain template save failed", "error", err)
}
}
Expand Down
32 changes: 22 additions & 10 deletions internal/graphrag/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sync"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand Down Expand Up @@ -504,20 +505,28 @@ func (d *Drain) Len() int {
// --- Persistence ---

// SaveDrainTemplates upserts the given templates into the drain_templates
// table. Tokens are JSON-encoded. Uses GORM's clause.OnConflict upsert which
// is dialect-aware (ON CONFLICT for SQLite/PostgreSQL, ON DUPLICATE KEY
// UPDATE for MySQL).
func SaveDrainTemplates(db *gorm.DB, templates []Template) error {
// table under the supplied tenant. Tokens are JSON-encoded. Uses GORM's
// clause.OnConflict upsert which is dialect-aware (ON CONFLICT for SQLite/
// PostgreSQL, ON DUPLICATE KEY UPDATE for MySQL).
//
// The conflict target is the composite (tenant_id, id) primary key so the
// same Drain template hash can coexist across tenants — a future per-tenant
// Drain miner can rely on this to keep cluster IDs stable per tenant.
func SaveDrainTemplates(db *gorm.DB, tenant string, templates []Template) error {
if db == nil || len(templates) == 0 {
return nil
}
if tenant == "" {
tenant = storage.DefaultTenantID
}
rows := make([]DrainTemplateRow, 0, len(templates))
for _, t := range templates {
tokensJSON, err := json.Marshal(t.Tokens)
if err != nil {
return fmt.Errorf("marshal drain tokens id=%d: %w", t.ID, err)
}
rows = append(rows, DrainTemplateRow{
TenantID: tenant,
ID: int64(t.ID), //nolint:gosec // intentional bit-reinterpret of FNV-64 hash for DB portability
Tokens: string(tokensJSON),
Count: t.Count,
Expand All @@ -527,22 +536,25 @@ func SaveDrainTemplates(db *gorm.DB, templates []Template) error {
})
}
return db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
Columns: []clause.Column{{Name: "tenant_id"}, {Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{
"tokens", "count", "last_seen", "sample",
}),
}).CreateInBatches(&rows, 500).Error
}

// LoadDrainTemplates reads all persisted Drain templates from the DB and
// returns them in a format ready to pass to Drain.LoadTemplates. Returns an
// empty slice (and nil error) if the table is empty.
func LoadDrainTemplates(db *gorm.DB) ([]Template, error) {
// LoadDrainTemplates reads persisted Drain templates for the supplied tenant
// and returns them in a format ready to pass to Drain.LoadTemplates. Returns
// an empty slice (and nil error) if no rows match.
func LoadDrainTemplates(db *gorm.DB, tenant string) ([]Template, error) {
if db == nil {
return nil, nil
}
if tenant == "" {
tenant = storage.DefaultTenantID
}
var rows []DrainTemplateRow
if err := db.Find(&rows).Error; err != nil {
if err := db.Where("tenant_id = ?", tenant).Find(&rows).Error; err != nil {
return nil, err
}
out := make([]Template, 0, len(rows))
Expand Down
15 changes: 8 additions & 7 deletions internal/graphrag/drain_persist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -50,12 +51,12 @@ func TestDrainPersistence_RoundTrip(t *testing.T) {
}

// Persist.
if err := SaveDrainTemplates(db, d1.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d1.Templates()); err != nil {
t.Fatalf("SaveDrainTemplates: %v", err)
}

// Reload.
loaded, err := LoadDrainTemplates(db)
loaded, err := LoadDrainTemplates(db, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates: %v", err)
}
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
for i := 0; i < 10; i++ {
d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t0)
}
if err := SaveDrainTemplates(db, d.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil {
t.Fatalf("first save: %v", err)
}

Expand All @@ -108,7 +109,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
for i := 0; i < 10; i++ {
d.Match(fmt.Sprintf("upsert kind %c event", 'a'+byte(i)), t1)
}
if err := SaveDrainTemplates(db, d.Templates()); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, d.Templates()); err != nil {
t.Fatalf("second save: %v", err)
}

Expand Down Expand Up @@ -139,7 +140,7 @@ func TestDrainPersistence_Upsert(t *testing.T) {
// returns an empty slice and nil error (fresh-install path).
func TestDrainPersistence_EmptyDB(t *testing.T) {
db := newTestDrainDB(t)
tpls, err := LoadDrainTemplates(db)
tpls, err := LoadDrainTemplates(db, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates on empty table: %v", err)
}
Expand All @@ -148,7 +149,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) {
}

// Calling with nil DB is also safe.
tpls, err = LoadDrainTemplates(nil)
tpls, err = LoadDrainTemplates(nil, storage.DefaultTenantID)
if err != nil {
t.Fatalf("LoadDrainTemplates(nil): %v", err)
}
Expand All @@ -157,7 +158,7 @@ func TestDrainPersistence_EmptyDB(t *testing.T) {
}

// Saving an empty slice is a no-op (no error, no rows).
if err := SaveDrainTemplates(db, nil); err != nil {
if err := SaveDrainTemplates(db, storage.DefaultTenantID, nil); err != nil {
t.Fatalf("SaveDrainTemplates(nil): %v", err)
}
var cnt int64
Expand Down
71 changes: 43 additions & 28 deletions internal/graphrag/investigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
"gorm.io/gorm"
)

// investigationCooldown suppresses repeated PersistInvestigation calls with
Expand Down Expand Up @@ -43,13 +42,14 @@ func (c *investigationCooldown) allow(key string, now time.Time) bool {
}

// cooldownKey builds a case- and whitespace-insensitive key from the tuple
// (trigger_service, root_service, root_operation). Service names emitted
// from different instrumentations occasionally differ in casing or have
// trailing whitespace; canonicalizing here prevents those variants from
// bypassing the cooldown guard.
func cooldownKey(triggerService, rootService, rootOperation string) string {
// (tenant, trigger_service, root_service, root_operation). Tenant scopes the
// guard so an error in tenant-A doesn't suppress the same error pattern in
// tenant-B. Service names emitted from different instrumentations occasionally
// differ in casing or have trailing whitespace; canonicalizing here prevents
// those variants from bypassing the cooldown guard.
func cooldownKey(tenant, triggerService, rootService, rootOperation string) string {
norm := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) }
return norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation)
return norm(tenant) + "|" + norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation)
}

// prune drops entries older than cutoff to bound map size. Called from
Expand All @@ -65,9 +65,14 @@ func (c *investigationCooldown) prune(cutoff time.Time) {
}

// Investigation is a persisted record of an automated error investigation.
//
// TenantID scopes the row to its originating tenant. The composite
// (tenant_id, created_at) index supports the recency-ordered "investigations
// for tenant X" query that GetInvestigations runs on every read.
type Investigation struct {
TenantID string `gorm:"size:64;default:'default';not null;index:idx_investigations_tenant_created,priority:1" json:"tenant_id"`
ID string `gorm:"primaryKey;size:64" json:"id"`
CreatedAt time.Time `json:"created_at"`
CreatedAt time.Time `gorm:"index:idx_investigations_tenant_created,priority:2" json:"created_at"`
Status string `gorm:"size:20" json:"status"` // detected, triaged, resolved
Severity string `gorm:"size:20" json:"severity"` // critical, warning, info
TriggerService string `gorm:"size:255;index" json:"trigger_service"`
Expand All @@ -88,20 +93,17 @@ func (Investigation) TableName() string {
return "investigations"
}

// AutoMigrateGraphRAG runs GORM auto-migration for GraphRAG models.
func AutoMigrateGraphRAG(db *gorm.DB) error {
return db.AutoMigrate(&Investigation{}, &GraphSnapshot{}, &DrainTemplateRow{})
}

// PersistInvestigation saves an investigation record from an error chain
// analysis. Tenant is accepted explicitly so the caller (the per-tenant
// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice. The
// `investigations` table itself does not yet carry a tenant_id column —
// Subtask B (RAN-38) owns that migration.
// anomaly loop) can re-enter ImpactAnalysis on the correct tenant slice and so
// the persisted row carries its originating tenant_id.
func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []ErrorChainResult, anomalies []*AnomalyNode) {
if len(chains) == 0 {
return
}
if tenant == "" {
tenant = storage.DefaultTenantID
}

firstChain := chains[0]
if firstChain.RootCause == nil {
Expand All @@ -111,10 +113,12 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []
now := time.Now()

// Cooldown: suppress repeat investigations for the same
// (trigger_service, root_service, root_operation) inside a sliding window.
// Keys are canonicalized (lower + trim) so "Orders" and "orders " share a
// bucket — otherwise trivial casing differences would bypass the guard.
key := cooldownKey(triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation)
// (tenant, trigger_service, root_service, root_operation) inside a sliding
// window. Keys are canonicalized (lower + trim) so "Orders" and "orders "
// share a bucket — otherwise trivial casing differences would bypass the
// guard. Tenant scoping prevents an error in one tenant from suppressing
// the same pattern in another.
key := cooldownKey(tenant, triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation)
if g.invCooldown != nil && !g.invCooldown.allow(key, now) {
return
}
Expand Down Expand Up @@ -180,6 +184,7 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []
}

inv := Investigation{
TenantID: tenant,
ID: id,
CreatedAt: now,
Status: "detected",
Expand All @@ -202,23 +207,30 @@ func (g *GraphRAG) PersistInvestigation(tenant, triggerService string, chains []
return
}
if err := g.repo.DB().Create(&inv).Error; err != nil {
slog.Error("Failed to persist investigation", "error", err)
slog.Error("Failed to persist investigation", "tenant", tenant, "error", err)
return
}

slog.Info("Investigation persisted", "id", id, "service", triggerService, "severity", severity)
slog.Info("Investigation persisted", "id", id, "tenant", tenant, "service", triggerService, "severity", severity)
}

// GetInvestigations queries persisted investigations.
func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int) ([]Investigation, error) {
// GetInvestigations queries persisted investigations scoped to the tenant
// carried by ctx. The composite (tenant_id, created_at) index supports the
// recency-ordered scan.
func (g *GraphRAG) GetInvestigations(ctx context.Context, service, severity, status string, limit int) ([]Investigation, error) {
if limit <= 0 {
limit = 20
}
if limit > 100 {
limit = 100
}

db := g.repo.DB().Model(&Investigation{}).Order("created_at DESC").Limit(limit)
tenant := storage.TenantFromContext(ctx)
db := g.repo.DB().
Model(&Investigation{}).
Where("tenant_id = ?", tenant).
Order("created_at DESC").
Limit(limit)
if service != "" {
db = db.Where("trigger_service = ? OR root_service = ?", service, service)
}
Expand All @@ -236,10 +248,13 @@ func (g *GraphRAG) GetInvestigations(service, severity, status string, limit int
return investigations, nil
}

// GetInvestigation retrieves a single investigation by ID.
func (g *GraphRAG) GetInvestigation(id string) (*Investigation, error) {
// GetInvestigation retrieves a single investigation by ID, scoped to the
// tenant carried by ctx. Returning ErrRecordNotFound for cross-tenant lookups
// prevents id-guessing from leaking another tenant's row.
func (g *GraphRAG) GetInvestigation(ctx context.Context, id string) (*Investigation, error) {
tenant := storage.TenantFromContext(ctx)
var inv Investigation
if err := g.repo.DB().Where("id = ?", id).First(&inv).Error; err != nil {
if err := g.repo.DB().Where("tenant_id = ? AND id = ?", tenant, id).First(&inv).Error; err != nil {
return nil, err
}
return &inv, nil
Expand Down
28 changes: 20 additions & 8 deletions internal/graphrag/investigation_cooldown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,30 @@ func TestPersistInvestigation_Cooldown(t *testing.T) {
}

// TestCooldownKey_Canonical verifies the key normalizes case and trims
// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket.
// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket
// within a tenant.
func TestCooldownKey_Canonical(t *testing.T) {
cases := [][3]string{
{"orders", "orders", "op"},
{"Orders", "ORDERS", "op"},
{" orders ", "orders", " op "},
{"ORDERS", "Orders ", "OP"},
cases := [][4]string{
{"acme", "orders", "orders", "op"},
{"Acme", "Orders", "ORDERS", "op"},
{" acme ", " orders ", "orders", " op "},
{"ACME", "ORDERS", "Orders ", "OP"},
}
want := cooldownKey(cases[0][0], cases[0][1], cases[0][2])
want := cooldownKey(cases[0][0], cases[0][1], cases[0][2], cases[0][3])
for _, c := range cases[1:] {
if got := cooldownKey(c[0], c[1], c[2]); got != want {
if got := cooldownKey(c[0], c[1], c[2], c[3]); got != want {
t.Errorf("cooldownKey%v = %q, want %q", c, got, want)
}
}
}

// TestCooldownKey_TenantIsolated checks that the tenant component takes part
// in the cooldown key: the same (trigger, root, op) tuple evaluated under two
// different tenants must not collapse into a single key, otherwise an error in
// one tenant would suppress the same pattern in the other.
func TestCooldownKey_TenantIsolated(t *testing.T) {
	// Only the tenant varies between the two keys; every other component is
	// held constant so a collision can only come from missing tenant scoping.
	keyFor := func(tenant string) string {
		return cooldownKey(tenant, "orders", "orders", "op")
	}

	first, second := keyFor("tenant-a"), keyFor("tenant-b")
	if first != second {
		return
	}
	t.Fatalf("tenant scoping missing: tenant-a and tenant-b share key %q", first)
}
Loading
Loading