diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index eb8538d3..2581fea0 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -4,9 +4,9 @@ // decider, diff the result into a sparse merge-patch, persist. The LCM never // polls and never writes the display status (that is derived on read). // -// Split A scope is the Apply* pipeline only. The reaction table + escalation -// engine (ACT) and the Session Manager land in later splits; TickEscalations is -// a documented no-op here. +// After a transition is persisted, the Apply* paths fire the mapped reaction +// (the ACT layer: reaction table + escalation engine) via the react() chokepoint +// in reactions.go. The Session Manager lands in a later split. package lifecycle import ( @@ -29,8 +29,8 @@ const ( MetaAgentSessionID = "agentSessionId" ) -// Manager is the LCM. Notifier/AgentMessenger are held for the ACT lane (split -// B); the Apply* pipeline does not fire reactions yet. +// Manager is the LCM. The Apply* pipeline persists a transition and then fires +// the mapped reaction via Notifier/AgentMessenger (see reactions.go). type Manager struct { store ports.LifecycleStore notifier ports.Notifier @@ -38,6 +38,14 @@ type Manager struct { recentActivityWindow time.Duration locks keyedMutex + + // trackers hold per-(session,reaction) escalation budgets (ACT policy, not + // canonical state). trackerMu guards them: react() touches them from the + // caller's goroutine, TickEscalations from the reaper's. clock is the time + // source for escalation stamping (overridable in tests). 
+ trackers map[trackerKey]*reactionTracker + trackerMu sync.Mutex + clock func() time.Time } var _ ports.LifecycleManager = (*Manager)(nil) @@ -48,6 +56,8 @@ func New(store ports.LifecycleStore, notifier ports.Notifier, messenger ports.Ag notifier: notifier, messenger: messenger, recentActivityWindow: defaultRecentActivityWindow, + trackers: map[trackerKey]*reactionTracker{}, + clock: time.Now, } } @@ -100,16 +110,28 @@ func (m *Manager) withLock(id domain.SessionID, fn func() error) error { return fn() } +// transition is what a persisted write produced: the canonical before and after +// the patch. The ACT layer (react) derives the reaction from these. It is nil +// when the pipeline made no write. +type transition struct { + beforeLC domain.CanonicalSessionLifecycle + afterLC domain.CanonicalSessionLifecycle +} + // mutate runs the shared pipeline: load -> build patch -> persist (only if the // patch changed something). decideFn returns the diffed patch and whether it // touches anything; a false "changed" is a clean no-op (no write, no revision // bump), which is how failed-probe / unknown-fact inputs are dropped. +// +// On a write it returns the transition (before/after canonical) so the caller — +// which still holds the originating facts — can fire the mapped reaction. 
func (m *Manager) mutate( ctx context.Context, id domain.SessionID, decideFn func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error), -) error { - return m.withLock(id, func() error { +) (*transition, error) { + var tr *transition + err := m.withLock(id, func() error { cur, exists, err := m.store.Load(ctx, id) if err != nil { return err @@ -121,8 +143,17 @@ func (m *Manager) mutate( if !changed { return nil } - return m.store.PatchLifecycle(ctx, id, patch) + if err := m.store.PatchLifecycle(ctx, id, patch); err != nil { + return err + } + after, _, err := m.store.Load(ctx, id) + if err != nil { + return err + } + tr = &transition{beforeLC: cur, afterLC: after} + return nil }) + return tr, err } // ---- OBSERVE entrypoints ---- @@ -132,7 +163,7 @@ func (m *Manager) mutate( // non-detecting verdict clears any stale detecting memory (#3) so the next // probe doesn't read a phantom prior. func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.SessionID, f ports.RuntimeFacts) error { - return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { if !exists { return ports.LifecyclePatch{}, false, nil // nothing seeded; ignore stray probe } @@ -158,6 +189,10 @@ func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.Session return patch, changed, nil }) + if err != nil { + return err + } + return m.react(ctx, id, tr, reactionContext{}) } // ApplySCMObservation maps PR facts onto the PR axis. A failed fetch is dropped @@ -165,7 +200,7 @@ func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.Session // session axis stays owned by activity, and DeriveLegacyStatus surfaces the PR // reason for display. A terminal PR (merged/closed) also parks the session. 
func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, f ports.SCMFacts) error { - return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { if !exists || !f.Fetched { return ports.LifecyclePatch{}, false, nil } @@ -195,6 +230,10 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, return ports.LifecyclePatch{}, false, nil } }) + if err != nil { + return err + } + return m.react(ctx, id, tr, reactionContext{ciFailureLogTail: f.CIFailureLogTail}) } // ApplyActivitySignal updates the activity axis. Only a valid-confidence signal @@ -204,7 +243,7 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, // life, so it may resolve a detecting session — clearing the quarantine memory // so a later probe doesn't resume counting from a stale prior. func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, s ports.ActivitySignal) error { - return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { if !exists || s.State != ports.SignalValid { return ports.LifecyclePatch{}, false, nil } @@ -230,6 +269,10 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, return patch, changed, nil }) + if err != nil { + return err + } + return m.react(ctx, id, tr, reactionContext{}) } // ---- mutation outcomes reported by the Session Manager ---- @@ -270,7 +313,9 @@ func (m *Manager) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o p // the terminal session/runtime sub-states for the kill kind and clears any // in-flight detecting memory. 
func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r ports.KillReason) error { - return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + // An explicit user kill is a human action, not an inferred event, so it + // fires no reaction — the transition is discarded. + _, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { if !exists { // Killing an unknown/already-gone session is a benign race; no-op // rather than fabricating a terminal record for a session we never @@ -295,12 +340,13 @@ func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r po } return patch, changed, nil }) -} - -// TickEscalations is a no-op in split A. The reaper will call this to fire -// duration-based escalations the synchronous LCM can't wake itself for, but the -// reaction table + escalation engine that back it land in split B. -func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error { + if err != nil { + return err + } + // A kill is terminal but bypasses react()'s incident-over cleanup (it fires + // no reaction). Drop any escalation trackers here so a later duration-based + // TickEscalations can't emit reaction.escalated for a dead session. 
+ m.clearSessionTrackers(id) return nil } diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index e8c47c0a..d0a97125 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -414,17 +414,6 @@ func TestOnKillRequested_UnseededIsNoOp(t *testing.T) { } } -func TestTickEscalationsIsNoOp(t *testing.T) { - mgr, store := newManager() - store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) - if err := mgr.TickEscalations(context.Background(), t0); err != nil { - t.Fatalf("tick: %v", err) - } - if l := mustLoad(t, store); l.Revision != 0 { - t.Errorf("TickEscalations must not write, got revision=%d", l.Revision) - } -} - // ---- fake store contract ---- func TestFakeStoreExpectedRevision(t *testing.T) { diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go new file mode 100644 index 00000000..544f152f --- /dev/null +++ b/backend/internal/lifecycle/reactions.go @@ -0,0 +1,416 @@ +package lifecycle + +// reactions.go is the ACT layer: the reaction table, the per-(session,reaction) +// escalation engine, and the duration-driven TickEscalations the synchronous +// LCM can't wake itself for. Reactions fire from react() after a transition is +// persisted by the Apply* pipeline (see manager.go). +// +// Dispatch is synchronous: react() runs Send/Notify inline. It is the single +// dispatch chokepoint, so moving it onto a worker goroutine later (once a daemon +// owns that goroutine's lifecycle) is a change confined to this one function. + +import ( + "context" + "fmt" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// reactionKey names a row in the reaction table and a tracker bucket. 
+type reactionKey string
+
+// Reaction keys. Each value is used as a key of defaultReactions and as the
+// reaction half of a trackerKey.
+const (
+	reactionCIFailed         reactionKey = "ci-failed"
+	reactionChangesRequested reactionKey = "changes-requested"
+	reactionBugbotComments   reactionKey = "bugbot-comments"
+	reactionMergeConflicts   reactionKey = "merge-conflicts"
+	reactionAgentIdle        reactionKey = "agent-idle"
+	reactionApprovedAndGreen reactionKey = "approved-and-green"
+	reactionAgentStuck       reactionKey = "agent-stuck"
+	reactionNeedsInput       reactionKey = "agent-needs-input"
+	reactionAgentExited      reactionKey = "agent-exited"
+	reactionPRClosed         reactionKey = "pr-closed"
+	reactionAllComplete      reactionKey = "all-complete"
+)
+
+// actionKind selects how executeReaction carries a reaction out.
+type actionKind string
+
+// Action kinds, as dispatched by executeReaction:
+//   - actionSendToAgent goes through sendToAgent (messenger.Send plus the
+//     escalation budget).
+//   - actionNotify calls notifier.Notify once, with no budget.
+//   - actionAutoMerge is accepted but currently does nothing (returns nil).
+const (
+	actionSendToAgent actionKind = "send-to-agent"
+	actionNotify      actionKind = "notify"
+	actionAutoMerge   actionKind = "auto-merge"
+)
+
+// reactionConfig is one row of the reaction table (distillation §4.1/§4.2).
+//
+//   - retries: numeric escalation cap; escalate once attempts exceed it.
+//     Zero disables the numeric cap.
+//   - escalateAfter: duration escalation; escalate once this has elapsed since
+//     the first attempt (fired by TickEscalations, since the LCM never polls).
+//     Zero disables the duration check.
+//   - persistent: the tracker survives the status leaving the triggering
+//     state; it only resets when the incident is truly over (PR no longer open
+//     or the session terminal). Only ci-failed is persistent, so a flapping
+//     CI (fail→pending→fail) keeps draining one shared retry budget.
+type reactionConfig struct {
+	// NOTE(review): auto is set on several default rows, but none of the
+	// dispatch code visible in this file reads it — confirm where (or whether)
+	// it is consumed.
+	auto bool
+	// action picks the executeReaction branch for this row.
+	action actionKind
+	// message is the base text: sent to the agent (composeMessage may append a
+	// CI failure log tail) or used as the notification Message.
+	message string
+	// priority and eventType are copied into the OrchestratorEvent when the
+	// action is notify.
+	priority  ports.EventPriority
+	eventType string
+	// retries and escalateAfter feed shouldEscalate; see the type comment.
+	retries       int
+	escalateAfter time.Duration
+	// persistent makes react skip the per-reaction tracker reset when the
+	// triggering state is left.
+	persistent bool
+}
+
+// defaultReactions is the product's default behaviour (distillation §4.2).
+// auto-merge is intentionally absent: approved-and-green is a notify, so the
+// human decides to merge. The auto-merge action kind exists for opt-in configs,
+// but no default row uses it.
+var defaultReactions = map[reactionKey]reactionConfig{
+	// ---- send-to-agent rows: handled by the escalation engine ----
+	reactionCIFailed: {
+		action:     actionSendToAgent,
+		auto:       true,
+		retries:    2,
+		persistent: true,
+		priority:   ports.PriorityAction,
+		eventType:  "reaction.ci-failed",
+		message:    "CI is failing on your PR. Review the failing output below and push a fix.",
+	},
+	reactionChangesRequested: {
+		action:        actionSendToAgent,
+		auto:          true,
+		escalateAfter: 30 * time.Minute,
+		priority:      ports.PriorityAction,
+		eventType:     "reaction.changes-requested",
+		message:       "A reviewer requested changes on your PR. Address the comments and push.",
+	},
+	reactionBugbotComments: {
+		action:        actionSendToAgent,
+		auto:          true,
+		escalateAfter: 30 * time.Minute,
+		priority:      ports.PriorityAction,
+		eventType:     "reaction.bugbot-comments",
+		message:       "An automated reviewer left comments on your PR. Address them and push.",
+	},
+	reactionMergeConflicts: {
+		action:        actionSendToAgent,
+		auto:          true,
+		escalateAfter: 15 * time.Minute,
+		priority:      ports.PriorityAction,
+		eventType:     "reaction.merge-conflicts",
+		message:       "Your PR has merge conflicts. Rebase onto the base branch and resolve them.",
+	},
+	reactionAgentIdle: {
+		action:        actionSendToAgent,
+		auto:          true,
+		retries:       2,
+		escalateAfter: 15 * time.Minute,
+		priority:      ports.PriorityWarning,
+		eventType:     "reaction.agent-idle",
+		message:       "You appear idle. Continue the task or explain what is blocking you.",
+	},
+
+	// ---- notify rows: one notification per triggering transition ----
+	reactionApprovedAndGreen: {
+		action:    actionNotify,
+		auto:      false,
+		priority:  ports.PriorityAction,
+		eventType: "reaction.approved-and-green",
+		message:   "PR is approved and green — ready to merge.",
+	},
+	reactionAgentStuck: {
+		// §4.2 lists a 10m threshold for this row. It is deliberately not
+		// applied here: entering stuck is already debounced upstream by the
+		// detecting->stuck quarantine (DETECTING_MAX_ATTEMPTS/DURATION), so a
+		// second timer would be redundant.
+		action:    actionNotify,
+		priority:  ports.PriorityUrgent,
+		eventType: "reaction.agent-stuck",
+		message:   "Agent is stuck and needs attention.",
+	},
+	reactionNeedsInput: {
+		action:    actionNotify,
+		priority:  ports.PriorityUrgent,
+		eventType: "reaction.agent-needs-input",
+		message:   "Agent needs input to continue.",
+	},
+	reactionAgentExited: {
+		action:    actionNotify,
+		priority:  ports.PriorityUrgent,
+		eventType: "reaction.agent-exited",
+		message:   "Agent process exited unexpectedly.",
+	},
+	reactionPRClosed: {
+		action:    actionNotify,
+		priority:  ports.PriorityAction,
+		eventType: "reaction.pr-closed",
+		message:   "PR was closed without merging — decide: resume, learn, or terminate.",
+	},
+	reactionAllComplete: {
+		action:    actionNotify,
+		priority:  ports.PriorityInfo,
+		eventType: "reaction.all-complete",
+		message:   "PR merged — work complete.",
+	},
+}
+
+// reactionEventFor maps a canonical record to the reaction it should drive,
+// mirroring DeriveLegacyStatus but for the ACT layer. ok is false when the
+// current state has no reaction.
+//
+// A closed PR derives to the idle display status, so it is detected from the PR
+// axis directly before falling through to the status mapping. bugbot-comments
+// and merge-conflicts have no producer in the split-A decide core yet, so they
+// are dormant: configured but unreachable until DECIDE surfaces them.
+func reactionEventFor(l domain.CanonicalSessionLifecycle) (reactionKey, bool) {
+	// Checked on the PR axis first, ahead of the display-status switch.
+	if l.PR.State == domain.PRClosed {
+		return reactionPRClosed, true
+	}
+	switch domain.DeriveLegacyStatus(l) {
+	case domain.StatusCIFailed:
+		return reactionCIFailed, true
+	case domain.StatusChangesRequested:
+		return reactionChangesRequested, true
+	case domain.StatusApproved, domain.StatusMergeable:
+		return reactionApprovedAndGreen, true
+	case domain.StatusIdle:
+		return reactionAgentIdle, true
+	case domain.StatusStuck:
+		return reactionAgentStuck, true
+	case domain.StatusNeedsInput:
+		return reactionNeedsInput, true
+	case domain.StatusKilled:
+		// Inferred death only — an explicit user kill goes through
+		// OnKillRequested, which does not react.
+		return reactionAgentExited, true
+	case domain.StatusMerged:
+		return reactionAllComplete, true
+	}
+	return "", false
+}
+
+// reactionContext carries fact-derived material the message templates need. The
+// SCM path populates it (CI failure log tail); other paths pass the zero value.
+type reactionContext struct {
+	// ciFailureLogTail is appended to the agent message by composeMessage when
+	// it is non-nil and non-empty.
+	ciFailureLogTail *string
+}
+
+// trackerKey buckets an escalation tracker by session and reaction.
+type trackerKey struct {
+	id  domain.SessionID
+	key reactionKey
+}
+
+// reactionTracker is the per-(session,reaction) escalation budget. It lives in
+// memory on the Manager: a daemon restart resets budgets, which only ever costs
+// a few extra agent retries before re-escalating — never a missed human
+// notification. Keeping it out of the canonical store preserves the
+// truth-vs-policy split (the store holds session truth; this is ACT policy).
+type reactionTracker struct {
+	// attempts counts sends charged against the budget; a failed delivery is
+	// rolled back by sendToAgent.
+	attempts int
+	// escalated silences further auto-dispatch until the tracker is cleared.
+	escalated bool
+	// firstAttemptAt anchors the escalateAfter duration check; zero means no
+	// attempt has been recorded yet.
+	firstAttemptAt time.Time
+}
+
+// react fires the ACT layer after a persisted transition: clear the tracker for
+// the reaction we left, then dispatch the reaction for the one we entered. It
+// fires only on a genuine reaction change, so re-persisting the same state does
+// not re-dispatch. Synchronous by design (see file header).
+//
+// A nil tr (no write happened) is a no-op. The returned error is whatever the
+// dispatched reaction returned.
+//
+// Integration-time caveat: react runs AFTER withLock releases (deliberately, so
+// a busy-waiting send-to-agent never holds the per-session mutex). Under a live
+// daemon with concurrent observers (SCM poller + reaper + activity ingest) the
+// afterLC snapshot can be stale by dispatch time — e.g. a ci-failed send firing
+// after the session already moved to approved. Tests are single-threaded so it
+// is not observable yet; when the daemon lands, give react a per-session
+// ordering (a small react queue) or re-check the triggering state before
+// dispatching.
+func (m *Manager) react(ctx context.Context, id domain.SessionID, tr *transition, rc reactionContext) error {
+	if tr == nil {
+		return nil
+	}
+	beforeKey, hadBefore := reactionEventFor(tr.beforeLC)
+	afterKey, hasAfter := reactionEventFor(tr.afterLC)
+
+	changed := beforeKey != afterKey
+
+	switch {
+	case incidentOver(tr.afterLC) || recovered(tr.afterLC):
+		// The PR-pipeline incident has ended — the PR resolved (merged/closed),
+		// the session went terminal, or it reached an approved/green state. Every
+		// tracker for this session is now stale, including a persistent ci-failed
+		// one. This is keyed on the state REACHED, not the one left: the recovery
+		// transition is typically review_pending->approved (beforeKey empty), so
+		// clearing only beforeKey would leak the ci-failed tracker and leave its
+		// escalated=true to silence a future regression. Clear them all.
+		m.clearSessionTrackers(id)
+	case hadBefore && (!hasAfter || changed):
+		// Within an unresolved open PR: a normal tracker resets when its state is
+		// left. A persistent one (ci-failed) is NOT cleared here — it must survive
+		// the ambiguous review_pending limbo (the fail->pending->fail flap, §4.2);
+		// it only resets via the recovery/incident-over branch above.
+		if !defaultReactions[beforeKey].persistent {
+			m.clearTracker(id, beforeKey)
+		}
+	}
+
+	if hasAfter && (!hadBefore || changed) {
+		return m.executeReaction(ctx, id, afterKey, rc)
+	}
+	return nil
+}
+
+// incidentOver reports that a PR-pipeline incident has truly ended (PR no longer
+// open, or the session terminal), so all trackers for the session may reset.
+func incidentOver(l domain.CanonicalSessionLifecycle) bool {
+	return l.PR.State != domain.PROpen || isTerminal(l.Session.State)
+}
+
+// recovered reports a genuinely-green open PR: an approved/mergeable state, which
+// unambiguously means CI is no longer failing (the open-PR ladder ranks ci_failing
+// above approved, so an approved display cannot coexist with failing CI). Unlike
+// the ambiguous review_pending state — which may just be CI re-running — reaching
+// this ends a ci-failed incident and re-arms its budget.
+func recovered(l domain.CanonicalSessionLifecycle) bool {
+	if l.PR.State != domain.PROpen {
+		return false
+	}
+	switch l.PR.Reason {
+	case domain.PRReasonApproved, domain.PRReasonMergeReady:
+		return true
+	default:
+		return false
+	}
+}
+
+// executeReaction dispatches the table row for key according to its action
+// kind. A key with no row yields the zero config, whose empty action matches no
+// case, so nothing is dispatched and nil is returned.
+func (m *Manager) executeReaction(ctx context.Context, id domain.SessionID, key reactionKey, rc reactionContext) error {
+	cfg := defaultReactions[key]
+	switch cfg.action {
+	case actionNotify:
+		// notify reactions are human-attention terminals: fire once on the
+		// triggering transition, no retry/escalation budget.
+		return m.notifier.Notify(ctx, ports.OrchestratorEvent{
+			Type:      cfg.eventType,
+			Priority:  cfg.priority,
+			SessionID: id,
+			Message:   cfg.message,
+		})
+	case actionAutoMerge:
+		// Off by default: no default row maps here, and wiring a merge port is a
+		// later PR. An opt-in config could route a reaction here.
+		return nil
+	case actionSendToAgent:
+		return m.sendToAgent(ctx, id, key, cfg, rc)
+	}
+	return nil
+}
+
+// sendToAgent runs the escalation engine for an auto send-to-agent reaction:
+// count the attempt, escalate when the numeric cap or duration is exceeded
+// (silencing further auto-dispatch), else inject the message via the messenger.
+//
+// Both outbound calls (escalation notify and agent send) run outside trackerMu.
+// If either fails, this call's bookkeeping is rolled back and the error is
+// returned, so a failed delivery neither consumes budget nor leaves the tracker
+// silenced without the human having been told.
+func (m *Manager) sendToAgent(ctx context.Context, id domain.SessionID, key reactionKey, cfg reactionConfig, rc reactionContext) error {
+	m.trackerMu.Lock()
+	tk := m.trackerFor(id, key)
+	if tk.escalated {
+		m.trackerMu.Unlock()
+		return nil // silenced until the condition clears the tracker
+	}
+	now := m.clock()
+	freshFirst := tk.firstAttemptAt.IsZero()
+	if freshFirst {
+		tk.firstAttemptAt = now
+	}
+	tk.attempts++
+	escalateNow := shouldEscalate(tk, cfg, now)
+	if escalateNow {
+		tk.escalated = true
+	}
+	m.trackerMu.Unlock()
+
+	// undoAttempt reverts what this call recorded. tk may have been dropped from
+	// the map in the meantime; writing to the orphan is then harmless.
+	undoAttempt := func() {
+		m.trackerMu.Lock()
+		tk.attempts--
+		if freshFirst {
+			tk.firstAttemptAt = time.Time{}
+		}
+		if escalateNow {
+			tk.escalated = false
+		}
+		m.trackerMu.Unlock()
+	}
+
+	if escalateNow {
+		if err := m.escalate(ctx, id, key); err != nil {
+			// The human was NOT notified. Leaving escalated=true would silence
+			// the agent with nobody told, so re-arm and let the next relevant
+			// transition (or tick) try the escalation again.
+			undoAttempt()
+			return err
+		}
+		return nil
+	}
+
+	if err := m.messenger.Send(ctx, id, composeMessage(cfg, rc)); err != nil {
+		// A delivery failure must not consume escalation budget: roll this
+		// attempt back so the next relevant transition retries from the same
+		// point rather than marching toward escalation on undelivered messages
+		// (distillation §4.3).
+		undoAttempt()
+		return err
+	}
+	return nil
+}
+
+// shouldEscalate uses inclusive boundaries: escalate once the numeric cap is
+// exceeded or once exactly escalateAfter has elapsed (don't wait for the next
+// tick to cross a strict threshold). A zero retries or escalateAfter disables
+// the respective check.
+func shouldEscalate(tk *reactionTracker, cfg reactionConfig, now time.Time) bool {
+	if cfg.retries > 0 && tk.attempts > cfg.retries {
+		return true
+	}
+	if cfg.escalateAfter > 0 && !tk.firstAttemptAt.IsZero() && now.Sub(tk.firstAttemptAt) >= cfg.escalateAfter {
+		return true
+	}
+	return false
+}
+
+// escalate emits reaction.escalated and notifies the human. The caller sets
+// tracker.escalated under the lock before calling, which silences further
+// auto-dispatch for this reaction until the tracker clears. escalate itself
+// never touches the tracker; it returns the notifier's error unchanged.
+func (m *Manager) escalate(ctx context.Context, id domain.SessionID, key reactionKey) error {
+	return m.notifier.Notify(ctx, ports.OrchestratorEvent{
+		Type:      "reaction.escalated",
+		Priority:  ports.PriorityUrgent,
+		SessionID: id,
+		Message:   fmt.Sprintf("auto-handling of %q is exhausted and needs a human.", key),
+		Data:      map[string]any{"reaction": string(key)},
+	})
+}
+
+// composeMessage returns the row's base message, with the CI failure log tail
+// appended under a "Failing output:" heading when rc carries a non-empty one.
+func composeMessage(cfg reactionConfig, rc reactionContext) string {
+	if rc.ciFailureLogTail != nil && *rc.ciFailureLogTail != "" {
+		return cfg.message + "\n\nFailing output:\n" + *rc.ciFailureLogTail
+	}
+	return cfg.message
+}
+
+// trackerFor returns the tracker for (id,key), creating it on first use. The
+// caller must hold trackerMu.
+func (m *Manager) trackerFor(id domain.SessionID, key reactionKey) *reactionTracker {
+	k := trackerKey{id: id, key: key}
+	tk := m.trackers[k]
+	if tk == nil {
+		tk = &reactionTracker{}
+		m.trackers[k] = tk
+	}
+	return tk
+}
+
+// clearTracker drops the tracker for one (session, reaction) pair, taking
+// trackerMu itself. Deleting a missing entry is a no-op.
+func (m *Manager) clearTracker(id domain.SessionID, key reactionKey) {
+	m.trackerMu.Lock()
+	delete(m.trackers, trackerKey{id: id, key: key})
+	m.trackerMu.Unlock()
+}
+
+// clearSessionTrackers drops every tracker for a session — used when its
+// incident is over, so no budget (and no stale escalated=true) survives into a
+// later unrelated incident.
+func (m *Manager) clearSessionTrackers(id domain.SessionID) {
+	m.trackerMu.Lock()
+	for k := range m.trackers {
+		if k.id == id {
+			delete(m.trackers, k)
+		}
+	}
+	m.trackerMu.Unlock()
+}
+
+// TickEscalations fires the duration-based escalations the synchronous LCM
+// cannot wake itself for. The reaper calls it on a timer; it escalates any
+// not-yet-escalated tracker whose escalateAfter has elapsed. Notifications are
+// sent outside the lock so agent/notifier latency never blocks tracker access.
+func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error {
+	// due is one tracker whose duration budget has run out. The tracker pointer
+	// is kept so a failed notification can be re-armed afterwards.
+	type due struct {
+		id  domain.SessionID
+		key reactionKey
+		tk  *reactionTracker
+	}
+	var fire []due
+
+	// Phase 1 (under the lock): mark every due tracker escalated so concurrent
+	// react() calls stop auto-dispatching for it, and remember what to notify.
+	m.trackerMu.Lock()
+	for k, tk := range m.trackers {
+		if tk.escalated {
+			continue
+		}
+		cfg := defaultReactions[k.key]
+		if cfg.escalateAfter > 0 && !tk.firstAttemptAt.IsZero() && now.Sub(tk.firstAttemptAt) >= cfg.escalateAfter {
+			tk.escalated = true
+			fire = append(fire, due{id: k.id, key: k.key, tk: tk})
+		}
+	}
+	m.trackerMu.Unlock()
+
+	// Phase 2 (outside the lock): attempt EVERY due escalation. Stopping at the
+	// first notifier error would strand the remaining trackers as escalated
+	// without their human notification ever being sent.
+	var firstErr error
+	for _, d := range fire {
+		err := m.escalate(ctx, d.id, d.key)
+		if err == nil {
+			continue
+		}
+		// Not delivered: re-arm so the next tick retries. If the tracker was
+		// cleared meanwhile, this writes to an orphan and has no effect.
+		m.trackerMu.Lock()
+		d.tk.escalated = false
+		m.trackerMu.Unlock()
+		if firstErr == nil {
+			firstErr = err
+		}
+	}
+	// The first notifier error (if any) is reported; later ones are retried on
+	// the next tick via the re-arm above.
+	return firstErr
+}
diff --git a/backend/internal/lifecycle/reactions_test.go b/backend/internal/lifecycle/reactions_test.go
new file mode 100644
index 00000000..e90e8881
--- /dev/null
+++ b/backend/internal/lifecycle/reactions_test.go
@@ -0,0 +1,416 @@
+package lifecycle
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aoagents/agent-orchestrator/backend/internal/domain"
+	"github.com/aoagents/agent-orchestrator/backend/internal/ports"
+)
+
+// failingMessenger always fails delivery, counting attempts — used to assert a
+// send failure does not consume escalation budget.
+type failingMessenger struct{ attempts int }
+
+// Send records the attempt and always returns an error.
+func (f *failingMessenger) Send(_ context.Context, _ domain.SessionID, _ string) error {
+	f.attempts++
+	return fmt.Errorf("messenger unavailable")
+}
+
+// newReactive wires a Manager with handles on the recording fakes so reaction
+// tests can assert what was sent/notified. clock is pinned to t0 for
+// deterministic escalation stamping.
+func newReactive() (*Manager, *fakeStore, *recordingNotifier, *recordingMessenger) { + store := newFakeStore() + notf := &recordingNotifier{} + msgr := &recordingMessenger{} + m := New(store, notf, msgr) + m.clock = func() time.Time { return t0 } + return m, store, notf, msgr +} + +func lcOpenPR(reason domain.PRReason) domain.CanonicalSessionLifecycle { + l := lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive) + l.PR = domain.PRSubstate{State: domain.PROpen, Reason: reason, Number: 7} + return l +} + +func notifyCount(n *recordingNotifier, eventType string) int { + n.mu.Lock() + defer n.mu.Unlock() + c := 0 + for _, e := range n.events { + if e.Type == eventType { + c++ + } + } + return c +} + +func ctx() context.Context { return context.Background() } + +// ---- right reaction per transition ---- + +func TestReaction_CIFailedSendsToAgentWithLogTail(t *testing.T) { + m, store, notf, msgr := newReactive() + store.seed(sid, lcOpenPR(domain.PRReasonReviewPending)) + + tail := "build failed\nundefined: foo" + err := m.ApplySCMObservation(ctx(), sid, ports.SCMFacts{ + Fetched: true, PRState: domain.PROpen, CISummary: ports.CIFailing, + PRNumber: 7, CIFailureLogTail: &tail, + }) + if err != nil { + t.Fatalf("apply: %v", err) + } + + if len(msgr.sent) != 1 { + t.Fatalf("want 1 send, got %d", len(msgr.sent)) + } + if got := msgr.sent[0].Message; !strings.Contains(got, "CI is failing") || !strings.Contains(got, tail) { + t.Errorf("message missing base text or log tail: %q", got) + } + if notifyCount(notf, "reaction.escalated") != 0 { + t.Error("a first failure must not escalate") + } +} + +func TestReaction_ApprovedAndGreenNotifiesNeverAutoMerges(t *testing.T) { + m, store, notf, msgr := newReactive() + store.seed(sid, lcOpenPR(domain.PRReasonReviewPending)) + + err := m.ApplySCMObservation(ctx(), sid, ports.SCMFacts{ + Fetched: true, PRState: domain.PROpen, ReviewDecision: ports.ReviewApproved, + Mergeability: ports.Mergeability{Mergeable: true}, 
PRNumber: 7,
	})
	if err != nil {
		t.Fatalf("apply: %v", err)
	}

	// approved-and-green is notify (human decides to merge); the agent is never
	// messaged and no auto-merge fires.
	if len(msgr.sent) != 0 {
		t.Errorf("approved-and-green must not message the agent, got %d sends", len(msgr.sent))
	}
	if notifyCount(notf, "reaction.approved-and-green") != 1 {
		t.Errorf("want one approved-and-green notify, got events %+v", notf.events)
	}
}

