Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 142 additions & 82 deletions controller/internal/probe/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,17 @@ type HealthVector struct {

// ProbeMetrics holds raw metrics for a single probe direction
type ProbeMetrics struct {
AvgLatency float64 `json:"avg_latency"` // ms
MedianLatency float64 `json:"median_latency"` // ms
P95Latency float64 `json:"p95_latency"` // ms
P99Latency float64 `json:"p99_latency"` // ms
PacketLoss float64 `json:"packet_loss"` // percentage
JitterAvg float64 `json:"jitter_avg"` // ms (stddev)
JitterMedian float64 `json:"jitter_median"` // ms
JitterP95 float64 `json:"jitter_p95"` // ms
SampleCount int `json:"sample_count"`
AvgLatency float64 `json:"avg_latency"` // ms
MedianLatency float64 `json:"median_latency"` // ms
P95Latency float64 `json:"p95_latency"` // ms
P99Latency float64 `json:"p99_latency"` // ms
PacketLoss float64 `json:"packet_loss"` // percentage
JitterAvg float64 `json:"jitter_avg"` // ms (stddev)
JitterMedian float64 `json:"jitter_median"` // ms
JitterP95 float64 `json:"jitter_p95"` // ms
OutOfOrderPercent float64 `json:"out_of_order_percent"` // percentage of out-of-order packets
DuplicatePercent float64 `json:"duplicate_percent"` // percentage of duplicate packets
SampleCount int `json:"sample_count"`
}

// AnalysisSignal represents a detected signal (anomaly, artifact, etc.)
Expand Down Expand Up @@ -209,14 +211,40 @@ type WorkspaceAnalysis struct {
// ── Scoring Functions ──

// scoreLatency converts avg latency (ms) into 0-100 score
func scoreLatency(avgMs, p95Ms, jitterMs float64) float64 {
// Composite: 50% avg, 30% p95, 20% jitter
// jitterP95 is used for worst-case jitter scoring (P95 more representative than average)
func scoreLatency(avgMs, p95Ms, jitterAvg, jitterP95 float64) float64 {
// Composite: 50% avg, 30% p95, 20% jitter (using P95 jitter for worst-case)
avgScore := latencyToScore(avgMs)
p95Score := latencyToScore(p95Ms)
jitterScore := jitterToScore(jitterMs)
// Blend avg jitter and P95 jitter: P95 jitter weighted higher (60%) as it's worst-case
jitterScore := jitterToScore(jitterAvg, jitterP95)
return clampScore(avgScore*0.5 + p95Score*0.3 + jitterScore*0.2)
}

// jitterToScore scores jitter using both average and P95 jitter values.
// P95 jitter is weighted higher (60%) since it represents worst-case spikes.
// Returns 0-100 score where 100 is best (no jitter).
func jitterToScore(jitterAvg, jitterP95 float64) float64 {
// Use weighted blend: 40% average, 60% P95 (P95 captures worst-case)
blendedJitter := jitterAvg*0.4 + jitterP95*0.6

if blendedJitter <= 0 {
return 100
}
switch {
case blendedJitter < 5:
return 100
case blendedJitter < 15:
return 90 - ((blendedJitter-5)/10)*10
case blendedJitter < 30:
return 80 - ((blendedJitter-15)/15)*20
case blendedJitter < 50:
return 60 - ((blendedJitter-30)/20)*20
default:
return math.Max(0, 40-((blendedJitter-50)/50)*40)
}
}

func latencyToScore(ms float64) float64 {
if ms <= 0 {
return 100
Expand All @@ -235,22 +263,41 @@ func latencyToScore(ms float64) float64 {
}
}

func jitterToScore(ms float64) float64 {
if ms <= 0 {
return 100
}
switch {
case ms < 5:
return 100
case ms < 15:
return 90 - ((ms-5)/10)*10
case ms < 30:
return 80 - ((ms-15)/15)*20
case ms < 50:
return 60 - ((ms-30)/20)*20
default:
return math.Max(0, 40-((ms-50)/50)*40)
}
// scoreReliability scores packet ordering and duplication metrics.
// High out-of-order or duplicate packet rates indicate network issues.
// Returns 0-100 score where 100 is perfect ordering.
func scoreReliability(outOfOrderPct, duplicatePct float64) float64 {
// Out-of-order packets are more harmful than duplicates (reassembly issues)
// Weight: 65% out-of-order, 35% duplicates
ooScore := 100.0
if outOfOrderPct > 0 {
switch {
case outOfOrderPct < 0.5:
ooScore = 100 - outOfOrderPct*4 // 100-98
case outOfOrderPct < 2:
ooScore = 98 - ((outOfOrderPct-0.5)/1.5)*18 // 98-80
case outOfOrderPct < 5:
ooScore = 80 - ((outOfOrderPct-2)/3)*30 // 80-50
default:
ooScore = math.Max(0, 50-((outOfOrderPct-5)/10)*50) // 50-0
}
}

dupScore := 100.0
if duplicatePct > 0 {
switch {
case duplicatePct < 1:
dupScore = 100 - duplicatePct*2 // 100-98
case duplicatePct < 5:
dupScore = 98 - ((duplicatePct-1)/4)*18 // 98-80
case duplicatePct < 10:
dupScore = 80 - ((duplicatePct-5)/5)*20 // 80-60
default:
dupScore = math.Max(0, 60-((duplicatePct-10)/10)*60) // 60-0
}
}

return clampScore(ooScore*0.65 + dupScore*0.35)
}

// scorePacketLoss converts loss % into 0-100 score
Expand Down Expand Up @@ -332,14 +379,24 @@ func clampScore(s float64) float64 {
}

// computeHealthVector builds a HealthVector from raw metrics
func computeHealthVector(metrics ProbeMetrics, routeStability float64) HealthVector {
latScore := scoreLatency(metrics.AvgLatency, metrics.P95Latency, metrics.JitterAvg)
// reverseHealth is an optional health vector from the reverse direction probe.
// When set and below threshold, a penalty is applied to overall health.
func computeHealthVector(metrics ProbeMetrics, routeStability float64, reverseHealth *HealthVector) HealthVector {
latScore := scoreLatency(metrics.AvgLatency, metrics.P95Latency, metrics.JitterAvg, metrics.JitterP95)
lossScore := scorePacketLoss(metrics.PacketLoss)
relScore := scoreReliability(metrics.OutOfOrderPercent, metrics.DuplicatePercent)
mos := computeMos(metrics.AvgLatency, metrics.PacketLoss, metrics.JitterAvg)

// Weighted composite: 30% latency, 35% loss, 15% route stability, 20% MOS-derived
// Weighted composite: 30% latency, 30% loss, 15% route stability, 15% MOS-derived, 10% reliability
mosScore := (mos - 1.0) / 3.5 * 100 // Normalize MOS 1-4.5 to 0-100
overall := clampScore(latScore*0.30 + lossScore*0.35 + routeStability*0.15 + mosScore*0.20)
overall := clampScore(latScore*0.30 + lossScore*0.30 + routeStability*0.15 + mosScore*0.15 + relScore*0.10)

// Reverse probe penalty: if reverse direction has poor health, reduce overall by up to 15%
if reverseHealth != nil && reverseHealth.OverallHealth < 60 {
// Penalty scales from 0% at 60 health to 15% at 0 health
penaltyFactor := (60 - reverseHealth.OverallHealth) / 60 * 0.15
overall = clampScore(overall * (1.0 - penaltyFactor))
}

return HealthVector{
LatencyScore: clampScore(latScore),
Expand Down Expand Up @@ -997,8 +1054,50 @@ func ComputeProbeAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, workspac
routeStability = pathAnalysis.RouteStabilityPct
}

// Compute health vector
health := computeHealthVector(metrics, routeStability)
// Check for reverse/reciprocal probe first — needed for health vector computation
var reverseProbe *ProbeAnalysis
var reverseHealth *HealthVector
if targetAgentID > 0 {
reverseProbes, err := findReverseAgentProbes(ctx, pg, p.AgentID)
if err == nil {
for _, rp := range reverseProbes {
if rp.AgentID == targetAgentID {
// Found the reverse probe — compute its analysis using target agent data only
revMetrics, _ := probeAnalysisMetrics(ctx, ch, []uint{targetAgentID}, rp.ID, from)
revPath, revSignals, _ := analyzeMtrForProbe(ctx, ch, []uint{targetAgentID}, rp.ID, from, agentIPToID, agentByID)
revRouteStab := 100.0
if revPath != nil {
revRouteStab = revPath.RouteStabilityPct
}
revHealth := computeHealthVector(revMetrics, revRouteStab, nil)

revAgentName := ""
if a, ok := agentByID[targetAgentID]; ok {
revAgentName = a.Name
}

reverseProbe = &ProbeAnalysis{
ProbeID: rp.ID,
ProbeType: string(rp.Type),
Target: agentName, // reverse target is the original agent
AgentID: targetAgentID,
AgentName: revAgentName,
Health: revHealth,
Metrics: revMetrics,
PathAnalysis: revPath,
Signals: revSignals,
Findings: buildFindings(revHealth, revMetrics, revPath, revSignals),
GeneratedAt: time.Now().UTC(),
}
reverseHealth = &revHealth
break
}
}
}
}

// Compute health vector (now includes reverse probe penalty)
health := computeHealthVector(metrics, routeStability, reverseHealth)

// Build combined signals
var signals []AnalysisSignal
Expand Down Expand Up @@ -1060,46 +1159,7 @@ func ComputeProbeAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, workspac
Findings: findings,
GeneratedAt: time.Now().UTC(),
}

// Check for reverse/reciprocal probe
if targetAgentID > 0 {
reverseProbes, err := findReverseAgentProbes(ctx, pg, p.AgentID)
if err == nil {
for _, rp := range reverseProbes {
if rp.AgentID == targetAgentID {
// Found the reverse probe — compute its analysis using target agent data only
revMetrics, _ := probeAnalysisMetrics(ctx, ch, []uint{targetAgentID}, rp.ID, from)
revPath, revSignals, _ := analyzeMtrForProbe(ctx, ch, []uint{targetAgentID}, rp.ID, from, agentIPToID, agentByID)
revRouteStab := 100.0
if revPath != nil {
revRouteStab = revPath.RouteStabilityPct
}
revHealth := computeHealthVector(revMetrics, revRouteStab)
revFindings := buildFindings(revHealth, revMetrics, revPath, revSignals)

revAgentName := ""
if a, ok := agentByID[targetAgentID]; ok {
revAgentName = a.Name
}

result.Reverse = &ProbeAnalysis{
ProbeID: rp.ID,
ProbeType: string(rp.Type),
Target: agentName, // reverse target is the original agent
AgentID: targetAgentID,
AgentName: revAgentName,
Health: revHealth,
Metrics: revMetrics,
PathAnalysis: revPath,
Signals: revSignals,
Findings: revFindings,
GeneratedAt: time.Now().UTC(),
}
break
}
}
}
}
result.Reverse = reverseProbe

return result, nil
}
Expand Down Expand Up @@ -1172,7 +1232,7 @@ func ComputeWorkspaceAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, work
PacketLoss: stats.PacketLoss,
SampleCount: stats.Count,
}
h := computeHealthVector(m, 100)
h := computeHealthVector(m, 100, nil)
probeEntries = append(probeEntries, ProbeHealthEntry{
Target: stripPort(target),
ProbeType: "PING",
Expand All @@ -1195,7 +1255,7 @@ func ComputeWorkspaceAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, work
JitterAvg: stats.Jitter,
SampleCount: stats.Count,
}
h := computeHealthVector(m, 100)
h := computeHealthVector(m, 100, nil)
probeEntries = append(probeEntries, ProbeHealthEntry{
Target: stripPort(target),
ProbeType: "MTR",
Expand All @@ -1218,7 +1278,7 @@ func ComputeWorkspaceAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, work
PacketLoss: stats.PacketLoss,
SampleCount: stats.Count,
}
h := computeHealthVector(m, 100)
h := computeHealthVector(m, 100, nil)
probeEntries = append(probeEntries, ProbeHealthEntry{
Target: stripPort(target),
ProbeType: "TRAFFICSIM",
Expand Down Expand Up @@ -1260,7 +1320,7 @@ func ComputeWorkspaceAnalysis(ctx context.Context, ch *sql.DB, pg *gorm.DB, work
PacketLoss: avgLossVal,
JitterAvg: avgJitterAvgVal,
}
agentHealth = computeHealthVector(agentMetrics, 100)
agentHealth = computeHealthVector(agentMetrics, 100, nil)
} else {
dataGap = true
agentHealth = HealthVector{
Expand Down Expand Up @@ -4013,14 +4073,14 @@ func ComputeAgentVoiceQuality(ctx context.Context, db *gorm.DB, ch *sql.DB, agen
var latencyScore, jitterScore, packetLossScore float64
count := 0
if bestForward != nil {
latencyScore += scoreLatency(bestForward.AvgLatency, bestForward.P95Latency, bestForward.JitterAvg)
jitterScore += jitterToScore(bestForward.JitterAvg)
latencyScore += scoreLatency(bestForward.AvgLatency, bestForward.P95Latency, bestForward.JitterAvg, bestForward.JitterP95)
jitterScore += jitterToScore(bestForward.JitterAvg, bestForward.JitterP95)
packetLossScore += scorePacketLoss(bestForward.PacketLoss)
count++
}
if bestReturn != nil {
latencyScore += scoreLatency(bestReturn.AvgLatency, bestReturn.P95Latency, bestReturn.JitterAvg)
jitterScore += jitterToScore(bestReturn.JitterAvg)
latencyScore += scoreLatency(bestReturn.AvgLatency, bestReturn.P95Latency, bestReturn.JitterAvg, bestReturn.JitterP95)
jitterScore += jitterToScore(bestReturn.JitterAvg, bestReturn.JitterP95)
packetLossScore += scorePacketLoss(bestReturn.PacketLoss)
count++
}
Expand Down
Loading