Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ go.work.sum
*.stderr
*scheduled_tasks*
tmp/
data/
data/

/bin/
/loadsim
/otelcontext
25 changes: 19 additions & 6 deletions internal/graphrag/investigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -39,6 +40,16 @@ func (c *investigationCooldown) allow(key string, now time.Time) bool {
return true
}

// cooldownKey builds a case- and whitespace-insensitive key from the tuple
// (trigger_service, root_service, root_operation). Service names emitted
// from different instrumentations occasionally differ in casing or have
// trailing whitespace; canonicalizing here prevents those variants from
// bypassing the cooldown guard.
func cooldownKey(triggerService, rootService, rootOperation string) string {
norm := func(s string) string { return strings.ToLower(strings.TrimSpace(s)) }
return norm(triggerService) + "|" + norm(rootService) + "|" + norm(rootOperation)
}

// prune drops entries older than cutoff to bound map size. Called from
// the refresh tick.
func (c *investigationCooldown) prune(cutoff time.Time) {
Expand Down Expand Up @@ -91,19 +102,21 @@ func (g *GraphRAG) PersistInvestigation(triggerService string, chains []ErrorCha
return
}

now := time.Now()

// Cooldown: suppress repeat investigations for the same
// (trigger_service, root_service, root_operation) inside a sliding window.
// Without this guard, a stuck service produces one insert every anomaly
// tick (default 10s) indefinitely.
key := triggerService + "|" + firstChain.RootCause.Service + "|" + firstChain.RootCause.Operation
if g.invCooldown != nil && !g.invCooldown.allow(key, time.Now()) {
// Keys are canonicalized (lower + trim) so "Orders" and "orders " share a
// bucket — otherwise trivial casing differences would bypass the guard.
key := cooldownKey(triggerService, firstChain.RootCause.Service, firstChain.RootCause.Operation)
if g.invCooldown != nil && !g.invCooldown.allow(key, now) {
return
}
// Increment BEFORE db.Create so the counter reflects "cooldown allowed;
// persist attempted". See InvestigationInsertCount's doc comment.
g.invInserts.Add(1)

id := fmt.Sprintf("inv_%d", time.Now().UnixNano())
id := fmt.Sprintf("inv_%d", now.UnixNano())

severity := "warning"
if len(anomalies) > 0 {
Expand Down Expand Up @@ -160,7 +173,7 @@ func (g *GraphRAG) PersistInvestigation(triggerService string, chains []ErrorCha

inv := Investigation{
ID: id,
CreatedAt: time.Now(),
CreatedAt: now,
Status: "detected",
Severity: severity,
TriggerService: triggerService,
Expand Down
17 changes: 17 additions & 0 deletions internal/graphrag/investigation_cooldown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,20 @@ func TestPersistInvestigation_Cooldown(t *testing.T) {
t.Fatalf("distinct service should bypass cooldown; got %d, want > %d", third, second)
}
}

// TestCooldownKey_Canonical verifies the key normalizes case and trims
// whitespace so "Orders" / "orders " / "ORDERS" land in the same bucket.
func TestCooldownKey_Canonical(t *testing.T) {
cases := [][3]string{
{"orders", "orders", "op"},
{"Orders", "ORDERS", "op"},
{" orders ", "orders", " op "},
{"ORDERS", "Orders ", "OP"},
}
want := cooldownKey(cases[0][0], cases[0][1], cases[0][2])
for _, c := range cases[1:] {
if got := cooldownKey(c[0], c[1], c[2]); got != want {
t.Errorf("cooldownKey%v = %q, want %q", c, got, want)
}
}
}
8 changes: 6 additions & 2 deletions internal/graphrag/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ func (g *GraphRAG) refreshLoop(ctx context.Context) {
slog.Debug("GraphRAG pruned expired traces/spans", "count", pruned)
}
g.pruneOldAnomalies()
// Bound the investigation cooldown map. 2× window keeps
// entries through the active suppression plus a grace period.
// Bound the investigation cooldown map. The 10m cutoff is 2×
// the cooldown window (5m) — it retains entries through the
// active suppression plus a grace period. This assumes the
// refresh tick runs at least every 10 minutes; if RefreshEvery
// grows larger, raise the cutoff in lockstep, otherwise a stuck
// service could bypass the cooldown between prunes.
if g.invCooldown != nil {
g.invCooldown.prune(time.Now().Add(-10 * time.Minute))
}
Expand Down
13 changes: 0 additions & 13 deletions internal/storage/metrics_p99_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"context"
"math"
"testing"
"time"

Expand Down Expand Up @@ -222,18 +221,6 @@ func p99Itoa(n int) string {
return string(buf[pos:])
}

// verifyP99Index is the reference formula used in assertions.
func verifyP99Index(n int) int {
idx := int(math.Ceil(float64(n)*0.99)) - 1
if idx < 0 {
idx = 0
}
if idx >= n {
idx = n - 1
}
return idx
}

// ---------------------------------------------------------------------------
// Critical 2: verify MySQL branch preserves tenant filter
// ---------------------------------------------------------------------------
Expand Down
Loading