// TestReaction_NotifyEventsForHardStates applies an activity signal to a
// working session and checks that exactly one notify event of the expected
// type is recorded while nothing is sent to the agent.
func TestReaction_NotifyEventsForHardStates(t *testing.T) {
	type scenario struct {
		name      string
		apply     func(m *Manager)
		eventType string
	}
	scenarios := []scenario{
		{
			name:      "waiting_input -> agent-needs-input",
			apply:     func(m *Manager) { applyActivity(m, domain.ActivityWaitingInput) },
			eventType: "reaction.agent-needs-input",
		},
		{
			name:      "blocked -> agent-stuck",
			apply:     func(m *Manager) { applyActivity(m, domain.ActivityBlocked) },
			eventType: "reaction.agent-stuck",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			m, store, notf, msgr := newReactive()
			store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive))

			sc.apply(m)

			if got := notifyCount(notf, sc.eventType); got != 1 {
				t.Errorf("want one %s, got events %+v", sc.eventType, notf.events)
			}
			if sends := len(msgr.sent); sends != 0 {
				t.Errorf("notify reaction must not message the agent, got %d", sends)
			}
		})
	}
}

// TestReaction_InferredDeathNotifiesAgentExited feeds a dead runtime/process
// probe to a detecting session, verifies the derived legacy status is killed,
// and then expects a single agent-exited notify.
func TestReaction_InferredDeathNotifiesAgentExited(t *testing.T) {
	m, store, notf, _ := newReactive()
	store.seed(sid, detectingLC())

	deadProbe := ports.RuntimeFacts{
		RuntimeState: ports.RuntimeProbeDead,
		ProcessState: ports.ProcessProbeDead,
		ObservedAt:   t0,
	}
	if err := m.ApplyRuntimeObservation(ctx(), sid, deadProbe); err != nil {
		t.Fatalf("apply: %v", err)
	}

	if status := domain.DeriveLegacyStatus(mustLoad(t, store)); status != domain.StatusKilled {
		t.Fatalf("precondition: want killed, got %s", status)
	}
	if got := notifyCount(notf, "reaction.agent-exited"); got != 1 {
		t.Errorf("want one agent-exited, got events %+v", notf.events)
	}
}

