From 538210844edb167220a038a8c031a71603ed56f9 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Wed, 27 May 2026 13:43:40 +0530 Subject: [PATCH 1/3] feat(session): implement Session Manager (spawn/kill/list/get/send/restore/cleanup) Implements ports.SessionManager against fakes for the outbound ports. The SM is the explicit-mutation half of the lane: it drives Runtime/Agent/Workspace, seeds the initial lifecycle, and routes outcomes to the LCM (OnSpawnCompleted / OnKillRequested). It never derives observed state and is the single producer of the derived display status (attached on read, never persisted). - Spawn: Workspace.Create -> Runtime.Create (AO_* identity env) -> Seed -> OnSpawnCompleted, with eager rollback of completed steps on failure. - Kill: OnKillRequested first -> Runtime.Destroy -> Workspace.Destroy, honoring the worktree-remove safety (refusal surfaced, never forced). - List/Get: derive status via DeriveLegacyStatus. Send: via AgentMessenger. Restore: re-seed (reopen) + relaunch via GetRestoreCommand. Cleanup: reclaim terminal sessions, skip worktrees holding uncommitted work. Store-contract additions (co-owned with Tom's persistence layer, flagged for review): LifecycleStore.Seed (explicit create-with-identity; OnSpawnCompleted requires a seeded record) and LifecycleStore.Get (single record-with-identity read; Load is lifecycle-only). Lifecycle test fake updated to satisfy both. Tests route through the real LCM Manager (wrapped to record call order). Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/lifecycle/fakes_test.go | 24 ++ backend/internal/ports/outbound.go | 18 + backend/internal/session/fakes_test.go | 399 ++++++++++++++++++++++ backend/internal/session/manager.go | 401 +++++++++++++++++++++++ backend/internal/session/manager_test.go | 388 ++++++++++++++++++++++ 5 files changed, 1230 insertions(+) create mode 100644 backend/internal/session/fakes_test.go create mode 100644 backend/internal/session/manager.go create mode 100644 backend/internal/session/manager_test.go diff --git a/backend/internal/lifecycle/fakes_test.go b/backend/internal/lifecycle/fakes_test.go index 904693aa..cc47ad84 100644 --- a/backend/internal/lifecycle/fakes_test.go +++ b/backend/internal/lifecycle/fakes_test.go @@ -90,6 +90,30 @@ func (s *fakeStore) PatchLifecycle(_ context.Context, id domain.SessionID, p por return nil } +func (s *fakeStore) Seed(_ context.Context, rec domain.SessionRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.records[rec.ID]; ok { + return fmt.Errorf("seed: session %s already exists", rec.ID) + } + if rec.Lifecycle.Version == 0 { + rec.Lifecycle.Version = domain.LifecycleVersion + } + r := rec + s.records[rec.ID] = &r + return nil +} + +func (s *fakeStore) Get(_ context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + rec, ok := s.records[id] + if !ok { + return domain.SessionRecord{}, false, nil + } + return *rec, true, nil +} + func (s *fakeStore) List(_ context.Context, project domain.ProjectID) ([]domain.SessionRecord, error) { s.mu.Lock() defer s.mu.Unlock() diff --git a/backend/internal/ports/outbound.go b/backend/internal/ports/outbound.go index 7a3649ae..a9c03e22 100644 --- a/backend/internal/ports/outbound.go +++ b/backend/internal/ports/outbound.go @@ -12,12 +12,30 @@ import ( // // List returns persistence records (no derived status); the Session Manager // turns those into domain.Session by attaching the derived display status. +// +// Seed and Get are the two record-with-identity methods the Session Manager +// needs that the LCM does not: Load returns lifecycle only (all the decider +// needs), so the SM read-model and explicit-create path would otherwise have no +// way to write or read a record's identity (ID/ProjectID/IssueID/Kind/CreatedAt) +// by id. (Co-owned with Tom's persistence layer — added here to close that gap.) type LifecycleStore interface { Load(ctx context.Context, id domain.SessionID) (domain.CanonicalSessionLifecycle, bool, error) PatchLifecycle(ctx context.Context, id domain.SessionID, patch LifecyclePatch) error List(ctx context.Context, project domain.ProjectID) ([]domain.SessionRecord, error) GetMetadata(ctx context.Context, id domain.SessionID) (map[string]string, error) PatchMetadata(ctx context.Context, id domain.SessionID, kv map[string]string) error + + // Seed creates a new record with its identity and initial lifecycle. It is + // the SM's explicit-create path (the LCM only ever patches existing records); + // OnSpawnCompleted requires a seeded record, so Spawn calls this first. It + // must reject a seed for an id that already exists rather than overwrite — + // re-seeding an existing session (e.g. Restore) goes through PatchLifecycle. + Seed(ctx context.Context, rec domain.SessionRecord) error + + // Get returns a single full record (with identity) by id. Load is + // lifecycle-only, so the SM uses this to build the read-model and to + // reconstruct teardown handles for Kill/Restore on one id. + Get(ctx context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) } // LifecyclePatch is a sparse merge-patch: a nil field is left untouched, a diff --git a/backend/internal/session/fakes_test.go b/backend/internal/session/fakes_test.go new file mode 100644 index 00000000..d8e4b248 --- /dev/null +++ b/backend/internal/session/fakes_test.go @@ -0,0 +1,399 @@ +package session + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// callLog records the cross-fake call order so tests can assert pipeline +// sequencing (e.g. OnKillRequested before Runtime.Destroy before Workspace.Destroy). +type callLog struct { + mu sync.Mutex + calls []string +} + +func (c *callLog) add(s string) { + c.mu.Lock() + defer c.mu.Unlock() + c.calls = append(c.calls, s) +} + +func (c *callLog) snapshot() []string { + c.mu.Lock() + defer c.mu.Unlock() + out := make([]string, len(c.calls)) + copy(out, c.calls) + return out +} + +// indexOf returns the position of the first call equal to name, or -1. +func (c *callLog) indexOf(name string) int { + for i, s := range c.snapshot() { + if s == name { + return i + } + } + return -1 +} + +// ---- fakeStore: in-memory LifecycleStore with faithful merge-patch + Seed/Get ---- + +type fakeStore struct { + mu sync.Mutex + records map[domain.SessionID]*domain.SessionRecord + metadata map[domain.SessionID]map[string]string +} + +var _ ports.LifecycleStore = (*fakeStore)(nil) + +func newFakeStore() *fakeStore { + return &fakeStore{ + records: map[domain.SessionID]*domain.SessionRecord{}, + metadata: map[domain.SessionID]map[string]string{}, + } +} + +func (s *fakeStore) Seed(_ context.Context, rec domain.SessionRecord) error { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.records[rec.ID]; ok { + return fmt.Errorf("seed: session %s already exists", rec.ID) + } + if rec.Lifecycle.Version == 0 { + rec.Lifecycle.Version = domain.LifecycleVersion + } + r := rec + s.records[rec.ID] = &r + return nil +} + +func (s *fakeStore) Get(_ context.Context, id domain.SessionID) (domain.SessionRecord, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + rec, ok := s.records[id] + if !ok { + return domain.SessionRecord{}, false, nil + } + return s.withMetadata(*rec), true, nil +} + +func (s *fakeStore) Load(_ context.Context, id domain.SessionID) (domain.CanonicalSessionLifecycle, bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + rec, ok := s.records[id] + if !ok { + return domain.CanonicalSessionLifecycle{}, false, nil + } + return rec.Lifecycle, true, nil +} + +func (s *fakeStore) PatchLifecycle(_ context.Context, id domain.SessionID, p ports.LifecyclePatch) error { + s.mu.Lock() + defer s.mu.Unlock() + + rec, ok := s.records[id] + if !ok { + rec = &domain.SessionRecord{ID: id, Lifecycle: domain.CanonicalSessionLifecycle{Version: domain.LifecycleVersion}} + s.records[id] = rec + } + l := &rec.Lifecycle + + if p.ExpectedRevision != nil && *p.ExpectedRevision != l.Revision { + return fmt.Errorf("revision mismatch for %s: have %d, expected %d", id, l.Revision, *p.ExpectedRevision) + } + + if p.Session != nil { + l.Session = *p.Session + } + if p.PR != nil { + l.PR = *p.PR + } + if p.Runtime != nil { + l.Runtime = *p.Runtime + } + if p.Activity != nil { + l.Activity = *p.Activity + } + switch { + case p.ClearDetecting: + l.Detecting = nil + case p.Detecting != nil: + d := *p.Detecting + l.Detecting = &d + } + + l.Version = domain.LifecycleVersion + l.Revision++ + rec.UpdatedAt = time.Now() + return nil +} + +func (s *fakeStore) List(_ context.Context, project domain.ProjectID) ([]domain.SessionRecord, error) { + s.mu.Lock() + defer s.mu.Unlock() + var out []domain.SessionRecord + for _, rec := range s.records { + if rec.ProjectID == project { + out = append(out, s.withMetadata(*rec)) + } + } + return out, nil +} + +func (s *fakeStore) GetMetadata(_ context.Context, id domain.SessionID) (map[string]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + return cloneMap(s.metadata[id]), nil +} + +func (s *fakeStore) PatchMetadata(_ context.Context, id domain.SessionID, kv map[string]string) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.metadata[id] == nil { + s.metadata[id] = map[string]string{} + } + for k, v := range kv { + s.metadata[id][k] = v + } + return nil +} + +// withMetadata attaches the separately-stored metadata to a record copy (a real +// store would return them together). Caller holds s.mu. +func (s *fakeStore) withMetadata(rec domain.SessionRecord) domain.SessionRecord { + if md := s.metadata[rec.ID]; len(md) > 0 { + rec.Metadata = cloneMap(md) + } + return rec +} + +// ---- fakeRuntime ---- + +type fakeRuntime struct { + log *callLog + createErr error + alive bool + + created []ports.RuntimeConfig + destroyed []ports.RuntimeHandle + sent []string +} + +var _ ports.Runtime = (*fakeRuntime)(nil) + +func (r *fakeRuntime) Create(_ context.Context, cfg ports.RuntimeConfig) (ports.RuntimeHandle, error) { + r.log.add("Runtime.Create") + if r.createErr != nil { + return ports.RuntimeHandle{}, r.createErr + } + r.created = append(r.created, cfg) + return ports.RuntimeHandle{ID: "rt-" + string(cfg.SessionID), RuntimeName: "tmux"}, nil +} + +func (r *fakeRuntime) Destroy(_ context.Context, h ports.RuntimeHandle) error { + r.log.add("Runtime.Destroy") + r.destroyed = append(r.destroyed, h) + return nil +} + +func (r *fakeRuntime) SendMessage(_ context.Context, _ ports.RuntimeHandle, message string) error { + r.sent = append(r.sent, message) + return nil +} + +func (r *fakeRuntime) GetOutput(_ context.Context, _ ports.RuntimeHandle, _ int) (string, error) { + return "", nil +} + +func (r *fakeRuntime) IsAlive(_ context.Context, _ ports.RuntimeHandle) (bool, error) { + return r.alive, nil +} + +// ---- fakeAgent ---- + +type fakeAgent struct { + env map[string]string +} + +var _ ports.Agent = (*fakeAgent)(nil) + +func (a *fakeAgent) GetLaunchCommand(_ ports.AgentConfig) string { return "claude" } + +func (a *fakeAgent) GetEnvironment(_ ports.AgentConfig) map[string]string { return cloneMap(a.env) } + +func (a *fakeAgent) ProbeProcess(_ context.Context, _ ports.RuntimeHandle) (ports.ProcessProbe, error) { + return ports.ProcessProbeAlive, nil +} + +func (a *fakeAgent) GetRestoreCommand(agentSessionID string) string { + return "claude --resume " + agentSessionID +} + +// ---- fakeWorkspace (with worktree-remove refusal mode) ---- + +type fakeWorkspace struct { + log *callLog + createErr error + refuse map[string]bool // path -> still registered after prune (uncommitted work) + created []ports.WorkspaceConfig + destroyed []ports.WorkspaceInfo + restoredID []domain.SessionID +} + +var _ ports.Workspace = (*fakeWorkspace)(nil) + +func (w *fakeWorkspace) Create(_ context.Context, cfg ports.WorkspaceConfig) (ports.WorkspaceInfo, error) { + w.log.add("Workspace.Create") + if w.createErr != nil { + return ports.WorkspaceInfo{}, w.createErr + } + w.created = append(w.created, cfg) + return workspaceFor(cfg), nil +} + +func (w *fakeWorkspace) Destroy(_ context.Context, info ports.WorkspaceInfo) error { + w.log.add("Workspace.Destroy") + if w.refuse[info.Path] { + // Worktree-remove safety: after `git worktree prune` the path is still + // registered, so it may hold the agent's uncommitted work — refuse. + return fmt.Errorf("workspace: refusing to rm -rf %s: still registered after prune", info.Path) + } + w.destroyed = append(w.destroyed, info) + return nil +} + +func (w *fakeWorkspace) List(_ context.Context, _ domain.ProjectID) ([]ports.WorkspaceInfo, error) { + return nil, nil +} + +func (w *fakeWorkspace) Restore(_ context.Context, cfg ports.WorkspaceConfig) (ports.WorkspaceInfo, error) { + w.log.add("Workspace.Restore") + w.restoredID = append(w.restoredID, cfg.SessionID) + return workspaceFor(cfg), nil +} + +func workspaceFor(cfg ports.WorkspaceConfig) ports.WorkspaceInfo { + return ports.WorkspaceInfo{ + Path: "/tmp/ws/" + string(cfg.SessionID), + Branch: cfg.Branch, + SessionID: cfg.SessionID, + ProjectID: cfg.ProjectID, + } +} + +// ---- recordingMessenger ---- + +type recordingMessenger struct { + sent []struct { + ID domain.SessionID + Message string + } +} + +var _ ports.AgentMessenger = (*recordingMessenger)(nil) + +func (m *recordingMessenger) Send(_ context.Context, id domain.SessionID, message string) error { + m.sent = append(m.sent, struct { + ID domain.SessionID + Message string + }{id, message}) + return nil +} + +// ---- noopNotifier ---- + +type noopNotifier struct{} + +var _ ports.Notifier = (*noopNotifier)(nil) + +func (noopNotifier) Notify(_ context.Context, _ ports.OrchestratorEvent) error { return nil } + +// ---- recordingLCM: wraps the REAL lifecycle.Manager and logs SM-facing calls ---- + +type recordingLCM struct { + log *callLog + inner ports.LifecycleManager +} + +var _ ports.LifecycleManager = (*recordingLCM)(nil) + +func (l *recordingLCM) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o ports.SpawnOutcome) error { + l.log.add("OnSpawnCompleted") + return l.inner.OnSpawnCompleted(ctx, id, o) +} + +func (l *recordingLCM) OnKillRequested(ctx context.Context, id domain.SessionID, r ports.KillReason) error { + l.log.add("OnKillRequested") + return l.inner.OnKillRequested(ctx, id, r) +} + +func (l *recordingLCM) ApplySCMObservation(ctx context.Context, id domain.SessionID, f ports.SCMFacts) error { + return l.inner.ApplySCMObservation(ctx, id, f) +} + +func (l *recordingLCM) ApplyRuntimeObservation(ctx context.Context, id domain.SessionID, f ports.RuntimeFacts) error { + return l.inner.ApplyRuntimeObservation(ctx, id, f) +} + +func (l *recordingLCM) ApplyActivitySignal(ctx context.Context, id domain.SessionID, s ports.ActivitySignal) error { + return l.inner.ApplyActivitySignal(ctx, id, s) +} + +func (l *recordingLCM) TickEscalations(ctx context.Context, now time.Time) error { + return l.inner.TickEscalations(ctx, now) +} + +// ---- harness: wires the SM against the fakes + the real LCM ---- + +type harness struct { + sm *Manager + store *fakeStore + runtime *fakeRuntime + agent *fakeAgent + workspace *fakeWorkspace + messenger *recordingMessenger + log *callLog +} + +var fixedTime = time.Date(2026, 5, 27, 12, 0, 0, 0, time.UTC) + +func newHarness(id domain.SessionID) *harness { + log := &callLog{} + store := newFakeStore() + rt := &fakeRuntime{log: log, alive: true} + ag := &fakeAgent{env: map[string]string{"BASE": "1"}} + ws := &fakeWorkspace{log: log, refuse: map[string]bool{}} + msg := &recordingMessenger{} + + lcm := &recordingLCM{log: log, inner: lifecycle.New(store, noopNotifier{}, msg)} + + sm := New(Deps{ + Runtime: rt, + Agent: ag, + Workspace: ws, + Store: store, + Messenger: msg, + Lifecycle: lcm, + Clock: func() time.Time { return fixedTime }, + NewID: func(ports.SpawnConfig) domain.SessionID { return id }, + }) + + return &harness{sm: sm, store: store, runtime: rt, agent: ag, workspace: ws, messenger: msg, log: log} +} + +func cloneMap(in map[string]string) map[string]string { + if in == nil { + return nil + } + out := make(map[string]string, len(in)) + for k, v := range in { + out[k] = v + } + return out +} diff --git a/backend/internal/session/manager.go b/backend/internal/session/manager.go new file mode 100644 index 00000000..61bb9274 --- /dev/null +++ b/backend/internal/session/manager.go @@ -0,0 +1,401 @@ +// Package session implements ports.SessionManager: the explicit-mutation half +// of the lane. The SM is impure plumbing — it drives the Runtime/Agent/Workspace +// plugins to create and tear down sessions, seeds the initial lifecycle record, +// and routes mutation outcomes to the LCM (OnSpawnCompleted / OnKillRequested). +// +// It NEVER derives or observes lifecycle state: observed transitions are the +// LCM's job. The SM's only canonical writes are the explicit ones — seeding a +// new record on Spawn and re-seeding (reopening) on Restore — and it is the +// single producer of the derived display status, attached on read in List/Get +// and never persisted. +package session + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "strconv" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +// ErrNotFound is returned by Get/Restore when no record exists for the id. +var ErrNotFound = errors.New("session: not found") + +// Env vars a spawned process reads to learn who it is (distillation §5.4). +const ( + EnvSessionID = "AO_SESSION_ID" + EnvProjectID = "AO_PROJECT_ID" + EnvIssueID = "AO_ISSUE_ID" +) + +// Manager implements ports.SessionManager against the outbound ports. Every +// dependency is an interface so the SM runs entirely against fakes in tests. +type Manager struct { + runtime ports.Runtime + agent ports.Agent + workspace ports.Workspace + store ports.LifecycleStore + messenger ports.AgentMessenger + lcm ports.LifecycleManager + + clock func() time.Time + newID func(ports.SpawnConfig) domain.SessionID +} + +var _ ports.SessionManager = (*Manager)(nil) + +// Deps groups the SM's collaborators. Clock and NewID are optional (defaulted) +// so production wiring only supplies the ports. +type Deps struct { + Runtime ports.Runtime + Agent ports.Agent + Workspace ports.Workspace + Store ports.LifecycleStore + Messenger ports.AgentMessenger + Lifecycle ports.LifecycleManager + + Clock func() time.Time + NewID func(ports.SpawnConfig) domain.SessionID +} + +func New(d Deps) *Manager { + m := &Manager{ + runtime: d.Runtime, + agent: d.Agent, + workspace: d.Workspace, + store: d.Store, + messenger: d.Messenger, + lcm: d.Lifecycle, + clock: d.Clock, + newID: d.NewID, + } + if m.clock == nil { + m.clock = time.Now + } + if m.newID == nil { + m.newID = defaultNewID + } + return m +} + +// ---- Spawn ---- + +// Spawn runs the create pipeline in spec order: workspace -> runtime -> seed -> +// report to the LCM. The record is seeded LATE (after the runtime is up), so a +// failure before the seed leaves no record for Cleanup to reclaim — hence each +// step eagerly rolls back the steps that already succeeded. +func (m *Manager) Spawn(ctx context.Context, cfg ports.SpawnConfig) (domain.Session, error) { + id := m.newID(cfg) + + ws, err := m.workspace.Create(ctx, ports.WorkspaceConfig{ + ProjectID: cfg.ProjectID, + SessionID: id, + Branch: cfg.Branch, + }) + if err != nil { + return domain.Session{}, fmt.Errorf("spawn %s: workspace create: %w", id, err) + } + + agentCfg := ports.AgentConfig{SessionID: id, WorkspacePath: ws.Path, Prompt: buildPrompt(cfg)} + handle, err := m.runtime.Create(ctx, ports.RuntimeConfig{ + SessionID: id, + WorkspacePath: ws.Path, + LaunchCommand: m.agent.GetLaunchCommand(agentCfg), + Env: spawnEnv(m.agent.GetEnvironment(agentCfg), id, cfg.ProjectID, cfg.IssueID), + }) + if err != nil { + m.rollbackWorkspace(ctx, ws) // nothing seeded yet + return domain.Session{}, fmt.Errorf("spawn %s: runtime create: %w", id, err) + } + + if err := m.store.Seed(ctx, seedRecord(id, cfg, m.clock())); err != nil { + m.rollbackRuntime(ctx, handle) + m.rollbackWorkspace(ctx, ws) + return domain.Session{}, fmt.Errorf("spawn %s: seed: %w", id, err) + } + + outcome := ports.SpawnOutcome{Branch: ws.Branch, WorkspacePath: ws.Path, RuntimeHandle: handle} + if err := m.lcm.OnSpawnCompleted(ctx, id, outcome); err != nil { + // The record is seeded but the runtime/workspace are about to be torn + // down. The store has no delete, so route the orphan to a terminal + // errored state (best effort) rather than strand a phantom "spawning". + _ = m.lcm.OnKillRequested(ctx, id, ports.KillReason{Kind: ports.KillError, Detail: "spawn completion failed"}) + m.rollbackRuntime(ctx, handle) + m.rollbackWorkspace(ctx, ws) + return domain.Session{}, fmt.Errorf("spawn %s: on spawn completed: %w", id, err) + } + + return m.Get(ctx, id) +} + +// rollback* are best-effort: the caller already has the originating failure, and +// there is no logger at this layer, so a secondary teardown error is dropped +// rather than masking the real cause. +func (m *Manager) rollbackWorkspace(ctx context.Context, ws ports.WorkspaceInfo) { + _ = m.workspace.Destroy(ctx, ws) +} + +func (m *Manager) rollbackRuntime(ctx context.Context, h ports.RuntimeHandle) { + _ = m.runtime.Destroy(ctx, h) +} + +// ---- Kill ---- + +// Kill records terminal intent with the LCM FIRST, then tears down the runtime +// and workspace. There is no separate Agent stop: the agent runs inside the +// runtime, so Runtime.Destroy stops it. The workspace teardown honors the +// worktree-remove safety — a refusal (path still registered after prune, so it +// may hold uncommitted work) surfaces as an error with WorkspaceFreed=false and +// is never forced. +func (m *Manager) Kill(ctx context.Context, id domain.SessionID, opts ports.KillOptions) (ports.KillResult, error) { + rec, ok, err := m.store.Get(ctx, id) + if err != nil { + return ports.KillResult{ID: id}, fmt.Errorf("kill %s: %w", id, err) + } + if !ok { + // Already gone: benign race, mirrors LCM.OnKillRequested's no-op. + return ports.KillResult{ID: id}, nil + } + meta, err := m.store.GetMetadata(ctx, id) + if err != nil { + return ports.KillResult{ID: id}, fmt.Errorf("kill %s: metadata: %w", id, err) + } + + if err := m.lcm.OnKillRequested(ctx, id, ports.KillReason{Kind: opts.Reason, Detail: opts.Detail}); err != nil { + return ports.KillResult{ID: id}, fmt.Errorf("kill %s: on kill requested: %w", id, err) + } + if err := m.runtime.Destroy(ctx, runtimeHandle(meta)); err != nil { + return ports.KillResult{ID: id}, fmt.Errorf("kill %s: runtime destroy: %w", id, err) + } + if err := m.workspace.Destroy(ctx, workspaceInfo(rec, meta)); err != nil { + return ports.KillResult{ID: id, WorkspaceFreed: false}, fmt.Errorf("kill %s: workspace destroy: %w", id, err) + } + return ports.KillResult{ID: id, WorkspaceFreed: true}, nil +} + +// ---- read-model ---- + +// List builds the read-model for a project: stored records with the display +// status derived on read. The SM is the single producer of that status. +func (m *Manager) List(ctx context.Context, project domain.ProjectID) ([]domain.Session, error) { + recs, err := m.store.List(ctx, project) + if err != nil { + return nil, fmt.Errorf("list %s: %w", project, err) + } + out := make([]domain.Session, 0, len(recs)) + for _, rec := range recs { + out = append(out, toSession(rec)) + } + return out, nil +} + +func (m *Manager) Get(ctx context.Context, id domain.SessionID) (domain.Session, error) { + rec, ok, err := m.store.Get(ctx, id) + if err != nil { + return domain.Session{}, fmt.Errorf("get %s: %w", id, err) + } + if !ok { + return domain.Session{}, fmt.Errorf("get %s: %w", id, ErrNotFound) + } + return toSession(rec), nil +} + +// ---- Send ---- + +// Send routes a message to the running agent through the AgentMessenger, which +// busy-detects and verifies delivery. +func (m *Manager) Send(ctx context.Context, id domain.SessionID, message string) error { + if err := m.messenger.Send(ctx, id, message); err != nil { + return fmt.Errorf("send %s: %w", id, err) + } + return nil +} + +// ---- Restore ---- + +// Restore relaunches a previously torn-down session in its workspace. The +// fallible I/O (workspace restore + runtime create) runs first so a failure +// touches no canonical state and never destroys the worktree (it may hold the +// agent's prior work). Only once the runtime is up do we reopen the lifecycle: +// resetting a terminal session is an explicit mutation (the SM's authority; the +// LCM's observe path would never resurrect a terminal session), and the PR axis +// is cleared. OnSpawnCompleted then flips the runtime to alive. +func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Session, error) { + rec, ok, err := m.store.Get(ctx, id) + if err != nil { + return domain.Session{}, fmt.Errorf("restore %s: %w", id, err) + } + if !ok { + return domain.Session{}, fmt.Errorf("restore %s: %w", id, ErrNotFound) + } + meta, err := m.store.GetMetadata(ctx, id) + if err != nil { + return domain.Session{}, fmt.Errorf("restore %s: metadata: %w", id, err) + } + + ws, err := m.workspace.Restore(ctx, ports.WorkspaceConfig{ + ProjectID: rec.ProjectID, + SessionID: id, + Branch: meta[lifecycle.MetaBranch], + }) + if err != nil { + return domain.Session{}, fmt.Errorf("restore %s: workspace restore: %w", id, err) + } + + agentCfg := ports.AgentConfig{SessionID: id, WorkspacePath: ws.Path} + handle, err := m.runtime.Create(ctx, ports.RuntimeConfig{ + SessionID: id, + WorkspacePath: ws.Path, + LaunchCommand: m.agent.GetRestoreCommand(meta[lifecycle.MetaAgentSessionID]), + Env: spawnEnv(m.agent.GetEnvironment(agentCfg), id, rec.ProjectID, rec.IssueID), + }) + if err != nil { + return domain.Session{}, fmt.Errorf("restore %s: runtime create: %w", id, err) + } + + reopen := ports.LifecyclePatch{ + Session: &domain.SessionSubstate{State: domain.SessionNotStarted, Reason: domain.ReasonSpawnRequested}, + PR: &domain.PRSubstate{State: domain.PRNone, Reason: domain.PRReasonClearedOnRestore}, + } + if err := m.store.PatchLifecycle(ctx, id, reopen); err != nil { + return domain.Session{}, fmt.Errorf("restore %s: reopen: %w", id, err) + } + + outcome := ports.SpawnOutcome{ + Branch: ws.Branch, + WorkspacePath: ws.Path, + RuntimeHandle: handle, + AgentSessionID: meta[lifecycle.MetaAgentSessionID], + } + if err := m.lcm.OnSpawnCompleted(ctx, id, outcome); err != nil { + return domain.Session{}, fmt.Errorf("restore %s: on spawn completed: %w", id, err) + } + return m.Get(ctx, id) +} + +// ---- Cleanup ---- + +// Cleanup reclaims the workspaces of terminal sessions in a project. A workspace +// whose teardown is refused by the worktree-remove safety (uncommitted work) is +// skipped, never forced. Runtime teardown is best-effort (a terminal session's +// runtime is usually already gone); the workspace result decides cleaned/skipped. +func (m *Manager) Cleanup(ctx context.Context, project domain.ProjectID) (ports.CleanupResult, error) { + recs, err := m.store.List(ctx, project) + if err != nil { + return ports.CleanupResult{}, fmt.Errorf("cleanup %s: %w", project, err) + } + var res ports.CleanupResult + for _, rec := range recs { + if !isTerminalSession(rec.Lifecycle.Session.State) { + continue + } + meta, err := m.store.GetMetadata(ctx, rec.ID) + if err != nil { + return res, fmt.Errorf("cleanup %s: metadata %s: %w", project, rec.ID, err) + } + _ = m.runtime.Destroy(ctx, runtimeHandle(meta)) // best effort; usually already gone + if err := m.workspace.Destroy(ctx, workspaceInfo(rec, meta)); err != nil { + res.Skipped = append(res.Skipped, rec.ID) + continue + } + res.Cleaned = append(res.Cleaned, rec.ID) + } + return res, nil +} + +// ---- helpers ---- + +func toSession(rec domain.SessionRecord) domain.Session { + return domain.Session{SessionRecord: rec, Status: domain.DeriveLegacyStatus(rec.Lifecycle)} +} + +func isTerminalSession(s domain.SessionState) bool { + return s == domain.SessionDone || s == domain.SessionTerminated +} + +// buildPrompt assembles the spawn prompt from the explicit config only; the full +// 3-layer assembly (base protocol + config-derived + user rules) lands later. +func buildPrompt(cfg ports.SpawnConfig) string { + switch { + case cfg.AgentRules == "": + return cfg.Prompt + case cfg.Prompt == "": + return cfg.AgentRules + default: + return cfg.Prompt + "\n\n" + cfg.AgentRules + } +} + +// spawnEnv overlays the AO_* identity vars onto the agent's environment without +// mutating the map the agent returned. +func spawnEnv(base map[string]string, id domain.SessionID, project domain.ProjectID, issue domain.IssueID) map[string]string { + env := make(map[string]string, len(base)+3) + for k, v := range base { + env[k] = v + } + env[EnvSessionID] = string(id) + env[EnvProjectID] = string(project) + env[EnvIssueID] = string(issue) + return env +} + +func seedRecord(id domain.SessionID, cfg ports.SpawnConfig, now time.Time) domain.SessionRecord { + return domain.SessionRecord{ + ID: id, + ProjectID: cfg.ProjectID, + IssueID: cfg.IssueID, + Kind: cfg.Kind, + CreatedAt: now, + UpdatedAt: now, + Lifecycle: domain.CanonicalSessionLifecycle{ + Version: domain.LifecycleVersion, + Session: domain.SessionSubstate{State: domain.SessionNotStarted, Reason: domain.ReasonSpawnRequested}, + Runtime: domain.RuntimeSubstate{State: domain.RuntimeUnknown, Reason: domain.RuntimeReasonSpawnIncomplete}, + PR: domain.PRSubstate{State: domain.PRNone, Reason: domain.PRReasonNotCreated}, + }, + } +} + +// runtimeHandle / workspaceInfo reconstruct teardown handles from the metadata +// the LCM persisted in OnSpawnCompleted (the metadata-key contract is shared +// with the lifecycle package). +func runtimeHandle(meta map[string]string) ports.RuntimeHandle { + return ports.RuntimeHandle{ + ID: meta[lifecycle.MetaRuntimeHandleID], + RuntimeName: meta[lifecycle.MetaRuntimeName], + } +} + +func workspaceInfo(rec domain.SessionRecord, meta map[string]string) ports.WorkspaceInfo { + return ports.WorkspaceInfo{ + Path: meta[lifecycle.MetaWorkspacePath], + Branch: meta[lifecycle.MetaBranch], + SessionID: rec.ID, + ProjectID: rec.ProjectID, + } +} + +func defaultNewID(cfg ports.SpawnConfig) domain.SessionID { + base := string(cfg.IssueID) + if base == "" { + base = string(cfg.Kind) + } + if base == "" { + base = "session" + } + return domain.SessionID(base + "-" + randHex(4)) +} + +func randHex(n int) string { + b := make([]byte, n) + if _, err := rand.Read(b); err != nil { + return strconv.FormatInt(time.Now().UnixNano(), 16) + } + return hex.EncodeToString(b) +} diff --git a/backend/internal/session/manager_test.go b/backend/internal/session/manager_test.go new file mode 100644 index 00000000..e28eded1 --- /dev/null +++ b/backend/internal/session/manager_test.go @@ -0,0 +1,388 @@ +package session + +import ( + "context" + "errors" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +const ( + testProject = domain.ProjectID("proj") + testIssue = domain.IssueID("42") +) + +func spawnCfg() ports.SpawnConfig { + return ports.SpawnConfig{ + ProjectID: testProject, + IssueID: testIssue, + Kind: domain.KindWorker, + Branch: "feat/42", + Prompt: "do the thing", + AgentRules: "be careful", + } +} + +func TestSpawn_HappyPath(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + + sess, err := h.sm.Spawn(ctx, spawnCfg()) + if err != nil { + t.Fatalf("spawn: %v", err) + } + + // Display status is derived (single producer) — a freshly spawned, not_started + // session shows as spawning. + if sess.Status != domain.StatusSpawning { + t.Errorf("status = %q, want %q", sess.Status, domain.StatusSpawning) + } + + // Record seeded with identity + initial lifecycle, then OnSpawnCompleted flipped + // the runtime axis to alive. + rec, ok, err := h.store.Get(ctx, "sess-1") + if err != nil || !ok { + t.Fatalf("get seeded record: ok=%v err=%v", ok, err) + } + if rec.ProjectID != testProject || rec.IssueID != testIssue || rec.Kind != domain.KindWorker { + t.Errorf("identity = %+v, want proj/42/worker", rec) + } + if !rec.CreatedAt.Equal(fixedTime) { + t.Errorf("createdAt = %v, want %v", rec.CreatedAt, fixedTime) + } + if got := rec.Lifecycle.Session; got.State != domain.SessionNotStarted || got.Reason != domain.ReasonSpawnRequested { + t.Errorf("session substate = %+v, want not_started/spawn_requested", got) + } + if got := rec.Lifecycle.Runtime; got.State != domain.RuntimeAlive || got.Reason != domain.RuntimeReasonProcessRunning { + t.Errorf("runtime substate = %+v, want alive/process_running", got) + } + + // Pipeline order: workspace -> runtime -> (seed) -> LCM. + wantOrder := []string{"Workspace.Create", "Runtime.Create", "OnSpawnCompleted"} + if got := h.log.snapshot(); !equalStrings(got, wantOrder) { + t.Errorf("call order = %v, want %v", got, wantOrder) + } + + // Identity env wired onto the runtime config, layered over the agent's env. + if len(h.runtime.created) != 1 { + t.Fatalf("runtime.created = %d, want 1", len(h.runtime.created)) + } + env := h.runtime.created[0].Env + for k, want := range map[string]string{ + EnvSessionID: "sess-1", + EnvProjectID: "proj", + EnvIssueID: "42", + "BASE": "1", + } { + if env[k] != want { + t.Errorf("env[%q] = %q, want %q", k, env[k], want) + } + } + + // Handles persisted to metadata for later teardown/restore. + meta, _ := h.store.GetMetadata(ctx, "sess-1") + for k, want := range map[string]string{ + lifecycle.MetaBranch: "feat/42", + lifecycle.MetaWorkspacePath: "/tmp/ws/sess-1", + lifecycle.MetaRuntimeHandleID: "rt-sess-1", + lifecycle.MetaRuntimeName: "tmux", + } { + if meta[k] != want { + t.Errorf("meta[%q] = %q, want %q", k, meta[k], want) + } + } +} + +func TestSpawn_RuntimeCreateFailure_RollsBack(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + h.runtime.createErr = errors.New("boom") + + _, err := h.sm.Spawn(ctx, spawnCfg()) + if err == nil { + t.Fatal("spawn: want error, got nil") + } + + // No record seeded for a spawn that never completed. + if _, ok, _ := h.store.Get(ctx, "sess-1"); ok { + t.Error("record was seeded despite runtime-create failure") + } + // The already-created workspace was rolled back (eager rollback), since a + // late-seeded record means Cleanup could never find this orphan. + if len(h.workspace.destroyed) != 1 || h.workspace.destroyed[0].Path != "/tmp/ws/sess-1" { + t.Errorf("workspace.destroyed = %+v, want the created worktree", h.workspace.destroyed) + } + // LCM never told a spawn completed. + if h.log.indexOf("OnSpawnCompleted") != -1 { + t.Error("OnSpawnCompleted should not fire on a failed spawn") + } +} + +func TestKill_OrderingAndTerminalState(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + if _, err := h.sm.Spawn(ctx, spawnCfg()); err != nil { + t.Fatalf("spawn: %v", err) + } + + res, err := h.sm.Kill(ctx, "sess-1", ports.KillOptions{Reason: ports.KillManual}) + if err != nil { + t.Fatalf("kill: %v", err) + } + if !res.WorkspaceFreed { + t.Error("WorkspaceFreed = false, want true") + } + + // Intent recorded with the LCM BEFORE any teardown, runtime before workspace. + iKill := h.log.indexOf("OnKillRequested") + iRT := h.log.indexOf("Runtime.Destroy") + iWS := h.log.indexOf("Workspace.Destroy") + if !(iKill >= 0 && iKill < iRT && iRT < iWS) { + t.Errorf("kill order indices: OnKillRequested=%d Runtime.Destroy=%d Workspace.Destroy=%d (want ascending)", iKill, iRT, iWS) + } + + // Terminal canonical written by the LCM; display derives to killed. + rec, _, _ := h.store.Get(ctx, "sess-1") + if got := rec.Lifecycle.Session; got.State != domain.SessionTerminated || got.Reason != domain.ReasonManuallyKilled { + t.Errorf("session substate = %+v, want terminated/manually_killed", got) + } + if status := domain.DeriveLegacyStatus(rec.Lifecycle); status != domain.StatusKilled { + t.Errorf("status = %q, want killed", status) + } +} + +func TestKill_WorktreeRemoveRefusalSurfaced(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + if _, err := h.sm.Spawn(ctx, spawnCfg()); err != nil { + t.Fatalf("spawn: %v", err) + } + // The worktree path is still registered after prune (uncommitted work). + h.workspace.refuse["/tmp/ws/sess-1"] = true + + res, err := h.sm.Kill(ctx, "sess-1", ports.KillOptions{Reason: ports.KillManual}) + if err == nil { + t.Fatal("kill: want refusal error, got nil") + } + if res.WorkspaceFreed { + t.Error("WorkspaceFreed = true, want false on refusal") + } + // The refusal must be honored — the path is never force-deleted. + if len(h.workspace.destroyed) != 0 { + t.Errorf("workspace.destroyed = %+v, want none (refused)", h.workspace.destroyed) + } + // Runtime still torn down and intent still recorded — only the worktree is spared. + if h.log.indexOf("Runtime.Destroy") == -1 || h.log.indexOf("OnKillRequested") == -1 { + t.Error("runtime teardown / kill intent should still happen on a workspace refusal") + } +} + +func TestListAndGet_DeriveStatus(t *testing.T) { + cases := []struct { + name string + lc domain.CanonicalSessionLifecycle + want domain.SessionStatus + }{ + {"not_started", lc(domain.SessionNotStarted, domain.ReasonSpawnRequested, domain.PRNone, ""), domain.StatusSpawning}, + {"working", lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.PRNone, ""), domain.StatusWorking}, + {"idle", lc(domain.SessionIdle, domain.ReasonResearchComplete, domain.PRNone, ""), domain.StatusIdle}, + {"needs_input", lc(domain.SessionNeedsInput, domain.ReasonAwaitingUserInput, domain.PRNone, ""), domain.StatusNeedsInput}, + {"pr_ci_failed", lc(domain.SessionWorking, domain.ReasonFixingCI, domain.PROpen, domain.PRReasonCIFailing), domain.StatusCIFailed}, + {"pr_merged", lc(domain.SessionIdle, domain.ReasonMergedWaitingDecision, domain.PRMerged, domain.PRReasonMerged), domain.StatusMerged}, + {"killed", lc(domain.SessionTerminated, domain.ReasonManuallyKilled, domain.PRNone, ""), domain.StatusKilled}, + } + + h := newHarness("unused") + ctx := context.Background() + for _, c := range cases { + if err := h.store.Seed(ctx, domain.SessionRecord{ID: domain.SessionID(c.name), ProjectID: testProject, Lifecycle: c.lc}); err != nil { + t.Fatalf("seed %s: %v", c.name, err) + } + } + + // Get derives per-record. + for _, c := range cases { + got, err := h.sm.Get(ctx, domain.SessionID(c.name)) + if err != nil { + t.Fatalf("get %s: %v", c.name, err) + } + if got.Status != c.want { + t.Errorf("get %s: status = %q, want %q", c.name, got.Status, c.want) + } + } + + // List derives for every record in the project. + got, err := h.sm.List(ctx, testProject) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(got) != len(cases) { + t.Fatalf("list len = %d, want %d", len(got), len(cases)) + } + byID := map[domain.SessionID]domain.SessionStatus{} + for _, s := range got { + byID[s.ID] = s.Status + } + for _, c := range cases { + if byID[domain.SessionID(c.name)] != c.want { + t.Errorf("list %s: status = %q, want %q", c.name, byID[domain.SessionID(c.name)], c.want) + } + } +} + +func TestGet_NotFound(t *testing.T) { + h := newHarness("sess-1") + if _, err := h.sm.Get(context.Background(), "missing"); !errors.Is(err, ErrNotFound) { + t.Errorf("get missing: err = %v, want ErrNotFound", err) + } +} + +func TestSend_RoutesToMessenger(t *testing.T) { + h := newHarness("sess-1") + if err := h.sm.Send(context.Background(), "sess-1", "hello"); err != nil { + t.Fatalf("send: %v", err) + } + if len(h.messenger.sent) != 1 || h.messenger.sent[0].ID != "sess-1" || h.messenger.sent[0].Message != "hello" { + t.Errorf("messenger.sent = %+v, want one {sess-1, hello}", h.messenger.sent) + } +} + +func TestRestore_RelaunchesWithResumeCommand(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + if _, err := h.sm.Spawn(ctx, spawnCfg()); err != nil { + t.Fatalf("spawn: %v", err) + } + if _, err := h.sm.Kill(ctx, "sess-1", ports.KillOptions{Reason: ports.KillManual}); err != nil { + t.Fatalf("kill: %v", err) + } + // The agent's resume id is captured in metadata (here set explicitly). + if err := h.store.PatchMetadata(ctx, "sess-1", map[string]string{lifecycle.MetaAgentSessionID: "agent-xyz"}); err != nil { + t.Fatalf("patch metadata: %v", err) + } + + sess, err := h.sm.Restore(ctx, "sess-1") + if err != nil { + t.Fatalf("restore: %v", err) + } + + // Reopened: terminal session reset to a fresh spawn, PR cleared, runtime alive. + if sess.Status != domain.StatusSpawning { + t.Errorf("status = %q, want spawning", sess.Status) + } + rec, _, _ := h.store.Get(ctx, "sess-1") + if got := rec.Lifecycle.Session; got.State != domain.SessionNotStarted || got.Reason != domain.ReasonSpawnRequested { + t.Errorf("session substate = %+v, want not_started/spawn_requested", got) + } + if got := rec.Lifecycle.PR; got.State != domain.PRNone || got.Reason != domain.PRReasonClearedOnRestore { + t.Errorf("pr substate = %+v, want none/cleared_on_restore", got) + } + if rec.Lifecycle.Runtime.State != domain.RuntimeAlive { + t.Errorf("runtime state = %q, want alive", rec.Lifecycle.Runtime.State) + } + + // Relaunched via the agent's resume command (created[0] is the original spawn). + if len(h.runtime.created) != 2 { + t.Fatalf("runtime.created = %d, want 2 (spawn + restore)", len(h.runtime.created)) + } + if got := h.runtime.created[1].LaunchCommand; got != "claude --resume agent-xyz" { + t.Errorf("restore launch command = %q, want resume", got) + } + if h.log.indexOf("Workspace.Restore") == -1 { + t.Error("Workspace.Restore was not called") + } +} + +func TestCleanup_SkipsUncommittedWork(t *testing.T) { + h := newHarness("unused") + ctx := context.Background() + + // Two terminal sessions (reclaimable) + one working session (must be ignored). + seedTerminal(t, h, "done-1", "/tmp/ws/done-1") + seedTerminal(t, h, "dirty-1", "/tmp/ws/dirty-1") + if err := h.store.Seed(ctx, domain.SessionRecord{ + ID: "live-1", ProjectID: testProject, + Lifecycle: lc(domain.SessionWorking, domain.ReasonTaskInProgress, domain.PRNone, ""), + }); err != nil { + t.Fatalf("seed live: %v", err) + } + // dirty-1's worktree still holds uncommitted work — Destroy refuses it. + h.workspace.refuse["/tmp/ws/dirty-1"] = true + + res, err := h.sm.Cleanup(ctx, testProject) + if err != nil { + t.Fatalf("cleanup: %v", err) + } + + if !equalIDSet(res.Cleaned, []domain.SessionID{"done-1"}) { + t.Errorf("cleaned = %v, want [done-1]", res.Cleaned) + } + if !equalIDSet(res.Skipped, []domain.SessionID{"dirty-1"}) { + t.Errorf("skipped = %v, want [dirty-1]", res.Skipped) + } + // The live session was never a candidate. + if contains(res.Cleaned, "live-1") || contains(res.Skipped, "live-1") { + t.Error("non-terminal session must not be cleaned or skipped") + } +} + +// ---- test helpers ---- + +func lc(s domain.SessionState, r domain.SessionReason, prs domain.PRState, prr domain.PRReason) domain.CanonicalSessionLifecycle { + return domain.CanonicalSessionLifecycle{ + Version: domain.LifecycleVersion, + Session: domain.SessionSubstate{State: s, Reason: r}, + PR: domain.PRSubstate{State: prs, Reason: prr}, + Runtime: domain.RuntimeSubstate{State: domain.RuntimeAlive, Reason: domain.RuntimeReasonProcessRunning}, + } +} + +func seedTerminal(t *testing.T, h *harness, id domain.SessionID, wsPath string) { + t.Helper() + ctx := context.Background() + if err := h.store.Seed(ctx, domain.SessionRecord{ + ID: id, ProjectID: testProject, + Lifecycle: lc(domain.SessionTerminated, domain.ReasonManuallyKilled, domain.PRNone, ""), + }); err != nil { + t.Fatalf("seed %s: %v", id, err) + } + if err := h.store.PatchMetadata(ctx, id, map[string]string{lifecycle.MetaWorkspacePath: wsPath}); err != nil { + t.Fatalf("patch metadata %s: %v", id, err) + } +} + +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func contains(ids []domain.SessionID, id domain.SessionID) bool { + for _, x := range ids { + if x == id { + return true + } + } + return false +} + +func equalIDSet(got, want []domain.SessionID) bool { + if len(got) != len(want) { + return false + } + for _, w := range want { + if !contains(got, w) { + return false + } + } + return true +} From 162881d2bda4f8f34dad5b450576a727ffe71db0 Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Wed, 27 May 2026 13:50:24 +0530 Subject: [PATCH 2/3] =?UTF-8?q?fix(session):=20harden=20Restore=20?= =?UTF-8?q?=E2=80=94=20require=20agent=20session=20id,=20roll=20back=20run?= =?UTF-8?q?time=20on=20post-create=20failure=20(PR=20#7=20review)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restore now fails early with a clear error if MetaAgentSessionID is missing, rather than emitting an ambiguous "resume nothing" launch command (no stored prompt means a fresh-launch fallback isn't possible). - On a post-runtime-create failure (reopen patch or OnSpawnCompleted), best-effort destroy the newly created runtime (never the workspace, which holds prior work) so we don't strand a live process while parking the session terminal. - Added a test for Restore with a missing agent session id: errors early, touches no workspace/runtime, leaves the session terminal. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/session/manager.go | 18 ++++++++++++-- backend/internal/session/manager_test.go | 30 ++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/backend/internal/session/manager.go b/backend/internal/session/manager.go index 61bb9274..62736739 100644 --- a/backend/internal/session/manager.go +++ b/backend/internal/session/manager.go @@ -239,6 +239,15 @@ func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Sess return domain.Session{}, fmt.Errorf("restore %s: metadata: %w", id, err) } + // Resume is only possible with the agent's captured session id. Without it, + // GetRestoreCommand would produce an ambiguous "resume nothing" launch, and + // we have no stored prompt to fall back to a fresh launch — so fail early, + // before any I/O. + agentSessionID := meta[lifecycle.MetaAgentSessionID] + if agentSessionID == "" { + return domain.Session{}, fmt.Errorf("restore %s: missing agent session id (cannot resume)", id) + } + ws, err := m.workspace.Restore(ctx, ports.WorkspaceConfig{ ProjectID: rec.ProjectID, SessionID: id, @@ -252,18 +261,22 @@ func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Sess handle, err := m.runtime.Create(ctx, ports.RuntimeConfig{ SessionID: id, WorkspacePath: ws.Path, - LaunchCommand: m.agent.GetRestoreCommand(meta[lifecycle.MetaAgentSessionID]), + LaunchCommand: m.agent.GetRestoreCommand(agentSessionID), Env: spawnEnv(m.agent.GetEnvironment(agentCfg), id, rec.ProjectID, rec.IssueID), }) if err != nil { return domain.Session{}, fmt.Errorf("restore %s: runtime create: %w", id, err) } + // Past this point the runtime is live: a failure must tear it back down (but + // never the workspace, which holds the agent's prior work) so we don't strand + // a process while parking the session in a terminal lifecycle. reopen := ports.LifecyclePatch{ Session: &domain.SessionSubstate{State: domain.SessionNotStarted, Reason: domain.ReasonSpawnRequested}, PR: &domain.PRSubstate{State: domain.PRNone, Reason: domain.PRReasonClearedOnRestore}, } if err := m.store.PatchLifecycle(ctx, id, reopen); err != nil { + m.rollbackRuntime(ctx, handle) return domain.Session{}, fmt.Errorf("restore %s: reopen: %w", id, err) } @@ -271,9 +284,10 @@ func (m *Manager) Restore(ctx context.Context, id domain.SessionID) (domain.Sess Branch: ws.Branch, WorkspacePath: ws.Path, RuntimeHandle: handle, - AgentSessionID: meta[lifecycle.MetaAgentSessionID], + AgentSessionID: agentSessionID, } if err := m.lcm.OnSpawnCompleted(ctx, id, outcome); err != nil { + m.rollbackRuntime(ctx, handle) return domain.Session{}, fmt.Errorf("restore %s: on spawn completed: %w", id, err) } return m.Get(ctx, id) diff --git a/backend/internal/session/manager_test.go b/backend/internal/session/manager_test.go index e28eded1..85f9297c 100644 --- a/backend/internal/session/manager_test.go +++ b/backend/internal/session/manager_test.go @@ -296,6 +296,36 @@ func TestRestore_RelaunchesWithResumeCommand(t *testing.T) { } } +func TestRestore_MissingAgentSessionID_Errors(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + if _, err := h.sm.Spawn(ctx, spawnCfg()); err != nil { + t.Fatalf("spawn: %v", err) + } + if _, err := h.sm.Kill(ctx, "sess-1", ports.KillOptions{Reason: ports.KillManual}); err != nil { + t.Fatalf("kill: %v", err) + } + // No agent session id was ever captured (spawn leaves it empty) — resume is + // impossible, so Restore must fail early without touching workspace/runtime. + beforeRestores := len(h.workspace.restoredID) + beforeCreated := len(h.runtime.created) + + if _, err := h.sm.Restore(ctx, "sess-1"); err == nil { + t.Fatal("restore: want error for missing agent session id, got nil") + } + if len(h.workspace.restoredID) != beforeRestores { + t.Error("workspace was touched despite a doomed restore") + } + if len(h.runtime.created) != beforeCreated { + t.Error("runtime was created despite a doomed restore") + } + // The session stays terminal — a failed restore does not reopen it. + rec, _, _ := h.store.Get(ctx, "sess-1") + if rec.Lifecycle.Session.State != domain.SessionTerminated { + t.Errorf("session state = %q, want terminated (unchanged)", rec.Lifecycle.Session.State) + } +} + func TestCleanup_SkipsUncommittedWork(t *testing.T) { h := newHarness("unused") ctx := context.Background() From 4c331d907e034fbb8d71f5d25ccc0a6d56672f8c Mon Sep 17 00:00:00 2001 From: harshitsinghbhandari <24b4506@iitb.ac.in> Date: Wed, 27 May 2026 14:00:44 +0530 Subject: [PATCH 3/3] test(session): cover spawn orphan-to-errored route and restore runtime rollback (review follow-ups) Adds an injectable OnSpawnCompleted failure to the recording LCM and two tests: - Spawn: when OnSpawnCompleted fails, the seeded record is parked terminal/errored (via OnKillRequested(KillError)) and runtime+workspace are torn down. - Restore: when OnSpawnCompleted fails post-create, the new runtime is destroyed while the workspace is preserved. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/internal/session/fakes_test.go | 10 +++- backend/internal/session/manager_test.go | 64 ++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/backend/internal/session/fakes_test.go b/backend/internal/session/fakes_test.go index d8e4b248..648172de 100644 --- a/backend/internal/session/fakes_test.go +++ b/backend/internal/session/fakes_test.go @@ -319,12 +319,19 @@ func (noopNotifier) Notify(_ context.Context, _ ports.OrchestratorEvent) error { type recordingLCM struct { log *callLog inner ports.LifecycleManager + + // onSpawnErr, when set, makes OnSpawnCompleted fail (without touching the + // inner manager) so tests can exercise the SM's post-spawn failure paths. + onSpawnErr error } var _ ports.LifecycleManager = (*recordingLCM)(nil) func (l *recordingLCM) OnSpawnCompleted(ctx context.Context, id domain.SessionID, o ports.SpawnOutcome) error { l.log.add("OnSpawnCompleted") + if l.onSpawnErr != nil { + return l.onSpawnErr + } return l.inner.OnSpawnCompleted(ctx, id, o) } @@ -358,6 +365,7 @@ type harness struct { agent *fakeAgent workspace *fakeWorkspace messenger *recordingMessenger + lcm *recordingLCM log *callLog } @@ -384,7 +392,7 @@ func newHarness(id domain.SessionID) *harness { NewID: func(ports.SpawnConfig) domain.SessionID { return id }, }) - return &harness{sm: sm, store: store, runtime: rt, agent: ag, workspace: ws, messenger: msg, log: log} + return &harness{sm: sm, store: store, runtime: rt, agent: ag, workspace: ws, messenger: msg, lcm: lcm, log: log} } func cloneMap(in map[string]string) map[string]string { diff --git a/backend/internal/session/manager_test.go b/backend/internal/session/manager_test.go index 85f9297c..bda7e988 100644 --- a/backend/internal/session/manager_test.go +++ b/backend/internal/session/manager_test.go @@ -121,6 +121,38 @@ func TestSpawn_RuntimeCreateFailure_RollsBack(t *testing.T) { } } +func TestSpawn_OnSpawnCompletedFailure_RoutesOrphanToErrored(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + h.lcm.onSpawnErr = errors.New("lcm boom") + + _, err := h.sm.Spawn(ctx, spawnCfg()) + if err == nil { + t.Fatal("spawn: want error, got nil") + } + + // Runtime + workspace are torn down on the failure path. + if len(h.runtime.destroyed) != 1 { + t.Errorf("runtime.destroyed = %d, want 1", len(h.runtime.destroyed)) + } + if len(h.workspace.destroyed) != 1 { + t.Errorf("workspace.destroyed = %d, want 1", len(h.workspace.destroyed)) + } + // The record was already seeded and the store has no delete, so the orphan is + // routed to a terminal errored state (via OnKillRequested(KillError)) rather + // than stranded forever as "spawning". + rec, ok, _ := h.store.Get(ctx, "sess-1") + if !ok { + t.Fatal("seeded record vanished; expected it parked as errored") + } + if got := rec.Lifecycle.Session; got.State != domain.SessionTerminated || got.Reason != domain.ReasonErrorInProcess { + t.Errorf("session substate = %+v, want terminated/error_in_process", got) + } + if status := domain.DeriveLegacyStatus(rec.Lifecycle); status != domain.StatusErrored { + t.Errorf("status = %q, want errored", status) + } +} + func TestKill_OrderingAndTerminalState(t *testing.T) { h := newHarness("sess-1") ctx := context.Background() @@ -326,6 +358,38 @@ func TestRestore_MissingAgentSessionID_Errors(t *testing.T) { } } +func TestRestore_OnSpawnCompletedFailure_RollsBackRuntime(t *testing.T) { + h := newHarness("sess-1") + ctx := context.Background() + if _, err := h.sm.Spawn(ctx, spawnCfg()); err != nil { + t.Fatalf("spawn: %v", err) + } + if _, err := h.sm.Kill(ctx, "sess-1", ports.KillOptions{Reason: ports.KillManual}); err != nil { + t.Fatalf("kill: %v", err) + } + if err := h.store.PatchMetadata(ctx, "sess-1", map[string]string{lifecycle.MetaAgentSessionID: "agent-xyz"}); err != nil { + t.Fatalf("patch metadata: %v", err) + } + + // Fail the post-create LCM call; capture teardown counts just before restore. + h.lcm.onSpawnErr = errors.New("lcm boom") + destroyedBefore := len(h.runtime.destroyed) + wsDestroyedBefore := len(h.workspace.destroyed) + + if _, err := h.sm.Restore(ctx, "sess-1"); err == nil { + t.Fatal("restore: want error, got nil") + } + + // The runtime created during restore is torn back down so no process is + // stranded; the workspace is left intact (it holds the agent's prior work). + if len(h.runtime.destroyed) != destroyedBefore+1 { + t.Errorf("runtime.destroyed grew by %d, want 1 (restore rollback)", len(h.runtime.destroyed)-destroyedBefore) + } + if len(h.workspace.destroyed) != wsDestroyedBefore { + t.Errorf("workspace was destroyed on restore rollback; it must be preserved") + } +} + func TestCleanup_SkipsUncommittedWork(t *testing.T) { h := newHarness("unused") ctx := context.Background()