Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/components/observability/alerts/activity-alerts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ spec:
expr: |
rate(activity_processor_activities_generated_total[1h]) == 0
AND
rate(activity_processor_audit_events_received_total[1h]) > 0
rate(activity_processor_events_received_total{source="audit_log"}[1h]) > 0
for: 30m
labels:
severity: warning
Expand All @@ -248,9 +248,9 @@ spec:
# Error Rate Threshold
- alert: ActivityProcessorHighErrorRate
expr: |
sum(rate(activity_processor_audit_events_errored_total[5m]))
sum(rate(activity_processor_events_errored_total{source="audit_log"}[5m]))
/
sum(rate(activity_processor_audit_events_received_total[5m]))
sum(rate(activity_processor_events_received_total{source="audit_log"}[5m]))
> 0.05
for: 10m
labels:
Expand Down
85 changes: 85 additions & 0 deletions internal/activityprocessor/dlq_reeval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package activityprocessor

import (
"context"
"encoding/json"
"fmt"
"testing"

"go.miloapis.com/activity/internal/processor"
)

// TestClassifyEvaluationError verifies that classifyEvaluationError maps each
// combination of error and rule index to the expected DLQ error type.
func TestClassifyEvaluationError(t *testing.T) {
	type testCase struct {
		name      string
		err       error
		ruleIndex int
		want      processor.ErrorType
	}

	cases := []testCase{
		{
			// A negative rule index means no rule was reached.
			name:      "no rule index is a match error",
			err:       fmt.Errorf("boom"),
			ruleIndex: -1,
			want:      processor.ErrorTypeCELMatch,
		},
		{
			name:      "kind resolution failure",
			err:       fmt.Errorf("wrap: %w", processor.ErrKindResolution),
			ruleIndex: 0,
			want:      processor.ErrorTypeKindResolve,
		},
		{
			name:      "activity build failure resolves to kind",
			err:       fmt.Errorf("wrap: %w", processor.ErrActivityBuild),
			ruleIndex: 2,
			want:      processor.ErrorTypeKindResolve,
		},
		{
			// Any other error with a valid rule index is a summary error.
			name:      "summary error with rule index",
			err:       fmt.Errorf("rule 0 summary: no such key: name"),
			ruleIndex: 0,
			want:      processor.ErrorTypeCELSummary,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := classifyEvaluationError(tc.err, tc.ruleIndex)
			if got == tc.want {
				return
			}
			t.Errorf("classifyEvaluationError() = %q, want %q", got, tc.want)
		})
	}
}

// TestReEvaluateDeadLetter_UnmarshalError feeds reEvaluateDeadLetter a payload
// that is not valid JSON and expects an unresolved outcome that is classified
// as an unmarshal failure and carries a non-nil error.
func TestReEvaluateDeadLetter_UnmarshalError(t *testing.T) {
	proc := &Processor{policyCache: NewPolicyCache()}
	event := &processor.DeadLetterEvent{
		Type:            processor.EventTypeAudit,
		OriginalPayload: json.RawMessage(`{not json`),
	}

	got := proc.reEvaluateDeadLetter(context.Background(), event)

	if got.Resolved {
		t.Fatal("expected unresolved outcome for unparseable payload")
	}
	if want := processor.ErrorTypeUnmarshal; got.ErrorType != want {
		t.Errorf("ErrorType = %q, want %q", got.ErrorType, want)
	}
	if got.Err == nil {
		t.Error("expected an error to be returned")
	}
}

// TestReEvaluateDeadLetter_NoPolicyResolves checks that an audit event whose
// resource has no policy in a freshly created policy cache is reported as
// resolved.
func TestReEvaluateDeadLetter_NoPolicyResolves(t *testing.T) {
	const auditJSON = `{"objectRef":{"apiGroup":"example.com","resource":"widgets","name":"w1"}}`

	proc := &Processor{policyCache: NewPolicyCache()}
	event := &processor.DeadLetterEvent{
		Type:            processor.EventTypeAudit,
		OriginalPayload: []byte(auditJSON),
	}

	got := proc.reEvaluateDeadLetter(context.Background(), event)
	if got.Resolved {
		return
	}
	t.Errorf("expected resolved outcome when no policy targets the resource, got err=%v", got.Err)
}
109 changes: 104 additions & 5 deletions internal/activityprocessor/dlq_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,24 @@ var (
},
[]string{"api_group", "kind", "policy_name"},
)