// TestReaction_PRClosedAndMerged drives an open-PR session to a closed or
// merged PR state and expects one notify of the matching event type.
func TestReaction_PRClosedAndMerged(t *testing.T) {
	cases := []struct {
		name      string
		prState   domain.PRState
		eventType string
	}{
		{name: "closed -> pr-closed", prState: domain.PRClosed, eventType: "reaction.pr-closed"},
		{name: "merged -> all-complete", prState: domain.PRMerged, eventType: "reaction.all-complete"},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m, store, notf, _ := newReactive()
			store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

			facts := ports.SCMFacts{Fetched: true, PRState: c.prState, PRNumber: 7}
			if err := m.ApplySCMObservation(ctx(), sid, facts); err != nil {
				t.Fatalf("apply: %v", err)
			}
			if got := notifyCount(notf, c.eventType); got != 1 {
				t.Errorf("want one %s, got events %+v", c.eventType, notf.events)
			}
		})
	}
}

// TestReaction_OnKillRequestedDoesNotReact asserts that a manual kill request
// records no notify events and sends nothing to the agent.
func TestReaction_OnKillRequestedDoesNotReact(t *testing.T) {
	m, store, notf, msgr := newReactive()
	store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive))

	err := m.OnKillRequested(ctx(), sid, ports.KillReason{Kind: ports.KillManual})
	if err != nil {
		t.Fatalf("kill: %v", err)
	}

	// An explicit human kill is not an inferred event: no agent-exited, no send.
	reacted := len(notf.events) > 0 || len(msgr.sent) > 0
	if reacted {
		t.Errorf("explicit kill must fire no reaction: notifies=%+v sends=%+v", notf.events, msgr.sent)
	}
}

