From 0ddc84b7abe061ca7b6c3e8e7a212e77959334f5 Mon Sep 17 00:00:00 2001 From: Bailey Dixon Date: Sun, 19 Apr 2026 14:28:31 -0400 Subject: [PATCH] feat(daemon): wire agent.run/stop/send/attach with process spawn + event stream - Add AgentRuntime (packages/daemon/src/agent-runtime.ts): spawns child processes, tracks status transitions (starting -> running -> completed/ failed/stopped), emits events into the Hub (topic agent:) and persists them to the agent_events table with (epoch, seq) cursor. - Replace stub agent.run/stop/send handlers with real AgentRuntime calls and add new agent.attach RPC that returns the persisted event history (with optional sinceEpoch/sinceSeq/limit cursor). - Frame terminal bytes on channel 1 with a 36-byte agent UUID prefix so multiplexed terminal streams are routable client-side. Shared codec lives in packages/daemon/src/terminal-frame.ts. - Extend the client SDK with client.agents.attach(agentId, handlers), per-agent terminal routing, AgentAttachParams / AgentAttachResult / AgentEventPayload schemas, and Methods.agent_attach. - Internal "_echo" test profile spawns a node one-liner that echoes stdin so tests + smoke scripts can exercise the pipe without a real adapter wired yet. - New tests/agent-lifecycle.test.ts covers spawn + terminal bytes + event replay + agent.stop. Existing daemon.test.ts still passes. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/client/src/client.ts | 61 ++- packages/client/src/protocol.ts | 23 ++ packages/daemon/src/agent-runtime.ts | 354 ++++++++++++++++++ packages/daemon/src/bootstrap.ts | 16 +- packages/daemon/src/hub.ts | 7 + packages/daemon/src/index.ts | 10 + packages/daemon/src/rpc/agent.ts | 65 +++- packages/daemon/src/rpc/index.ts | 3 +- packages/daemon/src/rpc/types.ts | 2 + packages/daemon/src/server.ts | 5 +- packages/daemon/src/terminal-frame.ts | 27 ++ packages/daemon/src/ws/session.ts | 11 +- packages/daemon/tests/agent-lifecycle.test.ts | 133 +++++++ 13 files changed, 697 insertions(+), 20 deletions(-) create mode 100644 packages/daemon/src/agent-runtime.ts create mode 100644 packages/daemon/src/terminal-frame.ts create mode 100644 packages/daemon/tests/agent-lifecycle.test.ts diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 0cbb10b..5809819 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -1,6 +1,9 @@ import WebSocket from "ws"; import { z } from "zod"; import { + AgentAttachParams, + AgentAttachResult, + AgentEventPayload, AgentListResult, AgentOkResult, AgentRunParams, @@ -18,6 +21,9 @@ import { } from "./protocol.js"; import { decodeFrame, encodeControl, encodeFrame } from "./frame.js"; +/** Length (in bytes) of the agent UUID prefix on terminal frames. */ +const TERMINAL_ID_LEN = 36; + export interface ArcClientOptions { /** WebSocket URL, e.g. `ws://127.0.0.1:7272`. */ url: string; @@ -45,6 +51,7 @@ export class ArcClient { private pending = new Map(); private topicHandlers = new Map>(); private terminalHandler: TerminalHandler | null = null; + private agentTerminalHandlers = new Map(); private reconnectAttempt = 0; private closed = false; private idCounter = 0; @@ -76,6 +83,7 @@ export class ArcClient { this.closed = true; this.ws?.close(); this.ws = null; + this.agentTerminalHandlers.clear(); } /** Typed request/response RPC. */ @@ -150,6 +158,41 @@ export class ArcClient { this.call(Methods.agent_send, AgentSendParams.parse(params)).then((r) => AgentOkResult.parse(r), ), + /** + * Subscribe to `agent:` events and route terminal bytes for that id + * to `onTerminal`. Returns the initial replay + a dispose function. + */ + attach: async ( + agentId: string, + handlers: { + onEvent?: (event: z.infer) => void; + onTerminal?: TerminalHandler; + } = {}, + attachParams: Omit, "agentId"> = {}, + ): Promise<{ initial: z.infer; dispose: () => Promise }> => { + const initial = AgentAttachResult.parse( + await this.call( + Methods.agent_attach, + AgentAttachParams.parse({ agentId, ...attachParams }), + ), + ); + const topic = `agent:${agentId}`; + const unsubEvents = await this.subscribe(topic, (payload) => { + if (handlers.onEvent) { + handlers.onEvent(payload as z.infer); + } + }); + if (handlers.onTerminal) { + this.agentTerminalHandlers.set(agentId, handlers.onTerminal); + } + return { + initial, + dispose: async () => { + await unsubEvents(); + this.agentTerminalHandlers.delete(agentId); + }, + }; + }, }; // --- Internals ----------------------------------------------------------- @@ -201,11 +244,19 @@ export class ArcClient { this.handleEnvelope(envelope); return; } - if (frame.channel === Channel.Terminal && this.terminalHandler) { - // Terminal frames carry agent id as the first UTF-8 line, then raw bytes. - // For Phase 1 there's no producer yet; payload is opaque — handed to the - // consumer verbatim with a placeholder agent id. - this.terminalHandler("", frame.payload); + if (frame.channel === Channel.Terminal) { + // Terminal frames are prefixed with a 36-byte ASCII UUID identifying + // the source agent, followed by raw bytes. + const payload = frame.payload; + const agentId = + payload.length >= TERMINAL_ID_LEN + ? new TextDecoder().decode(payload.subarray(0, TERMINAL_ID_LEN)) + : ""; + const bytes = + payload.length >= TERMINAL_ID_LEN ? payload.subarray(TERMINAL_ID_LEN) : payload; + const perAgent = this.agentTerminalHandlers.get(agentId); + if (perAgent) perAgent(agentId, bytes); + if (this.terminalHandler) this.terminalHandler(agentId, bytes); } } diff --git a/packages/client/src/protocol.ts b/packages/client/src/protocol.ts index 5d9594e..beb8b9f 100644 --- a/packages/client/src/protocol.ts +++ b/packages/client/src/protocol.ts @@ -59,6 +59,7 @@ export const Methods = { agent_run: "agent.run", agent_stop: "agent.stop", agent_send: "agent.send", + agent_attach: "agent.attach", } as const; export type MethodName = (typeof Methods)[keyof typeof Methods]; @@ -115,6 +116,28 @@ export const AgentStopParams = z.object({ agentId: z.string() }); export const AgentSendParams = z.object({ agentId: z.string(), text: z.string() }); export const AgentOkResult = z.object({ ok: z.literal(true) }); +export const AgentAttachParams = z.object({ + agentId: z.string(), + sinceEpoch: z.number().optional(), + sinceSeq: z.number().optional(), + limit: z.number().int().min(1).max(2000).optional(), +}); + +export const AgentEventPayload = z.object({ + agentId: z.string(), + epoch: z.number(), + seq: z.number(), + ts: z.number(), + kind: z.string(), + payload: z.record(z.string(), z.unknown()).optional(), +}); + +export const AgentAttachResult = z.object({ + agentId: z.string(), + status: z.string(), + events: z.array(AgentEventPayload), +}); + // --- Topic helpers ---------------------------------------------------------- export const Topics = { diff --git a/packages/daemon/src/agent-runtime.ts b/packages/daemon/src/agent-runtime.ts new file mode 100644 index 0000000..0203081 --- /dev/null +++ b/packages/daemon/src/agent-runtime.ts @@ -0,0 +1,354 @@ +/** + * Agent runtime — spawns child processes, tracks status, persists + * lifecycle events to `agent_events`, and fans out via the Hub. + * + * Phase 4 scope: internal process spawning only. Adapter integration + * (Claude Code / Codex / Gemini) lands once the daemon owns profile + * resolution + env injection. Today we expose a small set of "builtin" + * invocations (e.g. `_echo`) for tests + smoke scripts, plus a generic + * command escape hatch used by orchestrators that already know the + * argv they want. + */ + +import crypto from "node:crypto"; +import { spawn, type ChildProcess } from "node:child_process"; +import type { DB } from "./db.js"; +import type { Hub } from "./hub.js"; +import type { Logger } from "./logger.js"; + +export { + TERMINAL_ID_LEN, + encodeTerminalPayload, + decodeTerminalPayload, +} from "./terminal-frame.js"; + +export type AgentStatus = "starting" | "running" | "completed" | "failed" | "stopped"; + +export type AgentEventKind = + | "status" + | "stdout" + | "stderr" + | "exit" + | "error" + | "spawn"; + +export interface AgentEvent { + agentId: string; + epoch: number; + seq: number; + ts: number; + kind: AgentEventKind; + payload: Record; +} + +export interface AgentRunOptions { + profile: string; + prompt?: string; + cwd?: string; + worktree?: string | null; + launchMode?: "native" | "worker"; + /** + * Optional raw argv override — used for internal test agents and for + * callers that already know what to spawn. When unset, falls back to + * builtin handlers based on `profile`. + */ + command?: { bin: string; args: string[] }; + env?: Record; +} + +export interface RunningAgent { + id: string; + status: AgentStatus; + child: ChildProcess | null; + epoch: number; + seq: number; + createdAt: number; +} + +export interface AgentRuntimeDeps { + db: DB; + hub: Hub; + logger: Logger; + /** Fan terminal bytes to every session subscribed to `agent:`. */ + broadcastTerminal: (agentId: string, bytes: Uint8Array) => void; +} + +export class AgentRuntime { + private agents = new Map(); + private closed = false; + + constructor(private deps: AgentRuntimeDeps) {} + + /** Insert a row in `agents`, spawn the child, return the id. */ + async run(opts: AgentRunOptions): Promise { + const id = crypto.randomUUID(); + const now = Date.now(); + const launchMode = opts.launchMode ?? "worker"; + const cwd = opts.cwd ?? process.cwd(); + + this.deps.db + .prepare( + `INSERT INTO agents (id, profile, cwd, status, launch_mode, created_at, updated_at, worktree) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + ) + .run(id, opts.profile, cwd, "starting", launchMode, now, now, opts.worktree ?? null); + + const agent: RunningAgent = { + id, + status: "starting", + child: null, + epoch: now, + seq: 0, + createdAt: now, + }; + this.agents.set(id, agent); + this.emit(agent, "status", { status: "starting", profile: opts.profile }); + + // Resolve argv: either caller-provided command, builtin, or a default + // "prompt echo" used when no real adapter is wired yet. + const command = resolveCommand(opts); + try { + const child = spawn(command.bin, command.args, { + cwd, + env: { ...process.env, ...(opts.env ?? {}) } as NodeJS.ProcessEnv, + stdio: ["pipe", "pipe", "pipe"], + windowsHide: true, + }); + agent.child = child; + + child.on("error", (err) => { + this.deps.logger.warn("agent.spawn-error", { id, message: err.message }); + this.emit(agent, "error", { message: err.message }); + this.setStatus(agent, "failed"); + }); + + if (!child.pid) { + this.setStatus(agent, "failed"); + this.emit(agent, "error", { message: "spawn failed: no pid" }); + return id; + } + + this.setStatus(agent, "running"); + this.emit(agent, "spawn", { pid: child.pid }); + + child.stdout?.on("data", (chunk: Buffer) => { + const bytes = new Uint8Array(chunk); + this.deps.broadcastTerminal(id, bytes); + this.emit(agent, "stdout", { bytes: chunk.length }); + }); + child.stderr?.on("data", (chunk: Buffer) => { + const bytes = new Uint8Array(chunk); + this.deps.broadcastTerminal(id, bytes); + this.emit(agent, "stderr", { bytes: chunk.length }); + }); + child.on("exit", (code, signal) => { + const finalStatus: AgentStatus = + agent.status === "stopped" ? "stopped" : code === 0 ? "completed" : "failed"; + this.emit(agent, "exit", { code, signal }); + this.setStatus(agent, finalStatus, { completedAt: Date.now() }); + }); + + // Prompt injection: if a prompt was provided and no explicit command + // was given, write the prompt to stdin followed by a newline. Builtins + // like `_echo` read a single stdin line and exit. + if (opts.prompt && !opts.command && child.stdin) { + child.stdin.write(`${opts.prompt}\n`); + child.stdin.end(); + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.deps.logger.warn("agent.spawn-throw", { id, message }); + this.emit(agent, "error", { message }); + this.setStatus(agent, "failed"); + } + + return id; + } + + /** Terminate a running agent. Returns `{ ok: true }` even if already dead. */ + async stop(agentId: string): Promise { + const agent = this.agents.get(agentId); + if (!agent) { + throw withCode(new Error(`agent not found: ${agentId}`), "not_found"); + } + if (agent.status === "completed" || agent.status === "failed" || agent.status === "stopped") { + return; + } + agent.status = "stopped"; + if (agent.child && !agent.child.killed) { + try { + agent.child.kill("SIGTERM"); + } catch (err) { + this.deps.logger.warn("agent.stop-error", { + id: agentId, + message: (err as Error).message, + }); + } + } + this.setStatus(agent, "stopped", { completedAt: Date.now() }); + } + + /** Write text to the agent's stdin. */ + async send(agentId: string, text: string): Promise { + const agent = this.agents.get(agentId); + if (!agent) { + throw withCode(new Error(`agent not found: ${agentId}`), "not_found"); + } + if (!agent.child || agent.child.killed || !agent.child.stdin?.writable) { + throw withCode(new Error(`agent stdin not writable: ${agentId}`), "bad_request"); + } + agent.child.stdin.write(text); + } + + /** Read the last N events for an agent from the DB. */ + replay(agentId: string, sinceEpoch?: number, sinceSeq?: number, limit = 200): AgentEvent[] { + const rows = this.deps.db + .prepare( + `SELECT agent_id, epoch, seq, ts, kind, payload + FROM agent_events + WHERE agent_id = ? + AND (? IS NULL OR (epoch, seq) > (?, ?)) + ORDER BY epoch ASC, seq ASC + LIMIT ?`, + ) + .all( + agentId, + sinceEpoch === undefined ? null : sinceEpoch, + sinceEpoch ?? 0, + sinceSeq ?? 0, + limit, + ) as Array<{ + agent_id: string; + epoch: number; + seq: number; + ts: number; + kind: string; + payload: string; + }>; + return rows.map((r) => ({ + agentId: r.agent_id, + epoch: r.epoch, + seq: r.seq, + ts: r.ts, + kind: r.kind as AgentEventKind, + payload: safeParseJson(r.payload), + })); + } + + /** + * Stop every running child. Called during daemon shutdown. Flips the + * `closed` flag so any late `exit` handlers become no-ops on the DB/hub. + */ + async shutdown(): Promise { + const ids = Array.from(this.agents.keys()); + for (const id of ids) { + const agent = this.agents.get(id); + if (!agent) continue; + if (agent.status === "running" || agent.status === "starting") { + try { + await this.stop(id); + } catch { + // best effort + } + } + } + this.closed = true; + } + + // --- Internals ----------------------------------------------------------- + + private emit(agent: RunningAgent, kind: AgentEventKind, payload: Record): void { + if (this.closed) return; + agent.seq += 1; + const event: AgentEvent = { + agentId: agent.id, + epoch: agent.epoch, + seq: agent.seq, + ts: Date.now(), + kind, + payload, + }; + try { + this.deps.db + .prepare( + `INSERT INTO agent_events (agent_id, epoch, seq, ts, kind, payload) + VALUES (?, ?, ?, ?, ?, ?)`, + ) + .run(event.agentId, event.epoch, event.seq, event.ts, event.kind, JSON.stringify(payload)); + } catch (err) { + // DB may be closed during shutdown race — log but don't crash. + try { + this.deps.logger.warn("agent.event-persist-error", { + id: agent.id, + message: (err as Error).message, + }); + } catch { + // logger may also be closed + } + } + try { + this.deps.hub.publish(`agent:${agent.id}`, event); + } catch { + // session sinks closed during shutdown + } + } + + private setStatus( + agent: RunningAgent, + status: AgentStatus, + extra: { completedAt?: number } = {}, + ): void { + agent.status = status; + if (this.closed) return; + const now = Date.now(); + try { + if (extra.completedAt !== undefined) { + this.deps.db + .prepare("UPDATE agents SET status = ?, updated_at = ?, completed_at = ? WHERE id = ?") + .run(status, now, extra.completedAt, agent.id); + } else { + this.deps.db + .prepare("UPDATE agents SET status = ?, updated_at = ? WHERE id = ?") + .run(status, now, agent.id); + } + } catch { + // DB closed during shutdown race + } + this.emit(agent, "status", { status }); + } +} + +function resolveCommand(opts: AgentRunOptions): { bin: string; args: string[] } { + if (opts.command) return opts.command; + // Builtin test profile: echoes the prompt then exits. + if (opts.profile === "_echo") { + return { + bin: process.execPath, + args: [ + "-e", + // Node inline: read stdin, print it, exit. + "let d='';process.stdin.on('data',c=>d+=c);process.stdin.on('end',()=>{process.stdout.write(d);process.exit(0);});", + ], + }; + } + // Fallback: a short-lived noop so the daemon doesn't spin on unknown profiles. + return { + bin: process.execPath, + args: ["-e", "process.stdout.write('agent runtime: no adapter wired for profile\\n');"], + }; +} + +function safeParseJson(raw: string): Record { + try { + const parsed = JSON.parse(raw) as unknown; + if (parsed && typeof parsed === "object") return parsed as Record; + } catch { + // fall through + } + return {}; +} + +function withCode(err: Error, code: string): Error { + (err as Error & { code: string }).code = code; + return err; +} diff --git a/packages/daemon/src/bootstrap.ts b/packages/daemon/src/bootstrap.ts index 4e54625..13773bd 100644 --- a/packages/daemon/src/bootstrap.ts +++ b/packages/daemon/src/bootstrap.ts @@ -5,6 +5,7 @@ import { openDb } from "./db.js"; import { createLogger } from "./logger.js"; import { ensureAuthFile } from "./auth.js"; import { Hub } from "./hub.js"; +import { AgentRuntime } from "./agent-runtime.js"; import { startServer, type ServerHandle } from "./server.js"; export interface DaemonOptions { @@ -59,12 +60,25 @@ export async function startDaemon( const version = opts.version ?? readDaemonVersion(); const startedAt = Date.now(); - const server = startServer({ config, db, logger, hub, auth, version, startedAt }); + const runtime = new AgentRuntime({ + db, + hub, + logger, + broadcastTerminal: (agentId, bytes) => { + for (const session of hub.subscribers(`agent:${agentId}`)) { + if (!session.conn.alive) continue; + session.sendTerminal(agentId, bytes); + } + }, + }); + + const server = startServer({ config, db, logger, hub, auth, version, startedAt, runtime }); fs.writeFileSync(config.pidPath, String(process.pid)); logger.info("daemon.started", { pid: process.pid, version, port: config.port }); const stop = async () => { logger.info("daemon.stopping"); + await runtime.shutdown(); await server.close(); try { db.close(); diff --git a/packages/daemon/src/hub.ts b/packages/daemon/src/hub.ts index 5b9bbef..ac9a84f 100644 --- a/packages/daemon/src/hub.ts +++ b/packages/daemon/src/hub.ts @@ -55,6 +55,13 @@ export class Hub { session.sendControl(envelope); } } + + /** Iterate live sessions subscribed to a topic (for non-envelope channels). */ + subscribers(topic: string): Iterable { + const set = this.byTopic.get(topic); + if (!set) return []; + return set; + } } let counter = 0; diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index b62051b..066280d 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -2,3 +2,13 @@ export { startDaemon, readPid, type DaemonHandle, type DaemonOptions } from "./b export { loadConfig as loadDaemonConfig, DEFAULT_PORT, PROTOCOL_VERSION, type DaemonConfig } from "./config.js"; export { ensureAuthFile, pairClient, type AuthFile, type PairResult } from "./auth.js"; export { buildHealth, type DaemonHealth } from "./health.js"; +export { + AgentRuntime, + encodeTerminalPayload, + decodeTerminalPayload, + TERMINAL_ID_LEN, + type AgentEvent, + type AgentEventKind, + type AgentRunOptions, + type AgentStatus, +} from "./agent-runtime.js"; diff --git a/packages/daemon/src/rpc/agent.ts b/packages/daemon/src/rpc/agent.ts index 789db11..7f76042 100644 --- a/packages/daemon/src/rpc/agent.ts +++ b/packages/daemon/src/rpc/agent.ts @@ -1,9 +1,17 @@ +import { + AgentAttachParams, + AgentRunParams, + AgentSendParams, + AgentStopParams, +} from "@axiom-labs/arc-client"; import type { RpcHandler } from "./types.js"; /** - * Agent RPCs — Phase 1 ships `agent.list` (reads from SQLite) and stubs - * `agent.run/stop/send` with unimplemented errors. Full lifecycle wiring - * lands in Phase 4 when adapters move behind the daemon. + * Agent RPCs — Phase 4 wiring. + * - `agent.list`: reads from SQLite `agents`. + * - `agent.run` / `agent.stop` / `agent.send`: delegate to `AgentRuntime`. + * - `agent.attach`: returns the most recent events for an agent and lets the + * caller subscribe to `agent:` for live updates. */ export const agentList: RpcHandler = (_params, ctx) => { @@ -38,12 +46,49 @@ export const agentList: RpcHandler = (_params, ctx) => { }; }; -const unimplemented: RpcHandler = () => { - const err = new Error("agent lifecycle lands in Phase 4") as Error & { code: string }; - err.code = "unimplemented"; - throw err; +export const agentRun: RpcHandler = async (raw, ctx) => { + const params = AgentRunParams.parse(raw); + const agentId = await ctx.runtime.run({ + profile: params.profile, + prompt: params.prompt, + cwd: params.cwd, + worktree: params.worktree ?? null, + launchMode: params.launchMode, + }); + return { agentId }; }; -export const agentRun = unimplemented; -export const agentStop = unimplemented; -export const agentSend = unimplemented; +export const agentStop: RpcHandler = async (raw, ctx) => { + const { agentId } = AgentStopParams.parse(raw); + await ctx.runtime.stop(agentId); + return { ok: true as const }; +}; + +export const agentSend: RpcHandler = async (raw, ctx) => { + const { agentId, text } = AgentSendParams.parse(raw); + await ctx.runtime.send(agentId, text); + return { ok: true as const }; +}; + +export const agentAttach: RpcHandler = (raw, ctx) => { + const params = AgentAttachParams.parse(raw); + const events = ctx.runtime.replay( + params.agentId, + params.sinceEpoch, + params.sinceSeq, + params.limit ?? 200, + ); + const row = ctx.db + .prepare("SELECT status FROM agents WHERE id = ?") + .get(params.agentId) as { status: string } | undefined; + if (!row) { + const err = new Error(`agent not found: ${params.agentId}`) as Error & { code: string }; + err.code = "not_found"; + throw err; + } + return { + agentId: params.agentId, + status: row.status, + events, + }; +}; diff --git a/packages/daemon/src/rpc/index.ts b/packages/daemon/src/rpc/index.ts index 5341c04..b66cac3 100644 --- a/packages/daemon/src/rpc/index.ts +++ b/packages/daemon/src/rpc/index.ts @@ -2,7 +2,7 @@ import { Methods } from "@axiom-labs/arc-client"; import { authLogin } from "./auth.js"; import { healthGet } from "./health.js"; import { profileGet, profileList } from "./profile.js"; -import { agentList, agentRun, agentSend, agentStop } from "./agent.js"; +import { agentAttach, agentList, agentRun, agentSend, agentStop } from "./agent.js"; import type { RpcHandler } from "./types.js"; /** Methods that do NOT require an authenticated session. */ @@ -17,6 +17,7 @@ export const handlers: Record = { [Methods.agent_run]: agentRun, [Methods.agent_stop]: agentStop, [Methods.agent_send]: agentSend, + [Methods.agent_attach]: agentAttach, }; export type { RpcContext } from "./types.js"; diff --git a/packages/daemon/src/rpc/types.ts b/packages/daemon/src/rpc/types.ts index 419d8ea..c4562eb 100644 --- a/packages/daemon/src/rpc/types.ts +++ b/packages/daemon/src/rpc/types.ts @@ -3,6 +3,7 @@ import type { Logger } from "../logger.js"; import type { Session } from "../ws/session.js"; import type { Hub } from "../hub.js"; import type { AuthFile } from "../auth.js"; +import type { AgentRuntime } from "../agent-runtime.js"; export interface RpcContext { db: DB; @@ -10,6 +11,7 @@ export interface RpcContext { hub: Hub; auth: AuthFile; session: Session; + runtime: AgentRuntime; /** Daemon start time — for uptime reporting. */ startedAt: number; /** Daemon version string. */ diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index 8b5f40d..83a1ae3 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -8,6 +8,7 @@ import type { DB } from "./db.js"; import type { Logger } from "./logger.js"; import type { Hub } from "./hub.js"; import type { AuthFile } from "./auth.js"; +import type { AgentRuntime } from "./agent-runtime.js"; import { createWsConnection } from "./ws/connection.js"; import { Session, receiveBinary } from "./ws/session.js"; import { dispatchEnvelope } from "./router.js"; @@ -19,6 +20,7 @@ export interface ServerDeps { logger: Logger; hub: Hub; auth: AuthFile; + runtime: AgentRuntime; version: string; startedAt: number; } @@ -91,7 +93,7 @@ function handleUpgrade( deps: ServerDeps, sessions: Set, ): void { - const { config, logger, db, hub, auth, version, startedAt } = deps; + const { config, logger, db, hub, auth, runtime, version, startedAt } = deps; if (!isAllowedHost(req.headers.host, config.port)) { socket.write("HTTP/1.1 403 Forbidden\r\n\r\n"); @@ -142,6 +144,7 @@ function handleUpgrade( logger, hub, auth, + runtime, version, startedAt, host: config.host, diff --git a/packages/daemon/src/terminal-frame.ts b/packages/daemon/src/terminal-frame.ts new file mode 100644 index 0000000..6b2428e --- /dev/null +++ b/packages/daemon/src/terminal-frame.ts @@ -0,0 +1,27 @@ +/** + * Terminal-channel payload format shared between the WS session layer and + * the agent runtime. Channel-1 frames are prefixed with a 36-byte ASCII + * UUID identifying the source agent, followed by raw byte payload. + */ + +export const TERMINAL_ID_LEN = 36; + +export function encodeTerminalPayload(agentId: string, bytes: Uint8Array): Uint8Array { + const id = agentId.length === TERMINAL_ID_LEN + ? agentId + : agentId.padEnd(TERMINAL_ID_LEN, " ").slice(0, TERMINAL_ID_LEN); + const idBytes = new TextEncoder().encode(id); + const out = new Uint8Array(TERMINAL_ID_LEN + bytes.length); + out.set(idBytes, 0); + out.set(bytes, TERMINAL_ID_LEN); + return out; +} + +export function decodeTerminalPayload(buf: Uint8Array): { agentId: string; bytes: Uint8Array } { + if (buf.length < TERMINAL_ID_LEN) { + return { agentId: "", bytes: buf }; + } + const agentId = new TextDecoder().decode(buf.subarray(0, TERMINAL_ID_LEN)); + const bytes = buf.subarray(TERMINAL_ID_LEN); + return { agentId, bytes }; +} diff --git a/packages/daemon/src/ws/session.ts b/packages/daemon/src/ws/session.ts index 1383f98..925a980 100644 --- a/packages/daemon/src/ws/session.ts +++ b/packages/daemon/src/ws/session.ts @@ -10,6 +10,7 @@ import { type ChannelId, type Envelope as EnvelopeT, } from "@axiom-labs/arc-client"; +import { encodeTerminalPayload } from "../terminal-frame.js"; export type EventSink = (envelope: EnvelopeT) => void; @@ -28,8 +29,14 @@ export class Session { this.conn.send("binary", encodeControl(envelope)); } - sendTerminal(bytes: Uint8Array): void { - this.conn.send("binary", encodeFrame({ channel: Channel.Terminal, flags: 0, payload: bytes })); + /** + * Send raw terminal bytes on channel 1. The payload is prefixed with a + * 36-byte agent UUID so multiplexed streams can be routed per-agent + * client-side. + */ + sendTerminal(agentId: string, bytes: Uint8Array): void { + const payload = encodeTerminalPayload(agentId, bytes); + this.conn.send("binary", encodeFrame({ channel: Channel.Terminal, flags: 0, payload })); } close(code = 1000): void { diff --git a/packages/daemon/tests/agent-lifecycle.test.ts b/packages/daemon/tests/agent-lifecycle.test.ts new file mode 100644 index 0000000..30c4ddb --- /dev/null +++ b/packages/daemon/tests/agent-lifecycle.test.ts @@ -0,0 +1,133 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { startDaemon, type DaemonHandle } from "../src/index.js"; +import { ArcClient } from "@axiom-labs/arc-client"; + +interface TestCtx { + tmp: string; + port: number; + handle: DaemonHandle & { stop: () => Promise }; + client: ArcClient; +} + +describe("agent lifecycle", () => { + let ctx: TestCtx | null = null; + + beforeEach(async () => { + const tmp = fs.mkdtempSync(path.join(os.tmpdir(), "arc-agent-test-")); + process.env["ARC_DIR"] = tmp; + const port = 18100 + Math.floor(Math.random() * 800); + const handle = await startDaemon({ port, version: "1.0.0-alpha.0-test" }); + const authFile = JSON.parse(fs.readFileSync(path.join(tmp, "auth.json"), "utf8")) as { + rootToken: string; + }; + const client = new ArcClient({ + url: `ws://127.0.0.1:${port}`, + token: authFile.rootToken, + noReconnect: true, + }); + await client.connect(); + ctx = { tmp, port, handle, client }; + }); + + afterEach(async () => { + if (!ctx) return; + await ctx.client.close(); + await ctx.handle.stop(); + fs.rmSync(ctx.tmp, { recursive: true, force: true }); + ctx = null; + }); + + it("runs, streams terminal bytes, persists events, and exits cleanly", async () => { + const events: Array<{ kind: string; payload?: Record }> = []; + let terminalBytes = Buffer.alloc(0); + + const prompt = "hello from agent lifecycle test"; + const { agentId } = await ctx!.client.agents.run({ profile: "_echo", prompt }); + expect(agentId).toMatch(/^[0-9a-f-]{36}$/); + + const attached = await ctx!.client.agents.attach(agentId, { + onEvent: (e) => events.push({ kind: e.kind, payload: e.payload }), + onTerminal: (_id, bytes) => { + terminalBytes = Buffer.concat([terminalBytes, Buffer.from(bytes)]); + }, + }); + expect(attached.initial.agentId).toBe(agentId); + + // Wait for the child to exit — `_echo` reads stdin, writes it, exits 0. + await waitFor(() => events.some((e) => e.kind === "exit"), 5000); + + // The list should now have our agent with a terminal status. + const list = await ctx!.client.agents.list(); + const row = list.agents.find((a) => a.id === agentId); + expect(row).toBeDefined(); + expect(["completed", "failed"]).toContain(row!.status); + + // Terminal bytes should contain the echoed prompt. + const terminalText = terminalBytes.toString("utf8"); + expect(terminalText).toContain(prompt); + + // Combine the initial replay with the live events — early spawn may have + // fired before `attach` returned, so it lands in `initial.events`. + const allKinds = [...attached.initial.events.map((e) => e.kind), ...events.map((e) => e.kind)]; + expect(allKinds).toContain("spawn"); + expect(allKinds).toContain("exit"); + + await attached.dispose(); + }); + + it("agent.attach replays persisted events after exit", async () => { + const { agentId } = await ctx!.client.agents.run({ profile: "_echo", prompt: "replay test" }); + // Wait for exit via polling the list status. + await waitFor(async () => { + const list = await ctx!.client.agents.list(); + const row = list.agents.find((a) => a.id === agentId); + return !!row && (row.status === "completed" || row.status === "failed"); + }, 5000); + + const { initial } = await ctx!.client.agents.attach(agentId); + expect(initial.agentId).toBe(agentId); + expect(initial.events.length).toBeGreaterThan(0); + const exit = initial.events.find((e) => e.kind === "exit"); + expect(exit).toBeDefined(); + }); + + it("agent.stop terminates a long-running process", async () => { + // Spawn a long-lived child via a generic _echo profile that won't self-exit: + // Use agent.run with a _sleep profile (fallback profile stays alive? no — use _echo but + // don't send prompt => stdin stays open => process waits for stdin close). + // Instead we'll run a command via the builtin _echo which waits on stdin; if we don't + // pass a prompt, the runtime does NOT close stdin, so the child stays alive. + const { agentId } = await ctx!.client.agents.run({ profile: "_echo" }); + // Give it a moment to move to running. + await new Promise((r) => setTimeout(r, 150)); + + const before = await ctx!.client.agents.list(); + const row = before.agents.find((a) => a.id === agentId); + expect(row?.status === "running" || row?.status === "starting").toBe(true); + + const stopRes = await ctx!.client.agents.stop({ agentId }); + expect(stopRes.ok).toBe(true); + + await waitFor(async () => { + const list = await ctx!.client.agents.list(); + const r = list.agents.find((a) => a.id === agentId); + return !!r && (r.status === "stopped" || r.status === "completed" || r.status === "failed"); + }, 5000); + }); +}); + +async function waitFor( + predicate: () => boolean | Promise, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const res = await predicate(); + if (res) return; + await new Promise((r) => setTimeout(r, 50)); + } + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +}