Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 64 additions & 18 deletions backend/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
// decider, diff the result into a sparse merge-patch, persist. The LCM never
// polls and never writes the display status (that is derived on read).
//
// Split A scope is the Apply* pipeline only. The reaction table + escalation
// engine (ACT) and the Session Manager land in later splits; TickEscalations is
// a documented no-op here.
// After a transition is persisted, the Apply* paths fire the mapped reaction
// (the ACT layer: reaction table + escalation engine) via the react() chokepoint
// in reactions.go. The Session Manager lands in a later split.
package lifecycle

import (
Expand All @@ -29,15 +29,23 @@ const (
MetaAgentSessionID = "agentSessionId"
)

// Manager is the LCM. Notifier/AgentMessenger are held for the ACT lane (split
// B); the Apply* pipeline does not fire reactions yet.
// Manager is the LCM. The Apply* pipeline persists a transition and then fires
// the mapped reaction via Notifier/AgentMessenger (see reactions.go).
type Manager struct {
store ports.LifecycleStore
notifier ports.Notifier
messenger ports.AgentMessenger

recentActivityWindow time.Duration
locks keyedMutex

// trackers hold per-(session,reaction) escalation budgets (ACT policy, not
// canonical state). trackerMu guards them: react() touches them from the
// caller's goroutine, TickEscalations from the reaper's. clock is the time
// source for escalation stamping (overridable in tests).
trackers map[trackerKey]*reactionTracker
trackerMu sync.Mutex
clock func() time.Time
}

var _ ports.LifecycleManager = (*Manager)(nil)
Expand All @@ -48,6 +56,8 @@ func New(store ports.LifecycleStore, notifier ports.Notifier, messenger ports.Ag
notifier: notifier,
messenger: messenger,
recentActivityWindow: defaultRecentActivityWindow,
trackers: map[trackerKey]*reactionTracker{},
clock: time.Now,
}
}

Expand Down Expand Up @@ -100,16 +110,28 @@ func (m *Manager) withLock(id domain.SessionID, fn func() error) error {
return fn()
}

// transition is what a persisted write produced: the canonical before and after
// the patch. The ACT layer (react) derives the reaction from these. It is nil
// when the pipeline made no write.
type transition struct {
beforeLC domain.CanonicalSessionLifecycle
afterLC domain.CanonicalSessionLifecycle
}

// mutate runs the shared pipeline: load -> build patch -> persist (only if the
// patch changed something). decideFn returns the diffed patch and whether it
// touches anything; a false "changed" is a clean no-op (no write, no revision
// bump), which is how failed-probe / unknown-fact inputs are dropped.
//
// On a write it returns the transition (before/after canonical) so the caller —
// which still holds the originating facts — can fire the mapped reaction.
func (m *Manager) mutate(
ctx context.Context,
id domain.SessionID,
decideFn func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error),
) error {
return m.withLock(id, func() error {
) (*transition, error) {
var tr *transition
err := m.withLock(id, func() error {
cur, exists, err := m.store.Load(ctx, id)
if err != nil {
return err
Expand All @@ -121,8 +143,17 @@ func (m *Manager) mutate(
if !changed {
return nil
}
return m.store.PatchLifecycle(ctx, id, patch)
if err := m.store.PatchLifecycle(ctx, id, patch); err != nil {
return err
}
after, _, err := m.store.Load(ctx, id)
if err != nil {
return err
}
tr = &transition{beforeLC: cur, afterLC: after}
return nil
})
return tr, err
}

// ---- OBSERVE entrypoints ----
Expand All @@ -132,7 +163,7 @@ func (m *Manager) mutate(
// non-detecting verdict clears any stale detecting memory (#3) so the next
// probe doesn't read a phantom prior.
func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.SessionID, f ports.RuntimeFacts) error {
return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
if !exists {
return ports.LifecyclePatch{}, false, nil // nothing seeded; ignore stray probe
}
Expand All @@ -158,14 +189,18 @@ func (m *Manager) ApplyRuntimeObservation(ctx context.Context, id domain.Session

return patch, changed, nil
})
if err != nil {
return err
}
return m.react(ctx, id, tr, reactionContext{})
}

// ApplySCMObservation maps PR facts onto the PR axis. A failed fetch is dropped
// (failed probe != "no PR"). An open PR writes only the PR sub-state — the
// session axis stays owned by activity, and DeriveLegacyStatus surfaces the PR
// reason for display. A terminal PR (merged/closed) also parks the session.
func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID, f ports.SCMFacts) error {
return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
if !exists || !f.Fetched {
return ports.LifecyclePatch{}, false, nil
}
Expand Down Expand Up @@ -195,6 +230,10 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID,
return ports.LifecyclePatch{}, false, nil
}
})
if err != nil {
return err
}
return m.react(ctx, id, tr, reactionContext{ciFailureLogTail: f.CIFailureLogTail})
}