// ---- escalation engine ----

// TestReaction_CIFailedNumericEscalation oscillates CI between failing and
// pending four times and expects two agent sends plus exactly one escalation
// notify.
func TestReaction_CIFailedNumericEscalation(t *testing.T) {
	m, store, notf, msgr := newReactive()
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	// ci-failed has retries 2 and is persistent, so the budget is shared across
	// fail->pending->fail oscillations and escalates on the third failure.
	const failures = 4
	for n := 0; n < failures; n++ {
		failCI(t, m)
		pendingCI(t, m) // oscillate out (persistent tracker must NOT reset)
	}

	if sends := len(msgr.sent); sends != 2 {
		t.Errorf("want 2 auto-sends before escalation, got %d", sends)
	}
	if escalations := notifyCount(notf, "reaction.escalated"); escalations != 1 {
		t.Errorf("want exactly one escalation, got %d", escalations)
	}
}

// TestReaction_DurationEscalationFiresOnTick checks that a changes-requested
// review sends once on the transition, does not escalate on a tick at t0+10m,
// and does escalate on a tick at exactly t0+30m.
func TestReaction_DurationEscalationFiresOnTick(t *testing.T) {
	const escalatedEvent = "reaction.escalated"

	m, store, notf, msgr := newReactive()
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	// changes-requested: send once now, then escalate by duration (30m) — which
	// only the reaper's TickEscalations can fire (the LCM never polls).
	changesRequested := ports.SCMFacts{
		Fetched:        true,
		PRState:        domain.PROpen,
		ReviewDecision: ports.ReviewChangesRequested,
		PRNumber:       7,
	}
	if err := m.ApplySCMObservation(ctx(), sid, changesRequested); err != nil {
		t.Fatalf("apply: %v", err)
	}
	if sends := len(msgr.sent); sends != 1 {
		t.Fatalf("want one send on transition, got %d", sends)
	}

	early := t0.Add(10 * time.Minute)
	if err := m.TickEscalations(ctx(), early); err != nil {
		t.Fatalf("tick: %v", err)
	}
	if got := notifyCount(notf, escalatedEvent); got != 0 {
		t.Error("must not escalate before escalateAfter elapses")
	}

	// Inclusive boundary: escalate at exactly escalateAfter (30m), not only past it.
	boundary := t0.Add(30 * time.Minute)
	if err := m.TickEscalations(ctx(), boundary); err != nil {
		t.Fatalf("tick: %v", err)
	}
	if got := notifyCount(notf, escalatedEvent); got != 1 {
		t.Errorf("want one duration escalation at exactly 30m, got events %+v", notf.events)
	}
}

