From a29a61ee8758a316130218cefaeeeb5b178ac033 Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Thu, 25 Jun 2026 09:36:42 -0500 Subject: [PATCH] fix: make DLQ retry outcome-aware so failures stop reporting success The dead-letter retry worker republished raw bytes and counted the NATS-publish ACK as success without ever re-running CEL. Persistent policy-evaluation failures looked resolved, the events bounced back into ingest, failed identically, and were eventually dedup-dropped, so activities for those audit events were silently never generated. Key changes: - Re-run audit policy evaluation in place (via the existing EvaluateCompiledAuditRules path) before treating a retry as resolved. On eval failure the event is counted as a real failure and its retry count is advanced through updateAndRepublishMetadata instead of being republished into the dedup window; only resolved events are republished. - Split the misleading attempts metric: re-evaluated successes count as "succeeded", events with no evaluator count as "republished", and the new activity_processor_dlq_retry_failed_total{api_group,kind,policy_name, error_type} counter surfaces genuine persistent loss. - Fix the activity-processor alerts that referenced the non-existent activity_processor_audit_events_{received,errored}_total metrics; point them at activity_processor_events_{received,errored}_total{source="audit_log"}. - Reuse a shared classifyEvaluationError helper across the ingest and retry paths. Deferred (noted in PR): event-keyed KV ledger, terminal poison-pill parked stream, and the production-side ActivityPolicy has() guards (those policies live outside this repo). 
Claude-Session: https://claude.ai/code/session_01KnYuL5Pf1R5ysZoxxNkKiu --- .../observability/alerts/activity-alerts.yaml | 6 +- internal/activityprocessor/dlq_reeval_test.go | 85 ++++++++++++++ internal/activityprocessor/dlq_retry.go | 109 +++++++++++++++++- internal/activityprocessor/processor.go | 58 ++++++++-- 4 files changed, 239 insertions(+), 19 deletions(-) create mode 100644 internal/activityprocessor/dlq_reeval_test.go diff --git a/config/components/observability/alerts/activity-alerts.yaml b/config/components/observability/alerts/activity-alerts.yaml index d58e38ab..7dc6dc00 100644 --- a/config/components/observability/alerts/activity-alerts.yaml +++ b/config/components/observability/alerts/activity-alerts.yaml @@ -235,7 +235,7 @@ spec: expr: | rate(activity_processor_activities_generated_total[1h]) == 0 AND - rate(activity_processor_audit_events_received_total[1h]) > 0 + rate(activity_processor_events_received_total{source="audit_log"}[1h]) > 0 for: 30m labels: severity: warning @@ -248,9 +248,9 @@ spec: # Error Rate Threshold - alert: ActivityProcessorHighErrorRate expr: | - sum(rate(activity_processor_audit_events_errored_total[5m])) + sum(rate(activity_processor_events_errored_total{source="audit_log"}[5m])) / - sum(rate(activity_processor_audit_events_received_total[5m])) + sum(rate(activity_processor_events_received_total{source="audit_log"}[5m])) > 0.05 for: 10m labels: diff --git a/internal/activityprocessor/dlq_reeval_test.go b/internal/activityprocessor/dlq_reeval_test.go new file mode 100644 index 00000000..958a6060 --- /dev/null +++ b/internal/activityprocessor/dlq_reeval_test.go @@ -0,0 +1,85 @@ +package activityprocessor + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "go.miloapis.com/activity/internal/processor" +) + +func TestClassifyEvaluationError(t *testing.T) { + tests := []struct { + name string + err error + ruleIndex int + want processor.ErrorType + }{ + { + name: "no rule index is a match error", + err: 
fmt.Errorf("boom"), + ruleIndex: -1, + want: processor.ErrorTypeCELMatch, + }, + { + name: "kind resolution failure", + err: fmt.Errorf("wrap: %w", processor.ErrKindResolution), + ruleIndex: 0, + want: processor.ErrorTypeKindResolve, + }, + { + name: "activity build failure resolves to kind", + err: fmt.Errorf("wrap: %w", processor.ErrActivityBuild), + ruleIndex: 2, + want: processor.ErrorTypeKindResolve, + }, + { + name: "summary error with rule index", + err: fmt.Errorf("rule 0 summary: no such key: name"), + ruleIndex: 0, + want: processor.ErrorTypeCELSummary, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := classifyEvaluationError(tt.err, tt.ruleIndex); got != tt.want { + t.Errorf("classifyEvaluationError() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestReEvaluateDeadLetter_UnmarshalError(t *testing.T) { + p := &Processor{policyCache: NewPolicyCache()} + + outcome := p.reEvaluateDeadLetter(context.Background(), &processor.DeadLetterEvent{ + Type: processor.EventTypeAudit, + OriginalPayload: json.RawMessage(`{not json`), + }) + + if outcome.Resolved { + t.Fatal("expected unresolved outcome for unparseable payload") + } + if outcome.ErrorType != processor.ErrorTypeUnmarshal { + t.Errorf("ErrorType = %q, want %q", outcome.ErrorType, processor.ErrorTypeUnmarshal) + } + if outcome.Err == nil { + t.Error("expected an error to be returned") + } +} + +func TestReEvaluateDeadLetter_NoPolicyResolves(t *testing.T) { + p := &Processor{policyCache: NewPolicyCache()} + + payload := []byte(`{"objectRef":{"apiGroup":"example.com","resource":"widgets","name":"w1"}}`) + outcome := p.reEvaluateDeadLetter(context.Background(), &processor.DeadLetterEvent{ + Type: processor.EventTypeAudit, + OriginalPayload: payload, + }) + + if !outcome.Resolved { + t.Errorf("expected resolved outcome when no policy targets the resource, got err=%v", outcome.Err) + } +} diff --git a/internal/activityprocessor/dlq_retry.go 
b/internal/activityprocessor/dlq_retry.go index 8cf671b2..434e10a4 100644 --- a/internal/activityprocessor/dlq_retry.go +++ b/internal/activityprocessor/dlq_retry.go @@ -51,6 +51,16 @@ var ( }, []string{"api_group", "kind", "policy_name"}, ) + + dlqRetryFailedTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "activity_processor", + Subsystem: "dlq_retry", + Name: "failed_total", + Help: "Total number of DLQ retries that failed re-evaluation and produced no activity", + }, + []string{"api_group", "kind", "policy_name", "error_type"}, + ) ) func init() { @@ -58,6 +68,7 @@ func init() { dlqRetryAttemptsTotal, dlqRetryBatchDuration, dlqEventsHighRetryTotal, + dlqRetryFailedTotal, ) } @@ -98,11 +109,32 @@ func DefaultDLQRetryConfig() DLQRetryConfig { } } +// RetryOutcome describes the result of re-evaluating a dead-letter event in place. +type RetryOutcome struct { + // Resolved is true when re-evaluation succeeded (no error). Only resolved + // events are republished onto the source stream to generate an activity. + Resolved bool + // ErrorType classifies the failure when Resolved is false, for metrics. + ErrorType processor.ErrorType + // Err is the underlying re-evaluation error when Resolved is false. + Err error +} + +// RetryEvaluator re-runs policy evaluation against a dead-letter event's original +// payload so the retry worker can tell "republished to NATS" apart from +// "successfully processed". A nil evaluator falls back to publish-ACK accounting. +type RetryEvaluator func(ctx context.Context, event *processor.DeadLetterEvent) RetryOutcome + // DLQRetryController manages automatic retry of dead-letter queue events. type DLQRetryController struct { js nats.JetStreamContext config DLQRetryConfig + // evaluator re-runs policy evaluation in place before a retry is treated as + // resolved. When nil, the controller cannot observe evaluation outcomes and + // only reports that events were republished. 
+ evaluator RetryEvaluator + auditStreamName string eventStreamName string dlqStreamName string @@ -117,7 +149,8 @@ type DLQRetryController struct { activeRetriesMu sync.Mutex } -// NewDLQRetryController creates a new DLQ retry controller. +// NewDLQRetryController creates a new DLQ retry controller. evaluator may be nil, +// in which case retries are reported as "republished" rather than "succeeded". func NewDLQRetryController( js nats.JetStreamContext, config DLQRetryConfig, @@ -125,10 +158,12 @@ func NewDLQRetryController( eventStreamName string, dlqStreamName string, dlqSubjectPrefix string, + evaluator RetryEvaluator, ) *DLQRetryController { return &DLQRetryController{ js: js, config: config, + evaluator: evaluator, auditStreamName: auditStreamName, eventStreamName: eventStreamName, dlqStreamName: dlqStreamName, @@ -393,7 +428,21 @@ func (c *DLQRetryController) processRetryBatch(ctx context.Context, trigger stri } } - // Attempt to republish + // Re-evaluate the event in place before treating the retry as resolved. + // A NATS-publish ACK only means the republish was accepted, not that + // policy evaluation passed, so a re-failing event must count as a real + // failure and keep accumulating its retry count rather than resetting. + if c.evaluator != nil && dlEvent.Type == processor.EventTypeAudit { + outcome := c.evaluator(ctx, &dlEvent) + if !outcome.Resolved { + c.handleReEvaluationFailure(ctx, msg, &dlEvent, now, trigger, apiGroup, kind, outcome) + failed++ + continue + } + } + + // Evaluation passed (or no evaluator wired): republish so the event + // re-enters the ingest path and produces an activity. if err := c.republishEvent(ctx, &dlEvent); err != nil { klog.ErrorS(err, "Failed to republish DLQ event", "eventType", dlEvent.Type, @@ -418,23 +467,73 @@ func (c *DLQRetryController) processRetryBatch(ctx context.Context, trigger stri continue } - // Success - ack the DLQ message + // Republish accepted - ack the DLQ message. 
if ackErr := msg.Ack(); ackErr != nil { klog.ErrorS(ackErr, "Failed to ack successfully retried DLQ message") } - dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, "succeeded").Inc() succeeded++ - klog.V(2).InfoS("Successfully retried DLQ event", + // "succeeded" means evaluation passed and an activity will be generated; + // "republished" means we put the event back without observing the + // outcome (no evaluator for this event type). + result := "republished" + if c.evaluator != nil && dlEvent.Type == processor.EventTypeAudit { + result = "succeeded" + } + dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, result).Inc() + + klog.V(2).InfoS("Retried DLQ event", "eventType", dlEvent.Type, "policy", dlEvent.PolicyName, "retryCount", dlEvent.RetryCount, + "result", result, ) } return processed, succeeded, failed } +// handleReEvaluationFailure records a re-failing dead-letter event as a genuine +// failure: it counts the failure, advances the retry count via the DLQ metadata +// loop, and avoids republishing into the dedup window where the event would be +// silently dropped. On metadata-update failure the message is NAKed to preserve it. 
+func (c *DLQRetryController) handleReEvaluationFailure( + ctx context.Context, + msg *nats.Msg, + event *processor.DeadLetterEvent, + now time.Time, + trigger, apiGroup, kind string, + outcome RetryOutcome, +) { + errorType := outcome.ErrorType + if errorType == "" { + errorType = processor.ErrorTypeCELSummary + } + + klog.ErrorS(outcome.Err, "DLQ event re-failed evaluation", + "eventType", event.Type, + "policy", event.PolicyName, + "errorType", errorType, + "retryCount", event.RetryCount, + ) + + dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, "failed").Inc() + dlqRetryFailedTotal.WithLabelValues(apiGroup, kind, event.PolicyName, string(errorType)).Inc() + + // Advance the retry count instead of republishing into ingest, so the + // failure is durably tracked and high-retry alerting can fire. + if err := c.updateAndRepublishMetadata(ctx, event, now); err != nil { + klog.ErrorS(err, "Failed to update retry metadata after re-failure, NAKing to preserve event") + if nakErr := msg.Nak(); nakErr != nil { + klog.ErrorS(nakErr, "Failed to NAK DLQ message after metadata update failure") + } + return + } + if ackErr := msg.Ack(); ackErr != nil { + klog.ErrorS(ackErr, "Failed to ack DLQ message after re-failure metadata update") + } +} + // isEligibleForRetry checks if an event's backoff has expired. 
func (c *DLQRetryController) isEligibleForRetry(event *processor.DeadLetterEvent, now time.Time) bool { // First retry is always eligible diff --git a/internal/activityprocessor/processor.go b/internal/activityprocessor/processor.go index 22a3a154..4e2f793b 100644 --- a/internal/activityprocessor/processor.go +++ b/internal/activityprocessor/processor.go @@ -516,6 +516,7 @@ func (p *Processor) Start(ctx context.Context) error { p.config.NATSEventStream, dlqConfig.StreamName, dlqConfig.SubjectPrefix, + p.reEvaluateDeadLetter, ) p.wg.Add(1) @@ -951,17 +952,7 @@ func (p *Processor) processMessage(msg *nats.Msg) error { p.eventEmitter.EmitEvaluationError(p.ctx, policy.Name, ruleIndex, err) // Classify error type for DLQ using sentinel errors - errorType := processor.ErrorTypeCELMatch - if ruleIndex >= 0 { - // If we have a rule index, check if it's a kind resolution or summary error - if errors.Is(err, processor.ErrKindResolution) { - errorType = processor.ErrorTypeKindResolve - } else if errors.Is(err, processor.ErrActivityBuild) { - errorType = processor.ErrorTypeKindResolve // Activity build errors are typically kind resolution - } else { - errorType = processor.ErrorTypeCELSummary - } - } + errorType := classifyEvaluationError(err, ruleIndex) // Publish to DLQ policyVersion := int64(0) @@ -1037,6 +1028,51 @@ func (p *Processor) evaluateCompiledAuditRules(policy *CompiledPolicy, auditMap return EvaluateCompiledAuditRules(policy, auditMap, audit, p.resourceToKind) } +// reEvaluateDeadLetter re-runs audit policy evaluation against a dead-letter +// event's original payload so the DLQ retry worker can distinguish a genuinely +// resolved event from one that merely re-publishes and fails again. It mirrors +// the ingest evaluation path (auditToMap + EvaluateCompiledAuditRules) without +// emitting an activity; the retry worker republishes resolved events itself. 
+func (p *Processor) reEvaluateDeadLetter(_ context.Context, event *processor.DeadLetterEvent) RetryOutcome {
+	var audit auditv1.Event
+	if err := json.Unmarshal(event.OriginalPayload, &audit); err != nil {
+		return RetryOutcome{ErrorType: processor.ErrorTypeUnmarshal, Err: err}
+	}
+	auditMap, err := auditToMap(&audit)
+	if err != nil {
+		return RetryOutcome{ErrorType: processor.ErrorTypeUnmarshal, Err: err}
+	}
+	if audit.ObjectRef == nil {
+		// No objectRef means no policy can target the event; avoid a nil dereference.
+		return RetryOutcome{Resolved: true}
+	}
+	policies := p.policyCache.Get(audit.ObjectRef.APIGroup, audit.ObjectRef.Resource)
+	if len(policies) == 0 {
+		// No policy still targets this resource; nothing left to evaluate, so the
+		// event can leave the DLQ rather than retry forever.
+		return RetryOutcome{Resolved: true}
+	}
+	for _, policy := range policies {
+		_, ruleIndex, err := p.evaluateCompiledAuditRules(policy, auditMap, &audit)
+		if err != nil {
+			return RetryOutcome{ErrorType: classifyEvaluationError(err, ruleIndex), Err: err}
+		}
+	}
+	return RetryOutcome{Resolved: true}
+}
+
+// classifyEvaluationError maps an audit evaluation error to a DLQ error type,
+// matching the classification used on the ingest path.
+func classifyEvaluationError(err error, ruleIndex int) processor.ErrorType {
+	if ruleIndex < 0 {
+		return processor.ErrorTypeCELMatch
+	}
+	if errors.Is(err, processor.ErrKindResolution) || errors.Is(err, processor.ErrActivityBuild) {
+		return processor.ErrorTypeKindResolve
+	}
+	return processor.ErrorTypeCELSummary
+}
+
 // auditToMap converts an audit event to a map for CEL evaluation.
 func auditToMap(audit *auditv1.Event) (map[string]any, error) {
 	data, err := json.Marshal(audit)