// dlqRetryFailedTotal counts DLQ retries whose in-place re-evaluation failed,
// so no activity was produced. It is incremented by handleReEvaluationFailure
// and is partitioned by API group, kind, policy name and the classified
// error type of the re-evaluation failure.
dlqRetryFailedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "activity_processor",
Subsystem: "dlq_retry",
Name: "failed_total",
Help: "Total number of DLQ retries that failed re-evaluation and produced no activity",
},
[]string{"api_group", "kind", "policy_name", "error_type"},
)
)

// init registers every DLQ retry collector declared in this file with
// metrics.Registry. Registration panics if any collector cannot be registered.
func init() {
	// One collector per call, in the same order as before.
	metrics.Registry.MustRegister(dlqRetryAttemptsTotal)
	metrics.Registry.MustRegister(dlqRetryBatchDuration)
	metrics.Registry.MustRegister(dlqEventsHighRetryTotal)
	metrics.Registry.MustRegister(dlqRetryFailedTotal)
}

Expand Down Expand Up @@ -98,11 +109,32 @@ func DefaultDLQRetryConfig() DLQRetryConfig {
}
}

// RetryOutcome describes the result of re-evaluating a dead-letter event in place.
//
// The zero value represents an unresolved outcome with no error classification
// and no error attached.
type RetryOutcome struct {
// Resolved is true when re-evaluation succeeded (no error). Only resolved
// events are republished onto the source stream to generate an activity;
// unresolved events are routed to handleReEvaluationFailure instead.
Resolved bool
// ErrorType classifies the failure when Resolved is false, for metrics.
// When left empty, handleReEvaluationFailure reports the failure as
// processor.ErrorTypeCELSummary.
ErrorType processor.ErrorType
// Err is the underlying re-evaluation error when Resolved is false. It is
// only used for logging by the retry controller.
Err error
}

// RetryEvaluator re-runs policy evaluation against a dead-letter event's original
// payload so the retry worker can tell "republished to NATS" apart from
// "successfully processed". A nil evaluator falls back to publish-ACK accounting.
//
// The retry controller only invokes the evaluator for events whose Type is
// processor.EventTypeAudit; other event types are republished without being
// re-evaluated.
type RetryEvaluator func(ctx context.Context, event *processor.DeadLetterEvent) RetryOutcome