// TestReaction_KillClearsEscalationTrackers verifies that a manual kill leaves
// no trackers for the session and that a later tick produces no escalation.
func TestReaction_KillClearsEscalationTrackers(t *testing.T) {
	m, store, notf, _ := newReactive()
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	// changes-requested creates a duration-based tracker.
	changesRequested := ports.SCMFacts{
		Fetched:        true,
		PRState:        domain.PROpen,
		ReviewDecision: ports.ReviewChangesRequested,
		PRNumber:       7,
	}
	if err := m.ApplySCMObservation(ctx(), sid, changesRequested); err != nil {
		t.Fatalf("apply: %v", err)
	}
	if sessionTrackerCount(m, sid) == 0 {
		t.Fatalf("precondition: expected a tracker")
	}

	err := m.OnKillRequested(ctx(), sid, ports.KillReason{Kind: ports.KillManual})
	if err != nil {
		t.Fatalf("kill: %v", err)
	}
	if left := sessionTrackerCount(m, sid); left != 0 {
		t.Errorf("kill must clear trackers, %d left", left)
	}

	// A later duration tick must not escalate a dead session.
	if err := m.TickEscalations(ctx(), t0.Add(time.Hour)); err != nil {
		t.Fatalf("tick: %v", err)
	}
	if escalations := notifyCount(notf, "reaction.escalated"); escalations != 0 {
		t.Errorf("killed session must not escalate, got %d", escalations)
	}
}