// ApplyActivitySignal updates the activity axis. Only a valid-confidence signal
Expand All @@ -204,7 +243,7 @@ func (m *Manager) ApplySCMObservation(ctx context.Context, id domain.SessionID,
// life, so it may resolve a detecting session — clearing the quarantine memory
// so a later probe doesn't resume counting from a stale prior.
func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID, s ports.ActivitySignal) error {
return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
tr, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
if !exists || s.State != ports.SignalValid {
return ports.LifecyclePatch{}, false, nil
}
Expand All @@ -230,6 +269,10 @@ func (m *Manager) ApplyActivitySignal(ctx context.Context, id domain.SessionID,

return patch, changed, nil
})
if err != nil {
return err
}
return m.react(ctx, id, tr, reactionContext{})
}

// ---- mutation outcomes reported by the Session Manager ----
Expand Down Expand Up @@ -270,7 +313,9 @@ func (m *Manager) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o p
// the terminal session/runtime sub-states for the kill kind and clears any
// in-flight detecting memory.
func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r ports.KillReason) error {
return m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
// An explicit user kill is a human action, not an inferred event, so it
// fires no reaction — the transition is discarded.
_, err := m.mutate(ctx, id, func(cur domain.CanonicalSessionLifecycle, exists bool) (ports.LifecyclePatch, bool, error) {
Comment thread
harshitsinghbhandari marked this conversation as resolved.
if !exists {
// Killing an unknown/already-gone session is a benign race; no-op
// rather than fabricating a terminal record for a session we never
Expand All @@ -295,12 +340,13 @@ func (m *Manager) OnKillRequested(ctx context.Context, id domain.SessionID, r po
}
return patch, changed, nil
})
}

// TickEscalations is a no-op in split A. The reaper will call this to fire
// duration-based escalations the synchronous LCM can't wake itself for, but the
// reaction table + escalation engine that back it land in split B.
func (m *Manager) TickEscalations(ctx context.Context, now time.Time) error {
if err != nil {
return err
}
// A kill is terminal but bypasses react()'s incident-over cleanup (it fires
// no reaction). Drop any escalation trackers here so a later duration-based
// TickEscalations can't emit reaction.escalated for a dead session.
m.clearSessionTrackers(id)
return nil
}

Expand Down
11 changes: 0 additions & 11 deletions backend/internal/lifecycle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,6 @@ func TestOnKillRequested_UnseededIsNoOp(t *testing.T) {
}
}

func TestTickEscalationsIsNoOp(t *testing.T) {
mgr, store := newManager()
store.seed(sid, lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.RuntimeAlive))
if err := mgr.TickEscalations(context.Background(), t0); err != nil {
t.Fatalf("tick: %v", err)
}
if l := mustLoad(t, store); l.Revision != 0 {
t.Errorf("TickEscalations must not write, got revision=%d", l.Revision)
}
}

// ---- fake store contract ----

func TestFakeStoreExpectedRevision(t *testing.T) {
Expand Down
Loading
Loading