// DLQRetryController manages automatic retry of dead-letter queue events.
type DLQRetryController struct {
js nats.JetStreamContext
config DLQRetryConfig

// evaluator re-runs policy evaluation in place before a retry is treated as
// resolved. When nil, the controller cannot observe evaluation outcomes and
// only reports that events were republished.
evaluator RetryEvaluator

auditStreamName string
eventStreamName string
dlqStreamName string
Expand All @@ -117,18 +149,21 @@ type DLQRetryController struct {
activeRetriesMu sync.Mutex
}

// NewDLQRetryController creates a new DLQ retry controller.
// NewDLQRetryController creates a new DLQ retry controller. evaluator may be nil,
// in which case retries are reported as "republished" rather than "succeeded".
func NewDLQRetryController(
js nats.JetStreamContext,
config DLQRetryConfig,
auditStreamName string,
eventStreamName string,
dlqStreamName string,
dlqSubjectPrefix string,
evaluator RetryEvaluator,
) *DLQRetryController {
return &DLQRetryController{
js: js,
config: config,
evaluator: evaluator,
auditStreamName: auditStreamName,
eventStreamName: eventStreamName,
dlqStreamName: dlqStreamName,
Expand Down Expand Up @@ -393,7 +428,21 @@ func (c *DLQRetryController) processRetryBatch(ctx context.Context, trigger stri
}
}

// Attempt to republish
// Re-evaluate the event in place before treating the retry as resolved.
// A NATS-publish ACK only means the republish was accepted, not that
// policy evaluation passed, so a re-failing event must count as a real
// failure and keep accumulating its retry count rather than resetting.
if c.evaluator != nil && dlEvent.Type == processor.EventTypeAudit {
outcome := c.evaluator(ctx, &dlEvent)
if !outcome.Resolved {
c.handleReEvaluationFailure(ctx, msg, &dlEvent, now, trigger, apiGroup, kind, outcome)
failed++
continue
}
}

// Evaluation passed (or no evaluator wired): republish so the event
// re-enters the ingest path and produces an activity.
if err := c.republishEvent(ctx, &dlEvent); err != nil {
klog.ErrorS(err, "Failed to republish DLQ event",
"eventType", dlEvent.Type,
Expand All @@ -418,23 +467,73 @@ func (c *DLQRetryController) processRetryBatch(ctx context.Context, trigger stri
continue
}

// Success - ack the DLQ message
// Republish accepted - ack the DLQ message.
if ackErr := msg.Ack(); ackErr != nil {
klog.ErrorS(ackErr, "Failed to ack successfully retried DLQ message")
}
dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, "succeeded").Inc()
succeeded++

klog.V(2).InfoS("Successfully retried DLQ event",
// "succeeded" means evaluation passed and an activity will be generated;
// "republished" means we put the event back without observing the
// outcome (no evaluator for this event type).
result := "republished"
if c.evaluator != nil && dlEvent.Type == processor.EventTypeAudit {
result = "succeeded"
}
dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, result).Inc()

klog.V(2).InfoS("Retried DLQ event",
"eventType", dlEvent.Type,
"policy", dlEvent.PolicyName,
"retryCount", dlEvent.RetryCount,
"result", result,
)
}

return processed, succeeded, failed
}

// handleReEvaluationFailure handles a dead-letter event that failed in-place
// re-evaluation. It logs the failure, increments the "failed" attempt counter
// and the per-error-type failure counter, and then calls
// updateAndRepublishMetadata rather than republishing the event to ingest.
//
// The DLQ message is ACKed when the metadata update succeeds and NAKed when it
// fails, so the original message is preserved for a later attempt. An outcome
// with an empty ErrorType is reported as processor.ErrorTypeCELSummary.
func (c *DLQRetryController) handleReEvaluationFailure(
	ctx context.Context,
	msg *nats.Msg,
	event *processor.DeadLetterEvent,
	now time.Time,
	trigger, apiGroup, kind string,
	outcome RetryOutcome,
) {
	// Fall back to a summary error when the evaluator did not classify the failure.
	failureType := outcome.ErrorType
	if failureType == "" {
		failureType = processor.ErrorTypeCELSummary
	}

	klog.ErrorS(outcome.Err, "DLQ event re-failed evaluation",
		"eventType", event.Type,
		"policy", event.PolicyName,
		"errorType", failureType,
		"retryCount", event.RetryCount,
	)

	dlqRetryAttemptsTotal.WithLabelValues(trigger, apiGroup, kind, "failed").Inc()
	dlqRetryFailedTotal.WithLabelValues(apiGroup, kind, event.PolicyName, string(failureType)).Inc()

	// Track the failure through the DLQ metadata path instead of pushing the
	// event back into ingest.
	updateErr := c.updateAndRepublishMetadata(ctx, event, now)
	if updateErr == nil {
		// Metadata update went through, so the original DLQ message can go.
		if err := msg.Ack(); err != nil {
			klog.ErrorS(err, "Failed to ack DLQ message after re-failure metadata update")
		}
		return
	}

	// Metadata update failed: keep the original message by NAKing it.
	klog.ErrorS(updateErr, "Failed to update retry metadata after re-failure, NAKing to preserve event")
	if err := msg.Nak(); err != nil {
		klog.ErrorS(err, "Failed to NAK DLQ message after metadata update failure")
	}
}

// isEligibleForRetry checks if an event's backoff has expired.
func (c *DLQRetryController) isEligibleForRetry(event *processor.DeadLetterEvent, now time.Time) bool {
// First retry is always eligible
Expand Down
58 changes: 47 additions & 11 deletions internal/activityprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ func (p *Processor) Start(ctx context.Context) error {
p.config.NATSEventStream,
dlqConfig.StreamName,
dlqConfig.SubjectPrefix,
p.reEvaluateDeadLetter,
)

p.wg.Add(1)
Expand Down Expand Up @@ -951,17 +952,7 @@ func (p *Processor) processMessage(msg *nats.Msg) error {
p.eventEmitter.EmitEvaluationError(p.ctx, policy.Name, ruleIndex, err)

// Classify error type for DLQ using sentinel errors
errorType := processor.ErrorTypeCELMatch
if ruleIndex >= 0 {
// If we have a rule index, check if it's a kind resolution or summary error
if errors.Is(err, processor.ErrKindResolution) {
errorType = processor.ErrorTypeKindResolve
} else if errors.Is(err, processor.ErrActivityBuild) {
errorType = processor.ErrorTypeKindResolve // Activity build errors are typically kind resolution
} else {
errorType = processor.ErrorTypeCELSummary
}
}
errorType := classifyEvaluationError(err, ruleIndex)

// Publish to DLQ
policyVersion := int64(0)
Expand Down Expand Up @@ -1037,6 +1028,51 @@ func (p *Processor) evaluateCompiledAuditRules(policy *CompiledPolicy, auditMap
return EvaluateCompiledAuditRules(policy, auditMap, audit, p.resourceToKind)
}

// reEvaluateDeadLetter re-runs audit policy evaluation against a dead-letter
// event's original payload so the DLQ retry worker can distinguish a genuinely
// resolved event from one that merely re-publishes and fails again. It mirrors
// the ingest evaluation path (auditToMap + EvaluateCompiledAuditRules) without
// emitting an activity; the retry worker republishes resolved events itself.
//
// The context argument is currently unused. The returned outcome is:
//   - unresolved with ErrorTypeUnmarshal when the payload cannot be decoded or
//     converted to a map;
//   - resolved when the audit event names no object, or when no cached policy
//     targets its API group and resource;
//   - unresolved with the classified error of the first policy whose
//     evaluation fails;
//   - resolved when every matching policy evaluates without error.
func (p *Processor) reEvaluateDeadLetter(_ context.Context, event *processor.DeadLetterEvent) RetryOutcome {
	var audit auditv1.Event
	if err := json.Unmarshal(event.OriginalPayload, &audit); err != nil {
		return RetryOutcome{ErrorType: processor.ErrorTypeUnmarshal, Err: err}
	}

	auditMap, err := auditToMap(&audit)
	if err != nil {
		return RetryOutcome{ErrorType: processor.ErrorTypeUnmarshal, Err: err}
	}

	// ObjectRef is optional in an audit event: a payload that is valid JSON but
	// has no objectRef (for example "{}" or a non-resource request) leaves it
	// nil. Without a target resource no policy can be looked up, so treat it
	// like the "no policy" case below instead of dereferencing a nil pointer
	// and crashing the retry worker.
	if audit.ObjectRef == nil {
		return RetryOutcome{Resolved: true}
	}

	policies := p.policyCache.Get(audit.ObjectRef.APIGroup, audit.ObjectRef.Resource)
	if len(policies) == 0 {
		// No policy still targets this resource; nothing left to evaluate, so the
		// event can leave the DLQ rather than retry forever.
		return RetryOutcome{Resolved: true}
	}

	// Stop at the first failing policy: one failure is enough to keep the
	// event in the DLQ.
	for _, policy := range policies {
		_, ruleIndex, err := p.evaluateCompiledAuditRules(policy, auditMap, &audit)
		if err != nil {
			return RetryOutcome{ErrorType: classifyEvaluationError(err, ruleIndex), Err: err}
		}
	}

	return RetryOutcome{Resolved: true}
}

// classifyEvaluationError translates an audit evaluation error into the DLQ
// error type used on the ingest path.
//
// A negative ruleIndex always yields ErrorTypeCELMatch, regardless of err.
// Otherwise an error wrapping ErrKindResolution or ErrActivityBuild yields
// ErrorTypeKindResolve, and any other error yields ErrorTypeCELSummary.
func classifyEvaluationError(err error, ruleIndex int) processor.ErrorType {
	switch {
	case ruleIndex < 0:
		return processor.ErrorTypeCELMatch
	case errors.Is(err, processor.ErrKindResolution),
		errors.Is(err, processor.ErrActivityBuild):
		// Activity build failures are reported as kind resolution failures.
		return processor.ErrorTypeKindResolve
	default:
		return processor.ErrorTypeCELSummary
	}
}

// auditToMap converts an audit event to a map for CEL evaluation.
func auditToMap(audit *auditv1.Event) (map[string]any, error) {
data, err := json.Marshal(audit)
Expand Down