// TestReaction_SendFailureDoesNotBurnBudget wires a failingMessenger into the
// manager, oscillates CI failing/pending five times, and expects at least five
// send attempts but no escalation notify.
func TestReaction_SendFailureDoesNotBurnBudget(t *testing.T) {
	store := newFakeStore()
	notf := &recordingNotifier{}
	fm := &failingMessenger{}

	m := New(store, notf, fm)
	m.clock = func() time.Time { return t0 }
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	logTail := "fail"
	failing := ports.SCMFacts{
		Fetched:          true,
		PRState:          domain.PROpen,
		CISummary:        ports.CIFailing,
		PRNumber:         7,
		CIFailureLogTail: &logTail,
	}
	pending := ports.SCMFacts{
		Fetched:        true,
		PRState:        domain.PROpen,
		CISummary:      ports.CIPending,
		ReviewDecision: ports.ReviewPending,
		PRNumber:       7,
	}

	// ci-failed has retries 2; with every delivery failing, the budget is rolled
	// back each time, so even 5 failures never escalate.
	const rounds = 5
	for n := 0; n < rounds; n++ {
		// Errors are discarded on purpose: the failing apply returns the
		// delivery error, and only attempts/escalations are asserted below.
		_ = m.ApplySCMObservation(ctx(), sid, failing)
		_ = m.ApplySCMObservation(ctx(), sid, pending)
	}

	if fm.attempts < 5 {
		t.Errorf("expected at least 5 send attempts, got %d", fm.attempts)
	}
	if escalations := notifyCount(notf, "reaction.escalated"); escalations != 0 {
		t.Errorf("undelivered messages must not escalate, got %d", escalations)
	}
}

