Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions internal/api/graph_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math"
"net/http"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
)

// SystemSummary is the top-level system health summary.
Expand Down Expand Up @@ -63,10 +65,12 @@ var OtelContextStartTime = time.Now()
// handleGetSystemGraph handles GET /api/system/graph.
// When the in-memory graph has been populated it returns instantly from memory.
// Falls back to a DB query only when the graph has never been built yet.
// Results are additionally cached for 10s to smooth out burst traffic.
// Results are cached for 10s per tenant — the cache key is scoped by tenant
// so two tenants hitting this endpoint never share a response.
func (s *Server) handleGetSystemGraph(w http.ResponseWriter, r *http.Request) {
const cacheKey = "system_graph"
const cacheTTL = 10 * time.Second
ctx := r.Context()
cacheKey := "system_graph:" + storage.TenantFromContext(ctx)

if cached, ok := s.cache.Get(cacheKey); ok {
w.Header().Set("Content-Type", "application/json")
Expand All @@ -75,10 +79,10 @@ func (s *Server) handleGetSystemGraph(w http.ResponseWriter, r *http.Request) {
return
}

resp := s.buildGraphFromMemory()
resp := s.buildGraphFromMemory(ctx)
if resp == nil {
// Graph not yet hydrated — fall back to DB path.
resp = s.buildGraphFromDB(r.Context())
resp = s.buildGraphFromDB(ctx)
if resp == nil {
http.Error(w, "failed to build system graph", http.StatusInternalServerError)
return
Expand All @@ -93,10 +97,10 @@ func (s *Server) handleGetSystemGraph(w http.ResponseWriter, r *http.Request) {

// buildGraphFromMemory converts the in-memory graph snapshot to the API response.
// Returns nil if the graph has not been built yet.
func (s *Server) buildGraphFromMemory() *SystemGraphResponse {
func (s *Server) buildGraphFromMemory(ctx context.Context) *SystemGraphResponse {
// Prefer GraphRAG if available
if s.graphRAG != nil {
return s.buildGraphFromGraphRAG()
return s.buildGraphFromGraphRAG(ctx)
}
if s.graph == nil {
return nil
Expand Down Expand Up @@ -147,9 +151,10 @@ func (s *Server) buildGraphFromMemory() *SystemGraphResponse {
return buildSummaryResponse(nodes, edges, totalErrorRate, totalLatency)
}

// buildGraphFromGraphRAG converts the GraphRAG service store into the API response.
func (s *Server) buildGraphFromGraphRAG() *SystemGraphResponse {
services := s.graphRAG.ServiceMap(0)
// buildGraphFromGraphRAG converts the caller's tenant slice of the GraphRAG
// service store into the API response.
func (s *Server) buildGraphFromGraphRAG(ctx context.Context) *SystemGraphResponse {
services := s.graphRAG.ServiceMap(ctx, 0)
if len(services) == 0 {
return nil
}
Expand Down Expand Up @@ -179,7 +184,7 @@ func (s *Server) buildGraphFromGraphRAG() *SystemGraphResponse {
}

edges := make([]GraphEdge, 0)
allEdges := s.graphRAG.ServiceStore.AllEdges()
allEdges := s.graphRAG.AllServiceEdges(ctx)
for _, e := range allEdges {
if e.Type == "CALLS" {
edges = append(edges, GraphEdge{
Expand Down
82 changes: 49 additions & 33 deletions internal/graphrag/anomaly.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,23 @@
"context"
"fmt"
"time"

"github.com/RandomCodeSpace/otelcontext/internal/storage"
)

// detectAnomalies runs anomaly detection across all services.
func (g *GraphRAG) detectAnomalies() {
services := g.ServiceStore.AllServices()
// detectAnomalies runs anomaly detection across every known tenant. For each
// tenant slice we walk the ServiceStore and SignalStore under their own
// locks and emit anomalies into that tenant's AnomalyStore.
func (g *GraphRAG) detectAnomalies(ctx context.Context) {
tenants := g.snapshotTenants()
for tenant, stores := range tenants {
tctx := storage.WithTenantContext(ctx, tenant)
g.detectAnomaliesForTenant(tctx, tenant, stores)
}
}

func (g *GraphRAG) detectAnomaliesForTenant(ctx context.Context, tenant string, stores *tenantStores) {

Check failure on line 22 in internal/graphrag/anomaly.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_otelcontext&issues=AZ3BLRO6F9-dexYwA4z5&open=AZ3BLRO6F9-dexYwA4z5&pullRequest=27
services := stores.service.AllServices()
now := time.Now()

for _, svc := range services {
Expand All @@ -23,14 +35,14 @@
Evidence: fmt.Sprintf("error rate %.1f%% (baseline ~%.1f%%)", svc.ErrorRate*100, baselineErrorRate*100),
Timestamp: now,
}
g.AnomalyStore.AddAnomaly(anomaly)
g.correlateWithRecent(anomaly)
stores.anomalies.AddAnomaly(anomaly)
correlateWithRecent(stores, anomaly)

// Trigger investigation
chains := g.ErrorChain(svc.Name, now.Add(-5*time.Minute), 5)
// Trigger investigation (scoped to this tenant).
chains := g.ErrorChain(ctx, svc.Name, now.Add(-5*time.Minute), 5)
if len(chains) > 0 {
anomalies := g.AnomalyStore.AnomaliesForService(svc.Name, now.Add(-1*time.Minute))
g.PersistInvestigation(svc.Name, chains, anomalies)
anomalies := stores.anomalies.AnomaliesForService(svc.Name, now.Add(-1*time.Minute))
g.PersistInvestigation(tenant, svc.Name, chains, anomalies)
}
}

Expand All @@ -44,18 +56,18 @@
Evidence: fmt.Sprintf("avg latency %.0fms", svc.AvgLatency),
Timestamp: now,
}
g.AnomalyStore.AddAnomaly(anomaly)
g.correlateWithRecent(anomaly)
stores.anomalies.AddAnomaly(anomaly)
correlateWithRecent(stores, anomaly)
}
}

// Metric z-score anomalies (check metrics in SignalStore)
g.SignalStore.mu.RLock()
metrics := make([]*MetricNode, 0, len(g.SignalStore.Metrics))
for _, m := range g.SignalStore.Metrics {
// Metric z-score anomalies (check metrics in this tenant's SignalStore).
stores.signals.mu.RLock()
metrics := make([]*MetricNode, 0, len(stores.signals.Metrics))
for _, m := range stores.signals.Metrics {
metrics = append(metrics, m)
}
g.SignalStore.mu.RUnlock()
stores.signals.mu.RUnlock()

for _, m := range metrics {
if m.SampleCount < 10 {
Expand All @@ -74,23 +86,24 @@
Evidence: fmt.Sprintf("metric %s z-score %.1f (avg=%.2f, range=[%.2f, %.2f])", m.MetricName, deviation, m.RollingAvg, m.RollingMin, m.RollingMax),
Timestamp: now,
}
g.AnomalyStore.AddAnomaly(anomaly)
g.correlateWithRecent(anomaly)
stores.anomalies.AddAnomaly(anomaly)
correlateWithRecent(stores, anomaly)
}
}
}
}

// correlateWithRecent links an anomaly to other anomalies within ±30s.
func (g *GraphRAG) correlateWithRecent(anomaly AnomalyNode) {
// correlateWithRecent links an anomaly to other anomalies within ±30s in the
// same tenant's AnomalyStore.
func correlateWithRecent(stores *tenantStores, anomaly AnomalyNode) {
window := 30 * time.Second
recent := g.AnomalyStore.AnomaliesSince(anomaly.Timestamp.Add(-window))
recent := stores.anomalies.AnomaliesSince(anomaly.Timestamp.Add(-window))
for _, prev := range recent {
if prev.ID == anomaly.ID {
continue
}
if prev.Timestamp.After(anomaly.Timestamp.Add(-window)) && prev.Timestamp.Before(anomaly.Timestamp.Add(window)) {
g.AnomalyStore.AddPrecededByEdge(anomaly.ID, prev.ID, anomaly.Timestamp)
stores.anomalies.AddPrecededByEdge(anomaly.ID, prev.ID, anomaly.Timestamp)
}
}
}
Expand All @@ -117,20 +130,23 @@
}
}

// pruneOldAnomalies removes anomalies older than 24 hours.
// pruneOldAnomalies removes anomalies older than 24 hours across every
// tenant slice.
func (g *GraphRAG) pruneOldAnomalies() {
cutoff := time.Now().Add(-24 * time.Hour)
g.AnomalyStore.mu.Lock()
defer g.AnomalyStore.mu.Unlock()
for id, a := range g.AnomalyStore.Anomalies {
if a.Timestamp.Before(cutoff) {
delete(g.AnomalyStore.Anomalies, id)
for _, stores := range g.snapshotTenants() {
stores.anomalies.mu.Lock()
for id, a := range stores.anomalies.Anomalies {
if a.Timestamp.Before(cutoff) {
delete(stores.anomalies.Anomalies, id)
}
}
}
for ek, e := range g.AnomalyStore.Edges {
if e.UpdatedAt.Before(cutoff) {
delete(g.AnomalyStore.Edges, ek)
for ek, e := range stores.anomalies.Edges {
if e.UpdatedAt.Before(cutoff) {
delete(stores.anomalies.Edges, ek)
}
}
stores.anomalies.mu.Unlock()
}
}

Expand All @@ -145,7 +161,7 @@
case <-g.stopCh:
return
case <-ticker.C:
g.detectAnomalies()
g.detectAnomalies(ctx)
}
}
}
Loading
Loading