Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/runtime/moon.pkg
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
"choir/src/sys",
"choir/src/exec",
"choir/src/workspace",
"choir/src/types",
"choir/src/config",
"choir/src/phase",
Expand Down
55 changes: 42 additions & 13 deletions src/runtime/pane_watch.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ pub(all) struct PaneWatchHost {
}

///|
async fn native_dump_pane_screen(
_session : String,
_pane_id : String,
) -> String {
@async.sleep(0)
""
async fn native_dump_pane_screen(session : String, pane_id : String) -> String {
let (status, output) = @exec.capture_command_stdout(
@workspace.zellij_dump_screen_command(session, pane_id),
)
if status == 0 {
output
} else {
""
}
}

///|
Expand All @@ -29,7 +32,7 @@ pub(all) struct WatchObservation {
///|
pub(all) struct WatchState {
session : String
pane_id : String
pane_id : String?
mut observation : WatchObservation
}

Expand Down Expand Up @@ -61,11 +64,18 @@ pub fn PaneWatcher::watch_pane(
agent_id : String,
target : @types.LocalTarget,
now : Int64,
pane_id? : String = "",
pane_id? : String? = None,
) -> Unit {
let resolved = pane_id.trim().to_owned()
if resolved == "" {
return
let resolved = match pane_id {
Some(value) => {
let trimmed = value.trim().to_owned()
if trimmed == "" {
None
} else {
Some(trimmed)
}
}
None => None
}
self.watches.set(agent_id, {
session: target.session,
Expand All @@ -74,6 +84,17 @@ pub fn PaneWatcher::watch_pane(
})
}

///|
pub fn PaneWatcher::pane_id_for_agent(
self : PaneWatcher,
agent_id : String,
) -> String? {
match self.watches.get(agent_id) {
Some(watch) => watch.pane_id
None => None
}
}

///|
pub fn PaneWatcher::stop_subscribe(
self : PaneWatcher,
Expand All @@ -91,7 +112,11 @@ pub async fn PaneWatcher::current_screen(
Some(watch) => watch
None => return None
}
let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id)
let pane_id = match watch.pane_id {
Some(value) => value
None => return None
}
let screen = (self.host.dump_pane_screen)(watch.session, pane_id)
if screen.trim().to_owned() == "" {
None
} else {
Expand Down Expand Up @@ -143,7 +168,11 @@ pub async fn PaneWatcher::observe_idle(
Some(watch) => watch
None => return None
}
let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id)
let pane_id = match watch.pane_id {
Some(value) => value
None => return None
}
let screen = (self.host.dump_pane_screen)(watch.session, pane_id)
if screen.trim().to_owned() == "" {
return None
}
Expand Down
16 changes: 11 additions & 5 deletions src/runtime/pane_watch_test.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async test "PaneWatcher::observe_idle dumps the resolved pane id through host sp
pane_watch_dump_host(["worker ready"], dump_args),
)

pw.watch_pane("agent-x", pane_watch_target(), 0L, pane_id="terminal_4")
pw.watch_pane("agent-x", pane_watch_target(), 0L, pane_id=Some("terminal_4"))

assert_true(pw.observe_idle("agent-x", 10L) is None)
@debug.assert_eq(dump_args, ["sess", "terminal_4"])
Expand All @@ -62,7 +62,12 @@ async test "PaneWatcher::observe_idle reports unchanged dump-screen idle and res
),
)

pw.watch_pane("agent-idle", pane_watch_target(), 100L, pane_id="terminal_9")
pw.watch_pane(
"agent-idle",
pane_watch_target(),
100L,
pane_id=Some("terminal_9"),
)

assert_true(pw.observe_idle("agent-idle", 110L) is None)
match pw.observe_idle("agent-idle", 230L) {
Expand All @@ -86,15 +91,16 @@ async test "PaneWatcher::observe_idle reports unchanged dump-screen idle and res
}

///|
async test "PaneWatcher::watch_pane skips unresolved pane ids" {
async test "PaneWatcher::observe_idle skips unresolved pane ids" {
let dump_args : Array[String] = []
let pw = PaneWatcher::with_host(
pane_watch_dump_host(["should not be read"], dump_args),
)

pw.watch_pane("agent-missing", pane_watch_target(), 0L, pane_id="")
pw.watch_pane("agent-missing", pane_watch_target(), 0L, pane_id=None)

@debug.assert_eq(pw.watched_agents(), [])
@debug.assert_eq(pw.watched_agents(), ["agent-missing"])
@debug.assert_eq(pw.pane_id_for_agent("agent-missing"), None)
assert_true(pw.observe_idle("agent-missing", 999L) is None)
@debug.assert_eq(dump_args, [])
}
65 changes: 40 additions & 25 deletions src/server/handler_disconnect.mbt
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
///|
/// Register a pane for idle watchdog monitoring.
/// Reads the agent's persisted zellij pane id and records it for dump-screen
/// polling. Missing sidecars leave the agent unwatched.
pub fn ServerState::watch_pane(self : ServerState, agent_id : String) -> Unit {
/// Resolves the agent's live zellij pane id from the current list-panes
/// snapshot and records it for dump-screen polling. Missing tabs remain
/// tracked with no pane id so later poller ticks can retry resolution.
pub async fn ServerState::watch_pane(
self : ServerState,
agent_id : String,
) -> Unit {
let agent = self.registry.get(agent_id) catch { _ => return }
let target = match agent.terminal_target {
Some(t) => t
None => return
}
let pane_id = (self.read_file)(
@workspace.agent_pane_id_file_path(self.config.project_dir, agent_id),
)
.trim()
.to_owned()
let snapshot = (self.list_panes_snapshot_capture)(target.session)
let pane_id = match @tools.resolve_zellij_pane_id(target, snapshot) {
Some(value) => Some("terminal_" + value.to_string())
None => None
}
self.pane_watcher.watch_pane(agent_id, target, @poller.now_sec(), pane_id~)
}

///|
async fn ServerState::ensure_watched_pane_resolved(
self : ServerState,
agent_id : String,
) -> Bool {
match self.pane_watcher.pane_id_for_agent(agent_id) {
Some(_) => true
None => {
self.watch_pane(agent_id)
self.pane_watcher.pane_id_for_agent(agent_id) is Some(_)
}
}
}

///|
/// Stop the zellij subscribe process for an agent and clean up state.
fn ServerState::stop_subscribe(self : ServerState, agent_id : String) -> Unit {
Expand Down Expand Up @@ -120,11 +138,6 @@ fn ServerState::clear_agent_runtime_tracking(
agent_id : String,
) -> Unit {
self.stop_subscribe(agent_id)
ignore(
(self.delete_file)(
@workspace.agent_pane_id_file_path(self.config.project_dir, agent_id),
),
)
self.pending_disconnects.remove(agent_id)
self.agent_pids.remove(agent_id)
self.disconnect_contexts.remove(agent_id)
Expand Down Expand Up @@ -1117,6 +1130,9 @@ pub async fn ServerState::check_idle_agents(
Some(_) => ()
None => continue
}
if !self.ensure_watched_pane_resolved(agent_id) {
continue
}

// Read and parse the pane snapshot once for all idle decisions this tick.
match self.pane_watcher.observe_idle(agent_id, now) {
Expand Down Expand Up @@ -1188,7 +1204,6 @@ async test "check_idle_agents skips supervisor roles before reading panes" {
"-" +
@env.now().reinterpret_as_int64().to_string()
ignore(@sys.create_dir_all(project_dir + "/.choir/kv"))
ignore(@sys.create_dir_all(project_dir + "/.choir/pane-ids"))
let reads : Array[String] = []
let host : @runtime.PaneWatchHost = {
dump_pane_screen: async fn(_session, _pane_id) {
Expand All @@ -1210,6 +1225,11 @@ async test "check_idle_agents skips supervisor roles before reading panes" {
@types.Role::TL => "main.tl"
_ => "main.supervisor"
}
let target : @types.LocalTarget = {
kind: @types.TargetKind::ZellijPane,
session: "s",
name: agent_id,
}
ignore(
state.register_agent({
agent_id: @types.AgentId(agent_id),
Expand All @@ -1218,23 +1238,18 @@ async test "check_idle_agents skips supervisor roles before reading panes" {
parent: None,
workspace: project_dir,
branch: "main",
terminal_target: Some({
kind: @types.TargetKind::ZellijPane,
session: "s",
name: agent_id,
}),
terminal_target: Some(target),
host: @types.Host::Local,
spawn_depth: 0,
state: @types.AgentState::Active,
}),
)
ignore(
@sys.write_file_sync(
@workspace.agent_pane_id_file_path(project_dir, agent_id),
"terminal_" + agent_id.length().to_string(),
),
state.pane_watcher.watch_pane(
agent_id,
target,
0L,
pane_id=Some("terminal_" + agent_id.length().to_string()),
)
state.watch_pane(agent_id)
}
let calls : Array[@workspace.Command] = []
state.check_idle_agents(1000L, 0L, async fn(cmds) {
Expand Down
Loading
Loading