// TestReaction_NonPersistentTrackerClearsOnLeave alternates idle and active
// three times and expects three agent sends in total.
func TestReaction_NonPersistentTrackerClearsOnLeave(t *testing.T) {
	m, store, _, msgr := newReactive()
	store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive))

	// agent-idle has retries 2 but is NOT persistent: leaving idle clears the
	// tracker, so three idle incidents each send fresh and none escalate.
	const incidents = 3
	for n := 0; n < incidents; n++ {
		applyActivity(m, domain.ActivityIdle)
		applyActivity(m, domain.ActivityActive)
	}

	if sends := len(msgr.sent); sends != 3 {
		t.Errorf("want 3 idle sends (budget reset each incident), got %d", sends)
	}
}

// TestReaction_CIFailedRearmsOnGenuineRecovery drains the ci-failed budget to
// one escalation, applies an approved+mergeable observation, then fails CI
// again and expects exactly one additional agent send.
func TestReaction_CIFailedRearmsOnGenuineRecovery(t *testing.T) {
	m, store, notf, msgr := newReactive()
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	// Drain the ci-failed budget to escalation (silenced thereafter).
	const failures = 4
	for n := 0; n < failures; n++ {
		failCI(t, m)
		pendingCI(t, m)
	}
	if escalations := notifyCount(notf, "reaction.escalated"); escalations != 1 {
		t.Fatalf("precondition: want one escalation, got %d", escalations)
	}
	sentBefore := len(msgr.sent)

	// A genuine recovery (approved + green) ends the incident and re-arms the
	// budget; a later regression must re-nudge the agent, not stay silenced.
	recovered := ports.SCMFacts{
		Fetched:        true,
		PRState:        domain.PROpen,
		ReviewDecision: ports.ReviewApproved,
		Mergeability:   ports.Mergeability{Mergeable: true},
		PRNumber:       7,
	}
	if err := m.ApplySCMObservation(ctx(), sid, recovered); err != nil {
		t.Fatalf("recover: %v", err)
	}
	failCI(t, m)

	if sentAfter := len(msgr.sent); sentAfter != sentBefore+1 {
		t.Errorf("regression after recovery must re-nudge the agent: sends %d -> %d", sentBefore, sentAfter)
	}
}

