From 1420bb9493d5962ded7fc78b639497a495c487c8 Mon Sep 17 00:00:00 2001
From: harshitsinghbhandari <24b4506@iitb.ac.in>
Date: Wed, 27 May 2026 01:26:15 +0530
Subject: [PATCH 1/3] feat(lifecycle): implement LCM Apply* pipeline (split A)

Implements ports.LifecycleManager as a synchronous observe->decide->persist
reducer. Every entrypoint runs the shared pipeline under a per-session lock:
load canonical -> run the matching pure decider -> diff into a sparse
merge-patch -> persist. Never polls, never writes the display status.

- ApplyRuntimeObservation -> probe decider; always writes the runtime axis.
- ApplySCMObservation -> open-PR / terminal-PR deciders (failed fetch is a
  no-op: failed probe != "no PR"). Open PRs write only the PR axis.
- ApplyActivitySignal -> updates the activity axis + maps onto the session
  axis; only valid-confidence signals are authoritative.
- OnSpawnCompleted -> runtime alive + handles to metadata; session stays
  not_started (display: spawning).
- OnKillRequested -> SM's explicit terminal-write authority.
- TickEscalations -> no-op stub (reaction/escalation engine is split B).

Composition rule (#1): liveness owns the runtime + death axis; activity owns
the working/idle/waiting axis. A healthy probe verdict writes the session axis
only to recover a liveness-owned state, so it never clobbers an activity-owned
needs_input/blocked. Activity is the mirror: it stays off the death axis.

Detecting clear (#3): a non-detecting probe verdict clears stale detecting
memory so the next probe reads no phantom Prior.

Built/tested against in-memory fakes (LifecycleStore with full merge-patch +
ExpectedRevision, recording Notifier/AgentMessenger). Per-session
serialisation verified under -race.
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 backend/internal/lifecycle/decide_bridge.go | 204 ++++++++++
 backend/internal/lifecycle/fakes_test.go    | 161 ++++++++
 backend/internal/lifecycle/manager.go       | 327 +++++++++++++++++
 backend/internal/lifecycle/manager_test.go  | 388 ++++++++++++++++++++
 4 files changed, 1080 insertions(+)
 create mode 100644 backend/internal/lifecycle/decide_bridge.go
 create mode 100644 backend/internal/lifecycle/fakes_test.go
 create mode 100644 backend/internal/lifecycle/manager.go
 create mode 100644 backend/internal/lifecycle/manager_test.go

diff --git a/backend/internal/lifecycle/decide_bridge.go b/backend/internal/lifecycle/decide_bridge.go
new file mode 100644
index 00000000..059ac2eb
--- /dev/null
+++ b/backend/internal/lifecycle/decide_bridge.go
@@ -0,0 +1,204 @@
+package lifecycle
+
+import (
+	"time"
+
+	"github.com/aoagents/agent-orchestrator/backend/internal/domain"
+	"github.com/aoagents/agent-orchestrator/backend/internal/domain/decide"
+	"github.com/aoagents/agent-orchestrator/backend/internal/ports"
+)
+
+// defaultRecentActivityWindow is how fresh the last activity signal must be for
+// the probe decider to treat the agent as "recently active" (which keeps an
+// ambiguous dead-runtime probe in detecting instead of concluding death).
+// It is the value New installs as the Manager's recentActivityWindow.
+const defaultRecentActivityWindow = 60 * time.Second
+
+// ---- fact translation: ports DTOs -> pure decide inputs ----
+
+// runtimeFactsToProbeInput maps a raw RuntimeFacts (plus the prior detecting
+// memory and last-known activity read back from canonical) into the probe
+// decider's input. KillRequested is always false here: the inferred-death path
+// never carries an explicit kill — that arrives via OnKillRequested.
+func runtimeFactsToProbeInput(f ports.RuntimeFacts, cur domain.CanonicalSessionLifecycle, window time.Duration) decide.ProbeInput { + rt, rtFailed := runtimeProbeToState(f.RuntimeState) + proc, procFailed := processProbeToLiveness(f.ProcessState) + now := nowOr(f.ObservedAt) + return decide.ProbeInput{ + Runtime: rt, + RuntimeFailed: rtFailed, + Process: proc, + ProcessFailed: procFailed, + RecentActivity: hasRecentActivity(cur.Activity, now, window), + Prior: cur.Detecting, + Now: now, + } +} + +func runtimeProbeToState(p ports.RuntimeProbe) (domain.RuntimeState, bool) { + switch p { + case ports.RuntimeProbeAlive: + return domain.RuntimeAlive, false + case ports.RuntimeProbeDead: + return domain.RuntimeExited, false + case ports.RuntimeProbeFailed: + return domain.RuntimeProbeFailed, true + default: // indeterminate / unset: ambiguous, never a death conclusion + return domain.RuntimeUnknown, false + } +} + +func processProbeToLiveness(p ports.ProcessProbe) (decide.ProcessLiveness, bool) { + switch p { + case ports.ProcessProbeAlive: + return decide.ProcessAlive, false + case ports.ProcessProbeDead: + return decide.ProcessDead, false + case ports.ProcessProbeFailed: + return decide.ProcessIndeterminate, true + default: // indeterminate / unset + return decide.ProcessIndeterminate, false + } +} + +// runtimeSubstateFromFacts derives the runtime sub-state to persist. Liveness +// always owns this axis, so it is written on every runtime observation +// regardless of what the session axis does. 
+func runtimeSubstateFromFacts(f ports.RuntimeFacts) domain.RuntimeSubstate {
+	// Start from the indeterminate/unset shape and override per probe result.
+	sub := domain.RuntimeSubstate{State: domain.RuntimeUnknown, Reason: domain.RuntimeReasonProbeError}
+	switch f.RuntimeState {
+	case ports.RuntimeProbeAlive:
+		sub.State, sub.Reason = domain.RuntimeAlive, domain.RuntimeReasonProcessRunning
+	case ports.RuntimeProbeDead:
+		sub.State, sub.Reason = domain.RuntimeExited, domain.RuntimeReasonTmuxMissing
+	case ports.RuntimeProbeFailed:
+		sub.State = domain.RuntimeProbeFailed // Reason stays RuntimeReasonProbeError
+	}
+	return sub
+}
+
+// hasRecentActivity reports whether the agent counts as "heard from recently"
+// for the probe decider. The checks run in priority order: an explicit exited
+// activity never counts; a sticky state always counts; a missing timestamp
+// never counts; otherwise the last-activity timestamp must be no older than
+// window relative to now.
+func hasRecentActivity(a domain.ActivitySubstate, now time.Time, window time.Duration) bool {
+	switch {
+	case a.State == domain.ActivityExited:
+		return false
+	case a.State.IsSticky():
+		return true
+	case a.LastActivityAt.IsZero():
+		return false
+	default:
+		return now.Sub(a.LastActivityAt) <= window
+	}
+}
+
+// openPRInput maps SCM facts onto the open-PR ladder. IdleBeyond is always false
+// in split A — the idle-duration signal is owned by the escalation engine
+// (split B); the synchronous LCM has no clock of its own here.
+func openPRInput(f ports.SCMFacts) decide.OpenPRInput { + return decide.OpenPRInput{ + CIFailing: f.CISummary == ports.CIFailing, + ChangesRequested: f.ReviewDecision == ports.ReviewChangesRequested, + Approved: f.ReviewDecision == ports.ReviewApproved, + Mergeable: f.Mergeability.Mergeable, + ReviewPending: f.ReviewDecision == ports.ReviewPending, + Number: f.PRNumber, + URL: f.PRURL, + } +} + +// ---- activity -> session axis mapping (activity owns working/idle/waiting) ---- + +// activityToSession maps an activity classification onto the session sub-state. +// exited returns ok=false: an exit signal must NOT write a terminal session +// state — only the probe pipeline (via detecting) may conclude inferred death. +func activityToSession(a domain.ActivityState) (domain.SessionState, domain.SessionReason, bool) { + switch a { + case domain.ActivityActive: + return domain.SessionWorking, domain.ReasonTaskInProgress, true + case domain.ActivityReady, domain.ActivityIdle: + return domain.SessionIdle, domain.ReasonResearchComplete, true + case domain.ActivityWaitingInput: + return domain.SessionNeedsInput, domain.ReasonAwaitingUserInput, true + case domain.ActivityBlocked: + return domain.SessionStuck, domain.ReasonAwaitingUserInput, true + default: // exited / unset + return "", "", false + } +} + +// ---- composition predicates: who may write the session axis ---- + +// isTerminal reports a final session state that must not be resurrected by an +// observation (only an explicit Restore reopens a terminal session). +func isTerminal(s domain.SessionState) bool { + return s == domain.SessionDone || s == domain.SessionTerminated +} + +// isLivenessOwned reports whether the current session sub-state was set by the +// liveness/death axis (the probe pipeline) and may therefore be recovered by a +// later healthy probe. detecting is always liveness-owned; a stuck/terminated +// state is liveness-owned only when its reason came from a death inference. 
+func isLivenessOwned(s domain.SessionSubstate) bool {
+	if s.State == domain.SessionDetecting {
+		return true
+	}
+	// Any other state is liveness-owned only when its reason records a death
+	// inference made by the probe pipeline.
+	switch s.Reason {
+	case domain.ReasonRuntimeLost, domain.ReasonAgentProcessExited, domain.ReasonProbeFailure:
+		return true
+	}
+	return false
+}
+
+// shouldWriteSessionRuntime is the #1 composition rule for ApplyRuntimeObservation.
+//
+//   - A terminal session (done/terminated) is never rewritten by a probe: an
+//     observation must not resurrect it as detecting, nor replace the reason
+//     recorded by an explicit kill or a merged/closed PR.
+//   - A death-axis verdict (detecting/stuck/terminal) on a non-terminal session
+//     always writes — it overrides activity because a (maybe) dead agent can't
+//     be working/waiting.
+//   - A healthy "working" verdict only writes when it is recovering a
+//     liveness-owned state (e.g. detecting -> working); it must NOT clobber an
+//     activity-owned needs_input/blocked/idle the activity axis is responsible
+//     for.
+func shouldWriteSessionRuntime(d decide.LifecycleDecision, cur domain.CanonicalSessionLifecycle) bool {
+	if isTerminal(cur.Session.State) {
+		return false
+	}
+	if d.SessionState == domain.SessionWorking {
+		return isLivenessOwned(cur.Session)
+	}
+	return true
+}
+
+// shouldWriteSessionActivity is the mirror rule for ApplyActivitySignal: the
+// activity axis owns working/idle/waiting, but it must not touch the death axis.
+// It writes unless the session is terminal or currently liveness-owned (let the
+// probe pipeline resolve detecting / death-inferred states instead).
+func shouldWriteSessionActivity(cur domain.CanonicalSessionLifecycle) bool { + return !isTerminal(cur.Session.State) && !isLivenessOwned(cur.Session) +} + +// ---- explicit-kill mapping (SM's terminal-write authority) ---- + +func killSession(k ports.LifecycleKillReason) domain.SessionSubstate { + switch k { + case ports.KillManual: + return domain.SessionSubstate{State: domain.SessionTerminated, Reason: domain.ReasonManuallyKilled} + case ports.KillCleanup: + return domain.SessionSubstate{State: domain.SessionTerminated, Reason: domain.ReasonAutoCleanup} + default: // error + return domain.SessionSubstate{State: domain.SessionTerminated, Reason: domain.ReasonErrorInProcess} + } +} + +func killRuntime(k ports.LifecycleKillReason) domain.RuntimeSubstate { + switch k { + case ports.KillManual: + return domain.RuntimeSubstate{State: domain.RuntimeExited, Reason: domain.RuntimeReasonManualKillRequested} + case ports.KillCleanup: + return domain.RuntimeSubstate{State: domain.RuntimeExited, Reason: domain.RuntimeReasonAutoCleanup} + default: // error + return domain.RuntimeSubstate{State: domain.RuntimeExited, Reason: domain.RuntimeReasonProbeError} + } +} + +func nowOr(t time.Time) time.Time { + if t.IsZero() { + return time.Now() + } + return t +} diff --git a/backend/internal/lifecycle/fakes_test.go b/backend/internal/lifecycle/fakes_test.go new file mode 100644 index 00000000..904693aa --- /dev/null +++ b/backend/internal/lifecycle/fakes_test.go @@ -0,0 +1,161 @@ +package lifecycle + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// fakeStore is an in-memory LifecycleStore that faithfully applies merge-patch +// semantics (sparse field writes, the three-way Detecting/ClearDetecting rule, +// ExpectedRevision optimistic-concurrency check, monotonic Revision bump) so +// tests assert against the real persisted canonical. 
+type fakeStore struct {
+	mu       sync.Mutex // guards records and metadata
+	records  map[domain.SessionID]*domain.SessionRecord
+	metadata map[domain.SessionID]map[string]string
+}
+
+var _ ports.LifecycleStore = (*fakeStore)(nil)
+
+// newFakeStore returns an empty store with both maps initialised.
+func newFakeStore() *fakeStore {
+	return &fakeStore{
+		records:  map[domain.SessionID]*domain.SessionRecord{},
+		metadata: map[domain.SessionID]map[string]string{},
+	}
+}
+
+// seed installs a starting lifecycle for a session id (bypassing the patch path).
+// A zero Version is defaulted to domain.LifecycleVersion; Revision is left as given.
+func (s *fakeStore) seed(id domain.SessionID, l domain.CanonicalSessionLifecycle) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if l.Version == 0 {
+		l.Version = domain.LifecycleVersion
+	}
+	s.records[id] = &domain.SessionRecord{ID: id, Lifecycle: l}
+}
+
+// Load returns the stored lifecycle for id, or ok=false when no record exists.
+func (s *fakeStore) Load(_ context.Context, id domain.SessionID) (domain.CanonicalSessionLifecycle, bool, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	rec, ok := s.records[id]
+	if !ok {
+		return domain.CanonicalSessionLifecycle{}, false, nil
+	}
+	return rec.Lifecycle, true, nil
+}
+
+// PatchLifecycle applies a sparse merge-patch to the session's lifecycle,
+// creating the record when it does not exist yet. Nil patch fields leave the
+// corresponding sub-state untouched. ClearDetecting wins over Detecting. A
+// successful patch bumps Revision by one. When ExpectedRevision is set and
+// does not match, an error is returned and the store is left unchanged.
+func (s *fakeStore) PatchLifecycle(_ context.Context, id domain.SessionID, p ports.LifecyclePatch) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	rec, ok := s.records[id]
+	if !ok {
+		// Build the new record off to the side; it is only installed once the
+		// revision check has passed, so a rejected patch fabricates nothing.
+		rec = &domain.SessionRecord{ID: id, Lifecycle: domain.CanonicalSessionLifecycle{Version: domain.LifecycleVersion}}
+	}
+	l := &rec.Lifecycle
+
+	if p.ExpectedRevision != nil && *p.ExpectedRevision != l.Revision {
+		return fmt.Errorf("revision mismatch for %s: have %d, expected %d", id, l.Revision, *p.ExpectedRevision)
+	}
+
+	if p.Session != nil {
+		l.Session = *p.Session
+	}
+	if p.PR != nil {
+		l.PR = *p.PR
+	}
+	if p.Runtime != nil {
+		l.Runtime = *p.Runtime
+	}
+	if p.Activity != nil {
+		l.Activity = *p.Activity
+	}
+	switch {
+	case p.ClearDetecting:
+		l.Detecting = nil
+	case p.Detecting != nil:
+		d := *p.Detecting // store a private copy, not the caller's pointer
+		l.Detecting = &d
+	}
+
+	l.Version = domain.LifecycleVersion
+	l.Revision++
+	rec.UpdatedAt = time.Now()
+	if !ok {
+		s.records[id] = rec
+	}
+	return nil
+}
+
+// List returns copies of every record whose ProjectID equals project, in map
+// iteration (i.e. unspecified) order.
+func (s *fakeStore) List(_ context.Context, project domain.ProjectID) ([]domain.SessionRecord, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	var out []domain.SessionRecord
+	for _, rec := range s.records {
+		if rec.ProjectID == project {
+			out = append(out, *rec)
+		}
+	}
+	return out, nil
+}
+
+// GetMetadata returns a copy of the session's metadata; an unknown session
+// yields an empty, non-nil map.
+func (s *fakeStore) GetMetadata(_ context.Context, id domain.SessionID) (map[string]string, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	out := map[string]string{}
+	for k, v := range s.metadata[id] {
+		out[k] = v
+	}
+	return out, nil
+}
+
+// PatchMetadata merges kv into the session's metadata, overwriting existing
+// keys and leaving the others in place.
+func (s *fakeStore) PatchMetadata(_ context.Context, id domain.SessionID, kv map[string]string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.metadata[id] == nil {
+		s.metadata[id] = map[string]string{}
+	}
+	for k, v := range kv {
+		s.metadata[id][k] = v
+	}
+	return nil
+}
+
+// recordingNotifier captures emitted events for assertions.
+type recordingNotifier struct {
+	mu     sync.Mutex
+	events []ports.OrchestratorEvent
+}
+
+var _ ports.Notifier = (*recordingNotifier)(nil)
+
+// Notify appends e to the recorded events and always succeeds.
+func (n *recordingNotifier) Notify(_ context.Context, e ports.OrchestratorEvent) error {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.events = append(n.events, e)
+	return nil
+}
+
+// recordingMessenger captures messages injected into agents.
+type recordingMessenger struct {
+	mu   sync.Mutex
+	sent []struct {
+		ID      domain.SessionID
+		Message string
+	}
+}
+
+var _ ports.AgentMessenger = (*recordingMessenger)(nil)
+
+// Send appends the (id, message) pair to the recorded messages and always succeeds.
+func (a *recordingMessenger) Send(_ context.Context, id domain.SessionID, message string) error {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	a.sent = append(a.sent, struct {
+		ID      domain.SessionID
+		Message string
+	}{id, message})
+	return nil
+}
diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go
new file mode 100644
index 00000000..55a594bb
--- /dev/null
+++ b/backend/internal/lifecycle/manager.go
@@ -0,0 +1,327 @@
+// Package lifecycle implements ports.LifecycleManager: the synchronous
+// observe->decide->persist reducer.
Every Apply*/On* entrypoint runs the same +// pipeline under a per-session lock — load canonical, run the matching pure +// decider, diff the result into a sparse merge-patch, persist. The LCM never +// polls and never writes the display status (that is derived on read). +// +// Split A scope is the Apply* pipeline only. The reaction table + escalation +// engine (ACT) and the Session Manager land in later splits; TickEscalations is +// a documented no-op here. +package lifecycle + +import ( + "context" + "sync" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/domain/decide" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// Metadata keys OnSpawnCompleted records for the spawned session's handles. +const ( + MetaBranch = "branch" + MetaWorkspacePath = "workspacePath" + MetaRuntimeHandleID = "runtimeHandleId" + MetaRuntimeName = "runtimeName" + MetaAgentSessionID = "agentSessionId" +) + +// Manager is the LCM. Notifier/AgentMessenger are held for the ACT lane (split +// B); the Apply* pipeline does not fire reactions yet. +type Manager struct { + store ports.LifecycleStore + notifier ports.Notifier + messenger ports.AgentMessenger + + recentActivityWindow time.Duration + locks keyedMutex +} + +var _ ports.LifecycleManager = (*Manager)(nil) + +func New(store ports.LifecycleStore, notifier ports.Notifier, messenger ports.AgentMessenger) *Manager { + return &Manager{ + store: store, + notifier: notifier, + messenger: messenger, + recentActivityWindow: defaultRecentActivityWindow, + } +} + +// ---- per-session serialisation ---- + +// keyedMutex hands out one lock per session id so the load->decide->persist +// read-modify-write is serial within a session but parallel across sessions. 
+type keyedMutex struct { + mu sync.Mutex + locks map[domain.SessionID]*sync.Mutex +} + +func (k *keyedMutex) lock(id domain.SessionID) func() { + k.mu.Lock() + if k.locks == nil { + k.locks = make(map[domain.SessionID]*sync.Mutex) + } + m, ok := k.locks[id] + if !ok { + m = &sync.Mutex{} + k.locks[id] = m + } + k.mu.Unlock() + + m.Lock() + return m.Unlock +} + +func (m *Manager) withLock(id domain.SessionID, fn func() error) error { + unlock := m.locks.lock(id) + defer unlock() + return fn() +} + +// mutate runs the shared pipeline: load -> build patch -> persist (only if the +// patch changed something). decideFn returns the diffed patch and whether it +// touches anything; a false "changed" is a clean no-op (no write, no revision +// bump), which is how failed-probe / unknown-fact inputs are dropped. +func (m *Manager) mutate( + ctx context.Context, + id domain.SessionID, + decideFn func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error), +) error { + return m.withLock(id, func() error { + cur, exists, err := m.store.Load(ctx, id) + if err != nil { + return err + } + patch, changed, err := decideFn(cur, exists) + if err != nil { + return err + } + if !changed { + return nil + } + return m.store.PatchLifecycle(ctx, id, patch) + }) +} + +// ---- OBSERVE entrypoints ---- + +// ApplyRuntimeObservation feeds the probe decider. Liveness always writes the +// runtime axis; the session axis follows the #1 composition rule; and a +// non-detecting verdict clears any stale detecting memory (#3) so the next +// probe doesn't read a phantom prior. 
+func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.SessionID, f ports.RuntimeFacts) error { + return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + if !exists { + return ports.LifecyclePatch{}, false, nil // nothing seeded; ignore stray probe + } + + d := decide.ResolveProbeDecision(runtimeFactsToProbeInput(f, cur, m.recentActivityWindow)) + + var patch ports.LifecyclePatch + changed := false + + if rt := runtimeSubstateFromFacts(f); cur.Runtime != rt { + patch.Runtime = &rt + changed = true + } + if shouldWriteSessionRuntime(d, cur) { + changed = setSessionIfChanged(&patch, cur, d.SessionState, d.SessionReason) || changed + } + changed = setDetecting(&patch, cur, d.Detecting) || changed + + return patch, changed, nil + }) +} + +// ApplySCMObservation maps PR facts onto the PR axis. A failed fetch is dropped +// (failed probe != "no PR"). An open PR writes only the PR sub-state — the +// session axis stays owned by activity, and DeriveLegacyStatus surfaces the PR +// reason for display. A terminal PR (merged/closed) also parks the session. 
+func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, f ports.SCMFacts) error { + return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + if !exists || !f.Fetched { + return ports.LifecyclePatch{}, false, nil + } + + switch f.PRState { + case domain.PROpen: + d := decide.ResolveOpenPRDecision(openPRInput(f)) + var patch ports.LifecyclePatch + changed := setPRIfChanged(&patch, cur, d, f) + return patch, changed, nil + + case domain.PRMerged, domain.PRClosed: + d := decide.ResolveTerminalPRStateDecision(f.PRState) + var patch ports.LifecyclePatch + changed := setPRIfChanged(&patch, cur, d, f) + if !isTerminal(cur.Session.State) { + changed = setSessionIfChanged(&patch, cur, d.SessionState, d.SessionReason) || changed + } + return patch, changed, nil + + default: // none / unset: no PR-driven transition in split A + return ports.LifecyclePatch{}, false, nil + } + }) +} + +// ApplyActivitySignal updates the activity axis. Only a valid-confidence signal +// is authoritative (stale/unavailable/probe_failure != idleness). It refreshes +// the persisted activity sub-state (the probe decider's RecentActivity input) +// and maps the classification onto the session axis, subject to the mirror +// composition rule that keeps activity off the death axis. 
+func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, s ports.ActivitySignal) error { + return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + if !exists || s.State != ports.SignalValid { + return ports.LifecyclePatch{}, false, nil + } + + var patch ports.LifecyclePatch + changed := false + + act := domain.ActivitySubstate{State: s.Activity, LastActivityAt: nowOr(s.Timestamp), Source: s.Source} + if !sameActivity(cur.Activity, act) { + patch.Activity = &act + changed = true + } + if st, rs, ok := activityToSession(s.Activity); ok && shouldWriteSessionActivity(cur) { + changed = setSessionIfChanged(&patch, cur, st, rs) || changed + } + + return patch, changed, nil + }) +} + +// ---- mutation outcomes reported by the Session Manager ---- + +// OnSpawnCompleted records that a spawn finished: the runtime is up and the +// handles are known. Per the agreed rule it flips the runtime axis to alive and +// stores the handles in metadata, but leaves the session at not_started +// (display: spawning) — the agent "acknowledges" via the first activity signal. +func (m *Manager) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o ports.SpawnOutcome) error { + return m.withLock(id, func() error { + cur, _, err := m.store.Load(ctx, id) + if err != nil { + return err + } + rt := domain.RuntimeSubstate{State: domain.RuntimeAlive, Reason: domain.RuntimeReasonProcessRunning} + if cur.Runtime != rt { + if err := m.store.PatchLifecycle(ctx, id, ports.LifecyclePatch{Runtime: &rt}); err != nil { + return err + } + } + if meta := spawnMetadata(o); len(meta) > 0 { + if err := m.store.PatchMetadata(ctx, id, meta); err != nil { + return err + } + } + return nil + }) +} + +// OnKillRequested is the SM's explicit terminal-write authority (the one +// terminal path that does not go through the inferred-death decider). 
It writes +// the terminal session/runtime sub-states for the kill kind and clears any +// in-flight detecting memory. +func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r ports.KillReason) error { + return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + var patch ports.LifecyclePatch + changed := false + + if sess := killSession(r.Kind); cur.Session != sess { + patch.Session = &sess + changed = true + } + if rt := killRuntime(r.Kind); cur.Runtime != rt { + patch.Runtime = &rt + changed = true + } + if cur.Detecting != nil { + patch.ClearDetecting = true + changed = true + } + return patch, changed, nil + }) +} + +// TickEscalations is a no-op in split A. The reaper will call this to fire +// duration-based escalations the synchronous LCM can't wake itself for, but the +// reaction table + escalation engine that back it land in split B. +func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error { + return nil +} + +// ---- patch helpers (diff -> sparse merge-patch) ---- + +// setSessionIfChanged sets patch.Session only when the decided sub-state +// differs from current; an empty decided state means "decider does not address +// the session axis" and is left untouched. +func setSessionIfChanged(patch *ports.LifecyclePatch, cur domain.CanonicalSessionLifecycle, st domain.SessionState, rs domain.SessionReason) bool { + if st == "" { + return false + } + want := domain.SessionSubstate{State: st, Reason: rs} + if cur.Session == want { + return false + } + patch.Session = &want + return true +} + +// setPRIfChanged folds the decided PR sub-state plus the fact-borne PR identity +// (number/url) into the patch when it differs from current. 
+func setPRIfChanged(patch *ports.LifecyclePatch, cur domain.CanonicalSessionLifecycle, d decide.LifecycleDecision, f ports.SCMFacts) bool {
+	next := domain.PRSubstate{State: d.PRState, Reason: d.PRReason, Number: f.PRNumber, URL: f.PRURL}
+	if cur.PR != next {
+		patch.PR = &next
+		return true
+	}
+	return false
+}
+
+// setDetecting applies the three-way detecting rule and reports whether the
+// patch was touched:
+//   - decision carries memory that differs from canonical -> set/replace it;
+//   - decision carries none but canonical still holds some -> clear it (#3);
+//   - otherwise -> leave the patch untouched.
+func setDetecting(patch *ports.LifecyclePatch, cur domain.CanonicalSessionLifecycle, d *domain.DetectingState) bool {
+	switch {
+	case d == nil && cur.Detecting == nil:
+		return false
+	case d == nil:
+		patch.ClearDetecting = true
+		return true
+	case cur.Detecting != nil && *cur.Detecting == *d:
+		return false
+	default:
+		patch.Detecting = d
+		return true
+	}
+}
+
+// sameActivity compares activity sub-states with time-aware equality (== on
+// time.Time is monotonic-clock sensitive and would spuriously report changes).
+func sameActivity(a, b domain.ActivitySubstate) bool { + return a.State == b.State && a.Source == b.Source && a.LastActivityAt.Equal(b.LastActivityAt) +} + +func spawnMetadata(o ports.SpawnOutcome) map[string]string { + meta := map[string]string{} + if o.Branch != "" { + meta[MetaBranch] = o.Branch + } + if o.WorkspacePath != "" { + meta[MetaWorkspacePath] = o.WorkspacePath + } + if o.RuntimeHandle.ID != "" { + meta[MetaRuntimeHandleID] = o.RuntimeHandle.ID + } + if o.RuntimeHandle.RuntimeName != "" { + meta[MetaRuntimeName] = o.RuntimeHandle.RuntimeName + } + if o.AgentSessionID != "" { + meta[MetaAgentSessionID] = o.AgentSessionID + } + return meta +} diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go new file mode 100644 index 00000000..6f9f7f71 --- /dev/null +++ b/backend/internal/lifecycle/manager_test.go @@ -0,0 +1,388 @@ +package lifecycle + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +var t0 = time.Date(2026, 5, 26, 12, 0, 0, 0, time.UTC) + +const sid domain.SessionID = "s1" + +func newManager() (*Manager, *fakeStore) { + store := newFakeStore() + return New(store, &recordingNotifier{}, &recordingMessenger{}), store +} + +func mustLoad(t *testing.T, store *fakeStore) domain.CanonicalSessionLifecycle { + t.Helper() + l, ok, err := store.Load(context.Background(), sid) + if err != nil || !ok { + t.Fatalf("load: ok=%v err=%v", ok, err) + } + return l +} + +// ---- ApplyRuntimeObservation + #1 composition + #3 detecting clear ---- + +func TestApplyRuntimeObservation(t *testing.T) { + aliveProbe := ports.RuntimeFacts{RuntimeState: ports.RuntimeProbeAlive, ProcessState: ports.ProcessProbeAlive, ObservedAt: t0} + failedProbe := ports.RuntimeFacts{RuntimeState: ports.RuntimeProbeFailed, ProcessState: ports.ProcessProbeAlive, ObservedAt: t0} + deadProbe := 
ports.RuntimeFacts{RuntimeState: ports.RuntimeProbeDead, ProcessState: ports.ProcessProbeDead, ObservedAt: t0} + + tests := []struct { + name string + seed domain.CanonicalSessionLifecycle + facts ports.RuntimeFacts + wantSession domain.SessionState + wantReason domain.SessionReason + wantRuntime domain.RuntimeState + wantDisplay domain.SessionStatus + wantDetecting bool // expect non-nil detecting memory persisted + }{ + { + name: "healthy probe must not clobber an activity-owned needs_input (#1)", + seed: lc(domain.SessionNeedsInput, domain.ReasonAwaitingUserInput, domain.RuntimeAlive), + facts: aliveProbe, + wantSession: domain.SessionNeedsInput, + wantReason: domain.ReasonAwaitingUserInput, + wantRuntime: domain.RuntimeAlive, + wantDisplay: domain.StatusNeedsInput, + wantDetecting: false, + }, + { + name: "healthy probe recovers a liveness-owned detecting -> working and clears memory (#1 + #3)", + seed: detectingLC(), + facts: aliveProbe, + wantSession: domain.SessionWorking, + wantReason: domain.ReasonTaskInProgress, + wantRuntime: domain.RuntimeAlive, + wantDisplay: domain.StatusWorking, + wantDetecting: false, + }, + { + name: "failed probe routes to detecting and records memory", + seed: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive), + facts: failedProbe, + wantSession: domain.SessionDetecting, + wantReason: domain.ReasonProbeFailure, + wantRuntime: domain.RuntimeProbeFailed, + wantDisplay: domain.StatusDetecting, + wantDetecting: true, + }, + { + name: "dead+dead with no recent activity concludes killed and clears detecting (#3)", + seed: detectingLC(), + facts: deadProbe, + wantSession: domain.SessionTerminated, + wantReason: domain.ReasonRuntimeLost, + wantRuntime: domain.RuntimeExited, + wantDisplay: domain.StatusKilled, + wantDetecting: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, tt.seed) + + if err := 
mgr.ApplyRuntimeObservation(context.Background(), sid, tt.facts); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Session.State != tt.wantSession || l.Session.Reason != tt.wantReason { + t.Errorf("session = %v/%v, want %v/%v", l.Session.State, l.Session.Reason, tt.wantSession, tt.wantReason) + } + if l.Runtime.State != tt.wantRuntime { + t.Errorf("runtime = %v, want %v", l.Runtime.State, tt.wantRuntime) + } + if got := domain.DeriveLegacyStatus(l); got != tt.wantDisplay { + t.Errorf("display = %v, want %v", got, tt.wantDisplay) + } + if (l.Detecting != nil) != tt.wantDetecting { + t.Errorf("detecting present = %v, want %v (%+v)", l.Detecting != nil, tt.wantDetecting, l.Detecting) + } + }) + } +} + +func TestApplyRuntimeObservation_NoRecordIsNoOp(t *testing.T) { + mgr, store := newManager() + if err := mgr.ApplyRuntimeObservation(context.Background(), sid, ports.RuntimeFacts{RuntimeState: ports.RuntimeProbeAlive, ProcessState: ports.ProcessProbeAlive, ObservedAt: t0}); err != nil { + t.Fatalf("apply: %v", err) + } + if _, ok, _ := store.Load(context.Background(), sid); ok { + t.Error("a probe for an unseeded session must not fabricate a record") + } +} + +// ---- ApplyActivitySignal ---- + +func TestApplyActivitySignal(t *testing.T) { + tests := []struct { + name string + seed domain.CanonicalSessionLifecycle + signal ports.ActivitySignal + wantSession domain.SessionState + wantActivity domain.ActivityState + wantChanged bool + }{ + { + name: "valid waiting_input maps to needs_input", + seed: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive), + signal: ports.ActivitySignal{State: ports.SignalValid, Activity: domain.ActivityWaitingInput, Timestamp: t0, Source: domain.SourceHook}, + wantSession: domain.SessionNeedsInput, + wantActivity: domain.ActivityWaitingInput, + wantChanged: true, + }, + { + name: "valid active recovers needs_input -> working", + seed: lc(domain.SessionNeedsInput, 
domain.ReasonAwaitingUserInput, domain.RuntimeAlive), + signal: ports.ActivitySignal{State: ports.SignalValid, Activity: domain.ActivityActive, Timestamp: t0, Source: domain.SourceHook}, + wantSession: domain.SessionWorking, + wantActivity: domain.ActivityActive, + wantChanged: true, + }, + { + name: "low-confidence signal is dropped (no idleness inferred)", + seed: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive), + signal: ports.ActivitySignal{State: ports.SignalProbeFailure, Activity: domain.ActivityIdle, Timestamp: t0, Source: domain.SourceHook}, + wantSession: domain.SessionWorking, + wantChanged: false, + }, + { + name: "activity does not touch a liveness-owned detecting session", + seed: detectingLC(), + signal: ports.ActivitySignal{State: ports.SignalValid, Activity: domain.ActivityActive, Timestamp: t0, Source: domain.SourceHook}, + wantSession: domain.SessionDetecting, + wantActivity: domain.ActivityActive, + wantChanged: true, // activity sub-state still updates + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, tt.seed) + + if err := mgr.ApplyActivitySignal(context.Background(), sid, tt.signal); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Session.State != tt.wantSession { + t.Errorf("session = %v, want %v", l.Session.State, tt.wantSession) + } + if tt.wantChanged && l.Revision != 1 { + t.Errorf("revision = %d, want 1 (expected a write)", l.Revision) + } + if !tt.wantChanged && l.Revision != 0 { + t.Errorf("revision = %d, want 0 (expected a no-op)", l.Revision) + } + if tt.wantChanged && tt.wantActivity != "" && l.Activity.State != tt.wantActivity { + t.Errorf("activity = %v, want %v", l.Activity.State, tt.wantActivity) + } + if tt.name == "activity does not touch a liveness-owned detecting session" && l.Detecting == nil { + t.Error("activity must leave detecting memory for the probe pipeline to resolve") + } + }) + } 
+} + +// ---- ApplySCMObservation ---- + +func TestApplySCMObservation(t *testing.T) { + t.Run("failed fetch is a no-op (failed probe != no PR)", func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + if err := mgr.ApplySCMObservation(context.Background(), sid, ports.SCMFacts{Fetched: false, PRState: domain.PROpen}); err != nil { + t.Fatalf("apply: %v", err) + } + if l := mustLoad(t, store); l.Revision != 0 || l.PR.State != "" { + t.Errorf("expected no-op, got revision=%d pr=%v", l.Revision, l.PR.State) + } + }) + + t.Run("open PR writes only the PR axis; session stays activity-owned", func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + f := ports.SCMFacts{Fetched: true, PRState: domain.PROpen, CISummary: ports.CIFailing, PRNumber: 12, PRURL: "https://x/12"} + if err := mgr.ApplySCMObservation(context.Background(), sid, f); err != nil { + t.Fatalf("apply: %v", err) + } + l := mustLoad(t, store) + if l.PR.State != domain.PROpen || l.PR.Reason != domain.PRReasonCIFailing || l.PR.Number != 12 { + t.Errorf("pr = %+v, want open/ci_failing/#12", l.PR) + } + if l.Session.State != domain.SessionWorking { + t.Errorf("session = %v, want working (untouched)", l.Session.State) + } + if got := domain.DeriveLegacyStatus(l); got != domain.StatusCIFailed { + t.Errorf("display = %v, want ci_failed", got) + } + }) + + t.Run("merged PR parks the session and displays merged", func(t *testing.T) { + mgr, store := newManager() + seed := lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive) + seed.PR = domain.PRSubstate{State: domain.PROpen, Reason: domain.PRReasonInProgress, Number: 12} + store.seed(sid, seed) + f := ports.SCMFacts{Fetched: true, PRState: domain.PRMerged, PRNumber: 12} + if err := mgr.ApplySCMObservation(context.Background(), sid, f); err != nil { + 
t.Fatalf("apply: %v", err) + } + l := mustLoad(t, store) + if l.PR.State != domain.PRMerged || l.Session.Reason != domain.ReasonMergedWaitingDecision { + t.Errorf("got pr=%v session=%v, want merged + merged_waiting_decision", l.PR.State, l.Session.Reason) + } + if got := domain.DeriveLegacyStatus(l); got != domain.StatusMerged { + t.Errorf("display = %v, want merged", got) + } + }) + + t.Run("no PR is a no-op in split A", func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + if err := mgr.ApplySCMObservation(context.Background(), sid, ports.SCMFacts{Fetched: true, PRState: domain.PRNone}); err != nil { + t.Fatalf("apply: %v", err) + } + if l := mustLoad(t, store); l.Revision != 0 { + t.Errorf("expected no-op, got revision=%d", l.Revision) + } + }) +} + +// ---- mutation outcomes ---- + +func TestOnSpawnCompleted(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionNotStarted, domain.ReasonSpawnRequested, domain.RuntimeUnknown)) + + out := ports.SpawnOutcome{ + Branch: "feat/x", + WorkspacePath: "/w/x", + RuntimeHandle: ports.RuntimeHandle{ID: "tmux:1", RuntimeName: "tmux"}, + AgentSessionID: "agent-1", + } + if err := mgr.OnSpawnCompleted(context.Background(), sid, out); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Runtime.State != domain.RuntimeAlive { + t.Errorf("runtime = %v, want alive", l.Runtime.State) + } + if l.Session.State != domain.SessionNotStarted { + t.Errorf("session = %v, want not_started (spawn does not assert acknowledgement)", l.Session.State) + } + if got := domain.DeriveLegacyStatus(l); got != domain.StatusSpawning { + t.Errorf("display = %v, want spawning", got) + } + meta, _ := store.GetMetadata(context.Background(), sid) + if meta[MetaBranch] != "feat/x" || meta[MetaAgentSessionID] != "agent-1" || meta[MetaRuntimeName] != "tmux" { + t.Errorf("metadata not recorded: %+v", meta) + } +} + 
+func TestOnKillRequested(t *testing.T) { + mgr, store := newManager() + store.seed(sid, detectingLC()) + + if err := mgr.OnKillRequested(context.Background(), sid, ports.KillReason{Kind: ports.KillManual, Detail: "user"}); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Session.State != domain.SessionTerminated || l.Session.Reason != domain.ReasonManuallyKilled { + t.Errorf("session = %v/%v, want terminated/manually_killed", l.Session.State, l.Session.Reason) + } + if l.Runtime.Reason != domain.RuntimeReasonManualKillRequested { + t.Errorf("runtime reason = %v, want manual_kill_requested", l.Runtime.Reason) + } + if l.Detecting != nil { + t.Errorf("kill must clear detecting memory, got %+v", l.Detecting) + } + if got := domain.DeriveLegacyStatus(l); got != domain.StatusKilled { + t.Errorf("display = %v, want killed", got) + } +} + +func TestTickEscalationsIsNoOp(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + if err := mgr.TickEscalations(context.Background(), t0); err != nil { + t.Fatalf("tick: %v", err) + } + if l := mustLoad(t, store); l.Revision != 0 { + t.Errorf("TickEscalations must not write, got revision=%d", l.Revision) + } +} + +// ---- fake store contract ---- + +func TestFakeStoreExpectedRevision(t *testing.T) { + store := newFakeStore() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) // revision 0 + rt := domain.RuntimeSubstate{State: domain.RuntimeExited} + + bad := 99 + if err := store.PatchLifecycle(context.Background(), sid, ports.LifecyclePatch{Runtime: &rt, ExpectedRevision: &bad}); err == nil { + t.Error("stale ExpectedRevision must be rejected") + } + good := 0 + if err := store.PatchLifecycle(context.Background(), sid, ports.LifecyclePatch{Runtime: &rt, ExpectedRevision: &good}); err != nil { + t.Errorf("matching ExpectedRevision must succeed, got %v", err) + } +} + +// 
---- per-session serialisation under the race detector ---- + +func TestPerSessionSerialization(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + + const n = 50 + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + defer wg.Done() + _ = mgr.ApplyActivitySignal(context.Background(), sid, ports.ActivitySignal{ + State: ports.SignalValid, + Activity: domain.ActivityActive, + Timestamp: t0.Add(time.Duration(i) * time.Second), + Source: domain.SourceHook, + }) + }(i) + } + wg.Wait() + + // Each goroutine writes a distinct LastActivityAt, so every call is a real + // change; with correct serialisation all n land without a lost update. + if l := mustLoad(t, store); l.Revision != n { + t.Errorf("revision = %d, want %d (lost update under concurrency)", l.Revision, n) + } +} + +// ---- helpers ---- + +func lc(state domain.SessionState, reason domain.SessionReason, rt domain.RuntimeState) domain.CanonicalSessionLifecycle { + return domain.CanonicalSessionLifecycle{ + Version: domain.LifecycleVersion, + Session: domain.SessionSubstate{State: state, Reason: reason}, + Runtime: domain.RuntimeSubstate{State: rt}, + } +} + +func detectingLC() domain.CanonicalSessionLifecycle { + l := lc(domain.SessionDetecting, domain.ReasonRuntimeLost, domain.RuntimeMissing) + l.Detecting = &domain.DetectingState{Attempts: 1, StartedAt: t0, EvidenceHash: "abc"} + return l +} From 3945b10f2057743919d33799dd9144c4a4de8324 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Wed, 27 May 2026 01:37:14 +0530 Subject: [PATCH 2/3] fix(lifecycle): address PR #5 review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - shouldWriteSessionRuntime: never resurrect a terminal session; an observation may refresh the runtime axis but must touch neither the session axis nor the detecting memory (gated in 
ApplyRuntimeObservation). - OnSpawnCompleted: error on an unseeded session instead of fabricating a partial record (SM must seed first — a missing seed is a contract violation). - OnKillRequested: no-op on an unknown/already-gone session (benign race) instead of fabricating a terminal record. - keyedMutex: reference-count entries and evict on last release so the lock map stays bounded in a long-running daemon. - runtimeSubstateFromFacts: map RuntimeProbeIndeterminate to RuntimeUnknown with a neutral reason, distinct from the probe_error of a failed probe. Adds tests for terminal non-resurrection, unseeded spawn-completed error, and unknown-session kill no-op. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/lifecycle/decide_bridge.go | 16 ++++-- backend/internal/lifecycle/manager.go | 59 +++++++++++++++++---- backend/internal/lifecycle/manager_test.go | 40 ++++++++++++++ 3 files changed, 101 insertions(+), 14 deletions(-) diff --git a/backend/internal/lifecycle/decide_bridge.go b/backend/internal/lifecycle/decide_bridge.go index 059ac2eb..d1ac7f65 100644 --- a/backend/internal/lifecycle/decide_bridge.go +++ b/backend/internal/lifecycle/decide_bridge.go @@ -71,8 +71,12 @@ func runtimeSubstateFromFacts(f ports.RuntimeFacts) domain.RuntimeSubstate { return domain.RuntimeSubstate{State: domain.RuntimeExited, Reason: domain.RuntimeReasonTmuxMissing} case ports.RuntimeProbeFailed: return domain.RuntimeSubstate{State: domain.RuntimeProbeFailed, Reason: domain.RuntimeReasonProbeError} - default: - return domain.RuntimeSubstate{State: domain.RuntimeUnknown, Reason: domain.RuntimeReasonProbeError} + case ports.RuntimeProbeIndeterminate: + // Probe ran but couldn't tell — distinct from a probe error, so no + // probe_error reason; the ambiguity is carried by RuntimeUnknown alone. 
+ return domain.RuntimeSubstate{State: domain.RuntimeUnknown} + default: // unset + return domain.RuntimeSubstate{State: domain.RuntimeUnknown} } } @@ -158,8 +162,14 @@ func isLivenessOwned(s domain.SessionSubstate) bool { // (e.g. detecting -> working); it must NOT clobber an activity-owned // needs_input/blocked/idle the activity axis is responsible for. func shouldWriteSessionRuntime(d decide.LifecycleDecision, cur domain.CanonicalSessionLifecycle) bool { + if isTerminal(cur.Session.State) { + // A terminal session is only reopened by an explicit Restore — never by + // an observation. Even a death-axis verdict (e.g. detecting) must not + // resurrect it; the runtime axis is still patched separately. + return false + } if d.SessionState == domain.SessionWorking { - return !isTerminal(cur.Session.State) && isLivenessOwned(cur.Session) + return isLivenessOwned(cur.Session) } return true } diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index 55a594bb..b7f9d0aa 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -11,6 +11,7 @@ package lifecycle import ( "context" + "fmt" "sync" "time" @@ -54,25 +55,43 @@ func New(store ports.LifecycleStore, notifier ports.Notifier, messenger ports.Ag // keyedMutex hands out one lock per session id so the load->decide->persist // read-modify-write is serial within a session but parallel across sessions. +// +// Entries are reference-counted and evicted when the last holder releases, so +// the map stays bounded to sessions with in-flight operations rather than +// growing unbounded over the lifetime of a long-running daemon. 
type keyedMutex struct { mu sync.Mutex - locks map[domain.SessionID]*sync.Mutex + locks map[domain.SessionID]*lockEntry +} + +type lockEntry struct { + mu sync.Mutex + refs int } func (k *keyedMutex) lock(id domain.SessionID) func() { k.mu.Lock() if k.locks == nil { - k.locks = make(map[domain.SessionID]*sync.Mutex) + k.locks = make(map[domain.SessionID]*lockEntry) } - m, ok := k.locks[id] + e, ok := k.locks[id] if !ok { - m = &sync.Mutex{} - k.locks[id] = m + e = &lockEntry{} + k.locks[id] = e } + e.refs++ k.mu.Unlock() - m.Lock() - return m.Unlock + e.mu.Lock() + return func() { + e.mu.Unlock() + k.mu.Lock() + e.refs-- + if e.refs == 0 { + delete(k.locks, id) + } + k.mu.Unlock() + } } func (m *Manager) withLock(id domain.SessionID, fn func() error) error { @@ -127,10 +146,15 @@ func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.Session patch.Runtime = &rt changed = true } - if shouldWriteSessionRuntime(d, cur) { - changed = setSessionIfChanged(&patch, cur, d.SessionState, d.SessionReason) || changed + // A terminal session is reopened only by an explicit Restore: an + // observation may refresh the runtime axis above but must touch neither + // the session axis nor the detecting memory. + if !isTerminal(cur.Session.State) { + if shouldWriteSessionRuntime(d, cur) { + changed = setSessionIfChanged(&patch, cur, d.SessionState, d.SessionReason) || changed + } + changed = setDetecting(&patch, cur, d.Detecting) || changed } - changed = setDetecting(&patch, cur, d.Detecting) || changed return patch, changed, nil }) @@ -203,10 +227,16 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, // (display: spawning) — the agent "acknowledges" via the first activity signal. 
func (m *Manager) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o ports.SpawnOutcome) error { return m.withLock(id, func() error { - cur, _, err := m.store.Load(ctx, id) + cur, exists, err := m.store.Load(ctx, id) if err != nil { return err } + if !exists { + // The SM seeds the initial lifecycle before spawning; a completion + // for an unseeded session is a contract violation, not a stray + // observation, so surface it rather than fabricating a record. + return fmt.Errorf("lifecycle: OnSpawnCompleted for unseeded session %q", id) + } rt := domain.RuntimeSubstate{State: domain.RuntimeAlive, Reason: domain.RuntimeReasonProcessRunning} if cur.Runtime != rt { if err := m.store.PatchLifecycle(ctx, id, ports.LifecyclePatch{Runtime: &rt}); err != nil { @@ -228,6 +258,13 @@ func (m *Manager) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o p // in-flight detecting memory. func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r ports.KillReason) error { return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { + if !exists { + // Killing an unknown/already-gone session is a benign race; no-op + // rather than fabricating a terminal record for a session we never + // knew about. 
+ return ports.LifecyclePatch{}, false, nil + } + var patch ports.LifecyclePatch changed := false diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 6f9f7f71..dae266b8 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -123,6 +123,25 @@ func TestApplyRuntimeObservation_NoRecordIsNoOp(t *testing.T) { } } +func TestApplyRuntimeObservation_DoesNotResurrectTerminal(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionTerminated, domain.ReasonManuallyKilled, domain.RuntimeExited)) + + // A failed probe would normally route to detecting, but a terminal session + // must not be reopened by an observation (only an explicit Restore does). + if err := mgr.ApplyRuntimeObservation(context.Background(), sid, ports.RuntimeFacts{RuntimeState: ports.RuntimeProbeFailed, ProcessState: ports.ProcessProbeAlive, ObservedAt: t0}); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Session.State != domain.SessionTerminated || l.Session.Reason != domain.ReasonManuallyKilled { + t.Errorf("session = %v/%v, want terminated/manually_killed (no resurrection)", l.Session.State, l.Session.Reason) + } + if l.Detecting != nil { + t.Errorf("terminal session must not gain detecting memory, got %+v", l.Detecting) + } +} + // ---- ApplyActivitySignal ---- func TestApplyActivitySignal(t *testing.T) { @@ -314,6 +333,27 @@ func TestOnKillRequested(t *testing.T) { } } +func TestOnSpawnCompleted_UnseededErrors(t *testing.T) { + mgr, store := newManager() + err := mgr.OnSpawnCompleted(context.Background(), sid, ports.SpawnOutcome{Branch: "x"}) + if err == nil { + t.Error("OnSpawnCompleted for an unseeded session must error, not fabricate a record") + } + if _, ok, _ := store.Load(context.Background(), sid); ok { + t.Error("no record should have been created") + } +} + +func TestOnKillRequested_UnseededIsNoOp(t *testing.T) { + mgr, store := 
newManager() + if err := mgr.OnKillRequested(context.Background(), sid, ports.KillReason{Kind: ports.KillManual}); err != nil { + t.Fatalf("kill of unknown session should be a benign no-op, got %v", err) + } + if _, ok, _ := store.Load(context.Background(), sid); ok { + t.Error("killing an unknown session must not fabricate a terminal record") + } +} + func TestTickEscalationsIsNoOp(t *testing.T) { mgr, store := newManager() store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) From 9eb5348604402198d9b0337bd4925287c5f7b3ec Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Wed, 27 May 2026 01:49:28 +0530 Subject: [PATCH 3/3] feat(lifecycle): activity resolves detecting + review polish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Harshit's PR #5 review (approve w/ design confirms + polish): - #1 (design decision): a valid activity signal is proof of life, so it now resolves a detecting session — writes the activity-mapped session state and clears the quarantine memory. Scoped to detecting only; a liveness-escalated stuck stays the probe pipeline's to resolve. Terminal still never reopens. - #2: document why a merged/closed PR parks the session axis even over an activity-owned needs_input/blocked (a merge is a milestone), unlike the open-PR path that defers to activity. - #3: map plain idle activity to a neutral session reason instead of the misleading research_complete (kept for ready, which implies completion). - #6: cover all three kill kinds (manual/cleanup/error), the open-PR review branches (changes_requested/mergeable/review_pending), and the neutral idle reason. Coverage 86.5% -> 88.6%. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/lifecycle/decide_bridge.go | 23 ++++- backend/internal/lifecycle/manager.go | 17 +++- backend/internal/lifecycle/manager_test.go | 104 +++++++++++++++----- 3 files changed, 115 insertions(+), 29 deletions(-) diff --git a/backend/internal/lifecycle/decide_bridge.go b/backend/internal/lifecycle/decide_bridge.go index d1ac7f65..942fdad4 100644 --- a/backend/internal/lifecycle/decide_bridge.go +++ b/backend/internal/lifecycle/decide_bridge.go @@ -121,8 +121,13 @@ func activityToSession(a domain.ActivityState) (domain.SessionState, domain.Sess switch a { case domain.ActivityActive: return domain.SessionWorking, domain.ReasonTaskInProgress, true - case domain.ActivityReady, domain.ActivityIdle: + case domain.ActivityReady: + // ready = the agent finished a unit and is waiting for more work. return domain.SessionIdle, domain.ReasonResearchComplete, true + case domain.ActivityIdle: + // plain inactivity carries no completion claim, so no specific reason + // (research_complete here would read misleadingly in diagnostics). + return domain.SessionIdle, "", true case domain.ActivityWaitingInput: return domain.SessionNeedsInput, domain.ReasonAwaitingUserInput, true case domain.ActivityBlocked: @@ -175,11 +180,19 @@ func shouldWriteSessionRuntime(d decide.LifecycleDecision, cur domain.CanonicalS } // shouldWriteSessionActivity is the mirror rule for ApplyActivitySignal: the -// activity axis owns working/idle/waiting, but it must not touch the death axis. -// It writes unless the session is terminal or currently liveness-owned (let the -// probe pipeline resolve detecting / death-inferred states instead). +// activity axis owns working/idle/waiting. 
A valid activity signal is direct +// proof of life, so it is allowed to RESOLVE a detecting session (pull it out of +// the liveness quarantine) — but it must not resurrect a terminal session, and +// it leaves a liveness-escalated stuck state to the probe pipeline (stuck is a +// deliberate human-facing escalation, not a transient quarantine). func shouldWriteSessionActivity(cur domain.CanonicalSessionLifecycle) bool { - return !isTerminal(cur.Session.State) && !isLivenessOwned(cur.Session) + if isTerminal(cur.Session.State) { + return false + } + if cur.Session.State == domain.SessionDetecting { + return true + } + return !isLivenessOwned(cur.Session) } // ---- explicit-kill mapping (SM's terminal-write authority) ---- diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index b7f9d0aa..eb8538d3 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -181,6 +181,11 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, d := decide.ResolveTerminalPRStateDecision(f.PRState) var patch ports.LifecyclePatch changed := setPRIfChanged(&patch, cur, d, f) + // A merge/close is a milestone that ends the work, so it parks the + // session axis (idle / merged_waiting_decision) even over an + // activity-owned needs_input/blocked — unlike the open-PR path, + // which leaves the session axis to activity. A terminal session is + // still never reopened. if !isTerminal(cur.Session.State) { changed = setSessionIfChanged(&patch, cur, d.SessionState, d.SessionReason) || changed } @@ -195,8 +200,9 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, // ApplyActivitySignal updates the activity axis. Only a valid-confidence signal // is authoritative (stale/unavailable/probe_failure != idleness). 
It refreshes // the persisted activity sub-state (the probe decider's RecentActivity input) -// and maps the classification onto the session axis, subject to the mirror -// composition rule that keeps activity off the death axis. +// and maps the classification onto the session axis. A valid signal is proof of +// life, so it may resolve a detecting session — clearing the quarantine memory +// so a later probe doesn't resume counting from a stale prior. func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, s ports.ActivitySignal) error { return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) { if !exists || s.State != ports.SignalValid { @@ -213,6 +219,13 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, } if st, rs, ok := activityToSession(s.Activity); ok && shouldWriteSessionActivity(cur) { changed = setSessionIfChanged(&patch, cur, st, rs) || changed + // Proof of life that pulls the session out of detecting must also + // drop the quarantine memory (detecting memory only exists while + // detecting, so this is a no-op otherwise). 
+ if cur.Detecting != nil { + patch.ClearDetecting = true + changed = true + } } return patch, changed, nil diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index dae266b8..e8c47c0a 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -150,6 +150,8 @@ func TestApplyActivitySignal(t *testing.T) { seed domain.CanonicalSessionLifecycle signal ports.ActivitySignal wantSession domain.SessionState + wantReason domain.SessionReason + checkReason bool wantActivity domain.ActivityState wantChanged bool }{ @@ -169,6 +171,16 @@ func TestApplyActivitySignal(t *testing.T) { wantActivity: domain.ActivityActive, wantChanged: true, }, + { + name: "valid idle maps to idle with a neutral reason", + seed: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive), + signal: ports.ActivitySignal{State: ports.SignalValid, Activity: domain.ActivityIdle, Timestamp: t0, Source: domain.SourceHook}, + wantSession: domain.SessionIdle, + wantReason: "", + checkReason: true, + wantActivity: domain.ActivityIdle, + wantChanged: true, + }, { name: "low-confidence signal is dropped (no idleness inferred)", seed: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive), @@ -177,12 +189,12 @@ func TestApplyActivitySignal(t *testing.T) { wantChanged: false, }, { - name: "activity does not touch a liveness-owned detecting session", + name: "valid activity resolves a detecting session (proof of life)", seed: detectingLC(), signal: ports.ActivitySignal{State: ports.SignalValid, Activity: domain.ActivityActive, Timestamp: t0, Source: domain.SourceHook}, - wantSession: domain.SessionDetecting, + wantSession: domain.SessionWorking, wantActivity: domain.ActivityActive, - wantChanged: true, // activity sub-state still updates + wantChanged: true, }, } @@ -199,6 +211,9 @@ func TestApplyActivitySignal(t *testing.T) { if l.Session.State != tt.wantSession { 
t.Errorf("session = %v, want %v", l.Session.State, tt.wantSession) } + if tt.checkReason && l.Session.Reason != tt.wantReason { + t.Errorf("session reason = %q, want %q", l.Session.Reason, tt.wantReason) + } if tt.wantChanged && l.Revision != 1 { t.Errorf("revision = %d, want 1 (expected a write)", l.Revision) } @@ -208,8 +223,8 @@ func TestApplyActivitySignal(t *testing.T) { if tt.wantChanged && tt.wantActivity != "" && l.Activity.State != tt.wantActivity { t.Errorf("activity = %v, want %v", l.Activity.State, tt.wantActivity) } - if tt.name == "activity does not touch a liveness-owned detecting session" && l.Detecting == nil { - t.Error("activity must leave detecting memory for the probe pipeline to resolve") + if tt.name == "valid activity resolves a detecting session (proof of life)" && l.Detecting != nil { + t.Errorf("resolving detecting must clear the quarantine memory, got %+v", l.Detecting) } }) } @@ -266,6 +281,35 @@ func TestApplySCMObservation(t *testing.T) { } }) + t.Run("open-PR review branches map to the PR axis", func(t *testing.T) { + cases := []struct { + name string + facts ports.SCMFacts + wantReason domain.PRReason + wantStatus domain.SessionStatus + }{ + {"changes requested", ports.SCMFacts{Fetched: true, PRState: domain.PROpen, ReviewDecision: ports.ReviewChangesRequested}, domain.PRReasonChangesRequested, domain.StatusChangesRequested}, + {"approved + mergeable", ports.SCMFacts{Fetched: true, PRState: domain.PROpen, ReviewDecision: ports.ReviewApproved, Mergeability: ports.Mergeability{Mergeable: true}}, domain.PRReasonMergeReady, domain.StatusMergeable}, + {"review pending", ports.SCMFacts{Fetched: true, PRState: domain.PROpen, ReviewDecision: ports.ReviewPending}, domain.PRReasonReviewPending, domain.StatusReviewPending}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) + if err := 
mgr.ApplySCMObservation(context.Background(), sid, c.facts); err != nil { + t.Fatalf("apply: %v", err) + } + l := mustLoad(t, store) + if l.PR.State != domain.PROpen || l.PR.Reason != c.wantReason { + t.Errorf("pr = %v/%v, want open/%v", l.PR.State, l.PR.Reason, c.wantReason) + } + if got := domain.DeriveLegacyStatus(l); got != c.wantStatus { + t.Errorf("display = %v, want %v", got, c.wantStatus) + } + }) + } + }) + t.Run("no PR is a no-op in split A", func(t *testing.T) { mgr, store := newManager() store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive)) @@ -311,25 +355,41 @@ func TestOnSpawnCompleted(t *testing.T) { } func TestOnKillRequested(t *testing.T) { - mgr, store := newManager() - store.seed(sid, detectingLC()) - - if err := mgr.OnKillRequested(context.Background(), sid, ports.KillReason{Kind: ports.KillManual, Detail: "user"}); err != nil { - t.Fatalf("apply: %v", err) + tests := []struct { + name string + kind ports.LifecycleKillReason + wantReason domain.SessionReason + wantRuntime domain.RuntimeReason + wantDisplay domain.SessionStatus + }{ + {"manual", ports.KillManual, domain.ReasonManuallyKilled, domain.RuntimeReasonManualKillRequested, domain.StatusKilled}, + {"cleanup", ports.KillCleanup, domain.ReasonAutoCleanup, domain.RuntimeReasonAutoCleanup, domain.StatusCleanup}, + {"error", ports.KillError, domain.ReasonErrorInProcess, domain.RuntimeReasonProbeError, domain.StatusErrored}, } - l := mustLoad(t, store) - if l.Session.State != domain.SessionTerminated || l.Session.Reason != domain.ReasonManuallyKilled { - t.Errorf("session = %v/%v, want terminated/manually_killed", l.Session.State, l.Session.Reason) - } - if l.Runtime.Reason != domain.RuntimeReasonManualKillRequested { - t.Errorf("runtime reason = %v, want manual_kill_requested", l.Runtime.Reason) - } - if l.Detecting != nil { - t.Errorf("kill must clear detecting memory, got %+v", l.Detecting) - } - if got := domain.DeriveLegacyStatus(l); got != 
domain.StatusKilled { - t.Errorf("display = %v, want killed", got) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr, store := newManager() + store.seed(sid, detectingLC()) + + if err := mgr.OnKillRequested(context.Background(), sid, ports.KillReason{Kind: tt.kind, Detail: "x"}); err != nil { + t.Fatalf("apply: %v", err) + } + + l := mustLoad(t, store) + if l.Session.State != domain.SessionTerminated || l.Session.Reason != tt.wantReason { + t.Errorf("session = %v/%v, want terminated/%v", l.Session.State, l.Session.Reason, tt.wantReason) + } + if l.Runtime.Reason != tt.wantRuntime { + t.Errorf("runtime reason = %v, want %v", l.Runtime.Reason, tt.wantRuntime) + } + if l.Detecting != nil { + t.Errorf("kill must clear detecting memory, got %+v", l.Detecting) + } + if got := domain.DeriveLegacyStatus(l); got != tt.wantDisplay { + t.Errorf("display = %v, want %v", got, tt.wantDisplay) + } + }) } }