// TestReaction_IncidentOverClearsAllSessionTrackers creates a tracker via a CI
// failure, applies a merged-PR observation, and expects zero trackers left for
// the session.
func TestReaction_IncidentOverClearsAllSessionTrackers(t *testing.T) {
	m, store, _, _ := newReactive()
	store.seed(sid, lcOpenPR(domain.PRReasonReviewPending))

	failCI(t, m) // creates a persistent ci-failed tracker
	if sessionTrackerCount(m, sid) == 0 {
		t.Fatalf("precondition: expected a ci-failed tracker")
	}

	// Merging ends the incident; no tracker (and no stale escalated=true) may
	// survive for the session.
	merged := ports.SCMFacts{Fetched: true, PRState: domain.PRMerged, PRNumber: 7}
	if err := m.ApplySCMObservation(ctx(), sid, merged); err != nil {
		t.Fatalf("merge: %v", err)
	}
	if left := sessionTrackerCount(m, sid); left != 0 {
		t.Errorf("incident over must clear all trackers, %d left", left)
	}
}

// sessionTrackerCount returns how many entries in m.trackers have a key whose
// id equals the given session id. It holds m.trackerMu while scanning.
func sessionTrackerCount(m *Manager, id domain.SessionID) int {
	m.trackerMu.Lock()
	defer m.trackerMu.Unlock()

	matches := 0
	for key := range m.trackers {
		if key.id != id {
			continue
		}
		matches++
	}
	return matches
}

// ---- TickEscalations never writes canonical state ----

// TestTickEscalations_DoesNotPersist runs TickEscalations on a seeded session
// and checks that the stored lifecycle's Revision is still 0.
func TestTickEscalations_DoesNotPersist(t *testing.T) {
	m, store, _, _ := newReactive()
	store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive))

	err := m.TickEscalations(ctx(), t0)
	if err != nil {
		t.Fatalf("tick: %v", err)
	}
	if rev := mustLoad(t, store).Revision; rev != 0 {
		t.Errorf("TickEscalations must not write canonical state, got revision=%d", rev)
	}
}

// ---- helpers ----

// applyActivity applies a valid hook-sourced activity signal stamped t0 to the
// shared test session. The returned error is intentionally discarded; callers
// assert on the recorded notifies and sends instead.
func applyActivity(m *Manager, a domain.ActivityState) {
	signal := ports.ActivitySignal{
		State:     ports.SignalValid,
		Activity:  a,
		Timestamp: t0,
		Source:    domain.SourceHook,
	}
	_ = m.ApplyActivitySignal(ctx(), sid, signal)
}

// failCI applies an open-PR observation with failing CI and a log tail of
// "fail", aborting the test if the apply returns an error.
func failCI(t *testing.T, m *Manager) {
	t.Helper()

	logTail := "fail"
	facts := ports.SCMFacts{
		Fetched:          true,
		PRState:          domain.PROpen,
		CISummary:        ports.CIFailing,
		PRNumber:         7,
		CIFailureLogTail: &logTail,
	}
	if err := m.ApplySCMObservation(ctx(), sid, facts); err != nil {
		t.Fatalf("failCI: %v", err)
	}
}

// pendingCI applies an open-PR observation with pending CI and a pending
// review decision, aborting the test if the apply returns an error.
func pendingCI(t *testing.T, m *Manager) {
	t.Helper()

	facts := ports.SCMFacts{
		Fetched:        true,
		PRState:        domain.PROpen,
		CISummary:      ports.CIPending,
		ReviewDecision: ports.ReviewPending,
		PRNumber:       7,
	}
	if err := m.ApplySCMObservation(ctx(), sid, facts); err != nil {
		t.Fatalf("pendingCI: %v", err)
	}
}