From 71550528e98deb245dc5f4f103d6597a25296f12 Mon Sep 17 00:00:00 2001 From: Justin <9146678+brickfrog@users.noreply.github.com> Date: Sat, 13 Jun 2026 06:25:59 -0500 Subject: [PATCH 1/2] test: add failing tests for pane-watcher revival --- src/config/config.mbt | 1 + src/config/config_test.mbt | 29 ++--- src/runtime/pane_watch.mbt | 4 + src/runtime/pane_watch_test.mbt | 180 +++++++++++++---------------- src/server/handler_disconnect.mbt | 4 + src/server/handler_test.mbt | 99 +++++++++++++++- src/server/worker_handoff_test.mbt | 24 ++-- src/types/config.mbt | 3 + src/types/config_schema_test.mbt | 4 +- src/workspace/launch.mbt | 8 ++ src/workspace/multiplexer.mbt | 8 ++ src/workspace/workspace_test.mbt | 43 +++++++ 12 files changed, 269 insertions(+), 138 deletions(-) diff --git a/src/config/config.mbt b/src/config/config.mbt index 66c18975..56ceb21f 100644 --- a/src/config/config.mbt +++ b/src/config/config.mbt @@ -677,6 +677,7 @@ pub fn parse_config( moon_verify_target, agy_migration_incomplete, hooks_enforcement, + worker_no_handoff_idle_sec: default_cfg.worker_no_handoff_idle_sec, worker_no_handoff_prod_sec, worker_no_handoff_escalate_sec, poller_heartbeat_tick_sec, diff --git a/src/config/config_test.mbt b/src/config/config_test.mbt index 6af9dfce..47df259a 100644 --- a/src/config/config_test.mbt +++ b/src/config/config_test.mbt @@ -13,8 +13,7 @@ test "parse_config - full" { #|companions_json = '[{"name":"monitor","role":"worker","command":"claude","task":"Monitor the build"}]' #|plugins_json = '[{"name":"plugin","command":"./plugin","tools":"search"}]' #|hooks_enforcement = true - #|worker_no_handoff_prod_sec = 90 - #|worker_no_handoff_escalate_sec = 45 + #|worker_no_handoff_idle_sec = 90 #|poller_heartbeat_tick_sec = 10 #|prenotify_checks = ["moon check --target native", "CHOIR_TEST_NO_EXEC=1 moon test --target native", "CHOIR_TEST_REAL_EXEC=1 moon test --target native src/exec", "moon run src/bin/choir_lint --target native"] #|moon_verify_target = "js" @@ -53,8 +52,7 @@ test "parse_config - full" { "[{\"name\":\"plugin\",\"command\":\"./plugin\",\"tools\":\"search\"}]", ), ) - assert_eq(cfg.worker_no_handoff_prod_sec, 90L) - assert_eq(cfg.worker_no_handoff_escalate_sec, 45L) + assert_eq(cfg.worker_no_handoff_idle_sec, 90L) assert_eq(cfg.hooks_enforcement, true) assert_eq(cfg.poller_heartbeat_tick_sec, 10) @debug.assert_eq(cfg.prenotify_checks, [ @@ -149,8 +147,7 @@ test "parse_config - minimal" { assert_eq(cfg.automerge, false) assert_eq(cfg.review_mode, @types.ReviewMode::SinglePass) assert_eq(cfg.merge_strategy, @types.MergeStrategy::Merge) - assert_eq(cfg.worker_no_handoff_prod_sec, 120L) - assert_eq(cfg.worker_no_handoff_escalate_sec, 120L) + assert_eq(cfg.worker_no_handoff_idle_sec, 120L) assert_eq(cfg.hooks_enforcement, false) assert_eq(cfg.poller_heartbeat_tick_sec, 60) @debug.assert_eq(cfg.prenotify_checks, []) @@ -336,32 +333,30 @@ test "parse_config pr_policy reviewer explicit copilot" { } ///| -test "parse_config worker no-handoff knobs accept zero disable" { +test "parse_config worker no-handoff idle knob accepts zero disable" { let cfg = parse_config( ( - #|worker_no_handoff_prod_sec = 0 - #|worker_no_handoff_escalate_sec = 0 + #|worker_no_handoff_idle_sec = 0 #|poller_heartbeat_tick_sec = 0 ), ).unwrap() - assert_eq(cfg.worker_no_handoff_prod_sec, 0L) - assert_eq(cfg.worker_no_handoff_escalate_sec, 0L) + assert_eq(cfg.worker_no_handoff_idle_sec, 0L) assert_eq(cfg.poller_heartbeat_tick_sec, 0) } ///| -test "parse_config worker no-handoff knobs reject negatives" { - match parse_config("worker_no_handoff_prod_sec = -1") { +test "parse_config worker no-handoff idle knob rejects negatives" { + match parse_config("worker_no_handoff_idle_sec = -1") { Err(e) => assert_true(e.message().contains("expected >= 0")) - Ok(_) => fail("expected negative prod config to fail") + Ok(_) => fail("expected negative idle config to fail") } } ///| -test "parse_config worker no-handoff knobs reject fractional values" { - match parse_config("worker_no_handoff_escalate_sec = 1.5") { +test "parse_config worker no-handoff idle knob rejects fractional values" { + match parse_config("worker_no_handoff_idle_sec = 1.5") { Err(e) => assert_true(e.message().contains("expected integer")) - Ok(_) => fail("expected fractional escalate config to fail") + Ok(_) => fail("expected fractional idle config to fail") } } diff --git a/src/runtime/pane_watch.mbt b/src/runtime/pane_watch.mbt index eb7e5297..b2280d9c 100644 --- a/src/runtime/pane_watch.mbt +++ b/src/runtime/pane_watch.mbt @@ -11,6 +11,7 @@ pub(all) struct PaneWatchHost { kill_pid_best_effort : (Int) -> Unit delete_file_sync : (String) -> Unit read_file : (String) -> String + dump_pane_screen : (String, String) -> String } ///| @@ -20,6 +21,7 @@ pub fn PaneWatchHost::native() -> PaneWatchHost { kill_pid_best_effort: @sys.kill_pid_best_effort, delete_file_sync: fn(path) { ignore(@sys.delete_file_sync(path)) }, read_file: @sys.read_file, + dump_pane_screen: fn(_, _) { "" }, } } @@ -66,7 +68,9 @@ pub fn PaneWatcher::watch_pane( agent_id : String, target : @types.LocalTarget, now : Int64, + pane_id? : String = "", ) -> Unit { + ignore(pane_id) let snapshot_path = pane_snapshot_path(agent_id) let pid = (self.host.spawn_zellij_pane_viewport_subscribe)( target.session, diff --git a/src/runtime/pane_watch_test.mbt b/src/runtime/pane_watch_test.mbt index 388a77b5..13072ace 100644 --- a/src/runtime/pane_watch_test.mbt +++ b/src/runtime/pane_watch_test.mbt @@ -1,13 +1,39 @@ ///| -test "pane_snapshot_path" { - assert_eq(pane_snapshot_path("abc"), "/tmp/choir-viewport-abc.json") +fn pane_watch_target() -> @types.LocalTarget { + @types.LocalTarget::{ + kind: @types.TargetKind::ZellijTab, + session: "sess", + name: "tab1", + } } ///| -test "parse_int_simple" { - assert_eq(parse_int_simple("123"), 123) - assert_eq(parse_int_simple("0"), 0) - assert_eq(parse_int_simple("abc123def456"), 123456) +fn pane_watch_dump_host( + screens : Array[String], + dump_args : Array[String], +) -> PaneWatchHost { + let mut dump_count = 0 + { + spawn_zellij_pane_viewport_subscribe: fn(_, _, _) { 101 }, + kill_pid_best_effort: fn(_) { () }, + delete_file_sync: fn(_) { () }, + read_file: fn(_) { "" }, + dump_pane_screen: fn(session, pane_id) { + dump_args.push(session) + dump_args.push(pane_id) + if screens.length() == 0 { + "" + } else { + let idx = if dump_count < screens.length() { + dump_count + } else { + screens.length() - 1 + } + dump_count = dump_count + 1 + screens[idx] + } + }, + } } ///| @@ -17,112 +43,62 @@ test "PaneWatcher::new" { } ///| -test "PaneWatcher::with_host routes subscribe and cleanup through injected host (spies)" { - let spawn_args : Array[String] = [] - let killed_pids : Array[Int] = [] - let deleted_paths : Array[String] = [] - let read_paths : Array[String] = [] - let host : PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(session, name, path) { - spawn_args.push(session) - spawn_args.push(name) - spawn_args.push(path) - 99 - }, - kill_pid_best_effort: fn(pid) { killed_pids.push(pid) }, - delete_file_sync: fn(path) { deleted_paths.push(path) }, - read_file: fn(path) { - read_paths.push(path) - "{\"event\":\"pane_update\",\"viewport\":[\"hi\"]}" - }, - } - let pw = PaneWatcher::with_host(host) - let target = @types.LocalTarget::{ - kind: @types.TargetKind::ZellijTab, - session: "sess", - name: "tab1", - } - let snap = pane_snapshot_path("agent-x") - pw.watch_pane("agent-x", target, 0L) - @debug.assert_eq(pw.watches.get("agent-x").map(fn(w) { w.pid }), Some(99)) - @debug.assert_eq(spawn_args, ["sess", "tab1", snap]) - assert_eq( - pw.read_viewport("agent-x"), - "{\"event\":\"pane_update\",\"viewport\":[\"hi\"]}", +test "PaneWatcher::observe_idle dumps the resolved pane id through host spy" { + let dump_args : Array[String] = [] + let pw = PaneWatcher::with_host( + pane_watch_dump_host(["worker ready"], dump_args), ) - @debug.assert_eq(read_paths, [snap]) - pw.stop_subscribe("agent-x") - @debug.assert_eq(pw.watches.get("agent-x").map(fn(w) { w.pid }), None) - @debug.assert_eq(killed_pids, [99]) - @debug.assert_eq(deleted_paths, [snap]) -} -///| -test "PaneWatcher::stop_subscribe followed by check_idle does not double-kill or false-idle" { - let killed_pids : Array[Int] = [] - let deleted_paths : Array[String] = [] - let read_paths : Array[String] = [] - let host : PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_, _, _) { 123 }, - kill_pid_best_effort: fn(pid) { killed_pids.push(pid) }, - delete_file_sync: fn(path) { deleted_paths.push(path) }, - read_file: fn(path) { - read_paths.push(path) - "{\"event\":\"pane_update\",\"viewport\":[\"stale\"]}" - }, - } - let pw = PaneWatcher::with_host(host) - let target = @types.LocalTarget::{ - kind: @types.TargetKind::ZellijTab, - session: "sess", - name: "tab1", - } - pw.watch_pane("agent-stop", target, 0L) - pw.stop_subscribe("agent-stop") + pw.watch_pane("agent-x", pane_watch_target(), 0L, pane_id="terminal_4") - @debug.assert_eq(pw.check_idle("agent-stop", 999L, 0L), None) - @debug.assert_eq(pw.watches.get("agent-stop").map(fn(w) { w.pid }), None) - @debug.assert_eq(killed_pids, [123]) - @debug.assert_eq(deleted_paths, [pane_snapshot_path("agent-stop")]) - @debug.assert_eq(read_paths, []) + assert_true(pw.observe_idle("agent-x", 10L) is None) + @debug.assert_eq(dump_args, ["sess", "terminal_4"]) } ///| -test "PaneWatcher::record_observation updates all fields atomically" { - let read_paths : Array[String] = [] - let host : PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_, _, _) { 456 }, - kill_pid_best_effort: fn(_) { () }, - delete_file_sync: fn(_) { () }, - read_file: fn(path) { - read_paths.push(path) - "{\"event\":\"pane_update\",\"viewport\":[\"ready\"]}" - }, - } - let pw = PaneWatcher::with_host(host) - let target = @types.LocalTarget::{ - kind: @types.TargetKind::ZellijTab, - session: "sess", - name: "tab1", - } - pw.watch_pane("agent-observe", target, 10L) +test "PaneWatcher::observe_idle reports unchanged dump-screen idle and resets on change" { + let dump_args : Array[String] = [] + let pw = PaneWatcher::with_host( + pane_watch_dump_host( + ["same screen", "same screen", "new screen", "new screen"], + dump_args, + ), + ) - @debug.assert_eq(pw.check_idle("agent-observe", 42L, 300L), None) + pw.watch_pane("agent-idle", pane_watch_target(), 100L, pane_id="terminal_9") - let screen = "ready\n" - let watch = match pw.watches.get("agent-observe") { - Some(watch) => watch - None => fail("expected pane watch") + assert_true(pw.observe_idle("agent-idle", 110L) is None) + match pw.observe_idle("agent-idle", 230L) { + Some(obs) => { + assert_eq(obs.screen, "same screen") + assert_eq(obs.idle_sec, 120L) + } + None => fail("expected unchanged dump-screen to report idle") } - let observation = watch.observation - assert_eq(observation.last_change_at, 42L) - assert_eq(observation.last_hash, simple_hash(screen)) - assert_eq(observation.last_len, screen.length()) - @debug.assert_eq(read_paths, [pane_snapshot_path("agent-observe")]) + assert_true(pw.observe_idle("agent-idle", 240L) is None) + match pw.observe_idle("agent-idle", 360L) { + Some(obs) => { + assert_eq(obs.screen, "new screen") + assert_eq(obs.idle_sec, 120L) + } + None => fail("expected unchanged new dump-screen to report idle") + } + @debug.assert_eq(dump_args, [ + "sess", "terminal_9", "sess", "terminal_9", "sess", "terminal_9", "sess", + "terminal_9", + ]) } ///| -test "extract_viewport_text parses viewport array" { - let j = "{\"event\":\"pane_update\",\"viewport\":[\"line one\",\"line two\"]}" - assert_eq(extract_viewport_text(j), "line one\nline two\n") +test "PaneWatcher::watch_pane skips unresolved pane ids" { + let dump_args : Array[String] = [] + let pw = PaneWatcher::with_host( + pane_watch_dump_host(["should not be read"], dump_args), + ) + + pw.watch_pane("agent-missing", pane_watch_target(), 0L, pane_id="") + + @debug.assert_eq(pw.watched_agents(), []) + assert_true(pw.observe_idle("agent-missing", 999L) is None) + @debug.assert_eq(dump_args, []) } diff --git a/src/server/handler_disconnect.mbt b/src/server/handler_disconnect.mbt index 42e77018..fa04d31b 100644 --- a/src/server/handler_disconnect.mbt +++ b/src/server/handler_disconnect.mbt @@ -1282,6 +1282,10 @@ async test "check_idle_agents skips supervisor roles before reading panes" { reads.push(path) "{\"event\":\"pane_update\",\"viewport\":[\"idle\"]}" }, + dump_pane_screen: fn(_session, _pane_id) { + reads.push(_session + ":" + _pane_id) + "idle" + }, } let state = { ..ServerState::new( diff --git a/src/server/handler_test.mbt b/src/server/handler_test.mbt index 6bd8c601..9cd35291 100644 --- a/src/server/handler_test.mbt +++ b/src/server/handler_test.mbt @@ -7689,6 +7689,7 @@ async test "soft transport disconnect still escalates when idle watchdog expires kill_pid_best_effort: fn(_pid) { }, delete_file_sync: fn(_path) { }, read_file: fn(_path) { snapshot }, + dump_pane_screen: fn(_session, _pane_id) { "still thinking" }, } let state = ServerState::new( { @@ -8639,6 +8640,10 @@ fn worker_handoff_test_state_with_read_log( reads.push(path) snapshot }, + dump_pane_screen: fn(_session, _pane_id) { + reads.push(_session + ":" + _pane_id) + snapshot + }, } base.with_pane_watcher(@runtime.PaneWatcher::with_host(host)) } @@ -8667,6 +8672,19 @@ fn worker_handoff_test_state_with_snapshots( snapshots[idx] } }, + dump_pane_screen: fn(_session, _pane_id) { + if snapshots.length() == 0 { + "" + } else { + let idx = if read_count < snapshots.length() { + read_count + } else { + snapshots.length() - 1 + } + read_count = read_count + 1 + snapshots[idx] + } + }, } base.with_pane_watcher(@runtime.PaneWatcher::with_host(host)) } @@ -9901,7 +9919,7 @@ async test "check_idle_agents keeps pre-TDD and post-fix leaf watchdog state sep } ///| -async test "check_idle_agents prods worker then emits one stalled no-handoff event" { +async test "check_idle_agents emits worker stalled directly without terminal prod" { let snapshot = "{\"event\":\"pane_update\",\"viewport\":[\"audit complete\",\"Finding 1\"]}" let state = worker_handoff_test_state("worker-no-handoff-stall", snapshot) let parent_id = "main.parent" @@ -9932,12 +9950,13 @@ async test "check_idle_agents prods worker then emits one stalled no-handoff eve ) assert_eq( worker_handoff_count_sent(sent, "printed output is not the handoff"), - 1, + 0, ) - assert_true( - state.worker_no_handoff_prodded_at.contains( - "worker-no-handoff:" + worker_id, + assert_eq( + worker_handoff_count_sent( + sent, "[WORKER STALLED — no handoff] sarcasmotron", ), + 1, ) state.check_idle_agents( @@ -9962,6 +9981,76 @@ async test "check_idle_agents prods worker then emits one stalled no-handoff eve ) } +///| +async test "check_idle_agents resets worker no-handoff idle when pane changes" { + let state = worker_handoff_test_state_with_snapshots( + "worker-no-handoff-change-reset", + [ + "{\"event\":\"pane_update\",\"viewport\":[\"first tail\"]}", + "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", + "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", + "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", + ], + ) + let parent_id = "main.parent" + let worker_id = "main.reset-worker" + ignore(state.register_agent(worker_handoff_parent(parent_id))) + ignore( + state.register_agent( + worker_handoff_worker(worker_id, parent_id, @types.AgentState::Active), + ), + ) + state.watch_pane(worker_id) + + let sent : Array[String] = [] + let start = @poller.now_sec() + state.check_idle_agents( + start, + 300L, + async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, + worker_no_handoff_prod_sec=120L, + worker_no_handoff_escalate_sec=120L, + ) + state.check_idle_agents( + start + 121L, + 300L, + async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, + worker_no_handoff_prod_sec=120L, + worker_no_handoff_escalate_sec=120L, + ) + assert_eq( + worker_handoff_count_sent( + sent, "[WORKER STALLED — no handoff] reset-worker", + ), + 0, + ) + + state.check_idle_agents( + start + 242L, + 300L, + async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, + worker_no_handoff_prod_sec=120L, + worker_no_handoff_escalate_sec=120L, + ) + state.check_idle_agents( + start + 250L, + 300L, + async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, + worker_no_handoff_prod_sec=120L, + worker_no_handoff_escalate_sec=120L, + ) + assert_eq( + worker_handoff_count_sent(sent, "printed output is not the handoff"), + 0, + ) + assert_eq( + worker_handoff_count_sent( + sent, "[WORKER STALLED — no handoff] reset-worker", + ), + 1, + ) +} + ///| async test "check_idle_agents reuses one pane snapshot for handoff and watchdog checks" { let snapshot = "{\"event\":\"pane_update\",\"viewport\":[\"done\",\"tail\"]}" diff --git a/src/server/worker_handoff_test.mbt b/src/server/worker_handoff_test.mbt index 8b67fef2..13a11cdb 100644 --- a/src/server/worker_handoff_test.mbt +++ b/src/server/worker_handoff_test.mbt @@ -1,5 +1,5 @@ ///| -test "worker_no_handoff_decision prods worker after threshold" { +test "worker_no_handoff_decision emits stalled directly after threshold" { assert_eq( worker_no_handoff_decision( @types.Role::Worker, @@ -10,23 +10,23 @@ test "worker_no_handoff_decision prods worker after threshold" { 120L, 120L, ), - WorkerNoHandoffDecision::Prod, + WorkerNoHandoffDecision::EmitStalled, ) } ///| -test "worker_no_handoff_decision emits stalled after prod grace" { +test "worker_no_handoff_decision skips once stalled was emitted" { assert_eq( worker_no_handoff_decision( @types.Role::Worker, 121L, false, - true, false, + true, 120L, 120L, ), - WorkerNoHandoffDecision::EmitStalled, + WorkerNoHandoffDecision::DoNothing, ) } @@ -57,7 +57,7 @@ test "worker_no_handoff_decision skips non-worker roles" { } ///| -test "worker_no_handoff_decision prod zero disables path" { +test "worker_no_handoff_decision idle threshold zero disables path" { assert_eq( worker_no_handoff_decision( @types.Role::Worker, @@ -85,18 +85,18 @@ test "worker_no_handoff_decision prod zero disables path" { } ///| -test "worker_no_handoff_decision escalate zero disables stalled event" { +test "worker_no_handoff_decision ignores removed escalate threshold" { assert_eq( worker_no_handoff_decision( @types.Role::Worker, 999L, false, - true, + false, false, 120L, 0L, ), - WorkerNoHandoffDecision::DoNothing, + WorkerNoHandoffDecision::EmitStalled, ) } @@ -286,7 +286,7 @@ test "leaf_no_handoff_decision truth table" { 120L, 120L, ), - WorkerNoHandoffDecision::Prod, + WorkerNoHandoffDecision::EmitStalled, ) assert_eq( leaf_no_handoff_decision( @@ -300,7 +300,7 @@ test "leaf_no_handoff_decision truth table" { 120L, 120L, ), - WorkerNoHandoffDecision::DoNothing, + WorkerNoHandoffDecision::EmitStalled, ) assert_eq( leaf_no_handoff_decision( @@ -328,7 +328,7 @@ test "leaf_no_handoff_decision truth table" { 120L, 0L, ), - WorkerNoHandoffDecision::DoNothing, + WorkerNoHandoffDecision::EmitStalled, ) assert_eq( leaf_no_handoff_decision( diff --git a/src/types/config.mbt b/src/types/config.mbt index 292a049e..5153c66a 100644 --- a/src/types/config.mbt +++ b/src/types/config.mbt @@ -100,6 +100,8 @@ pub(all) struct Config { agy_migration_incomplete : Bool /// When true, spawned hook-capable agents get task-contract boundary guard hooks. hooks_enforcement : Bool + /// Worker/leaf no-handoff idle threshold in seconds; 0 disables stalled event. + worker_no_handoff_idle_sec : Int64 /// Worker-only no-handoff prod threshold in seconds; 0 disables prod/stall. worker_no_handoff_prod_sec : Int64 /// Worker-only no-handoff stalled-event grace after prod in seconds; 0 disables stalled event. @@ -196,6 +198,7 @@ pub fn Config::default() -> Config { moon_verify_target: Native, agy_migration_incomplete: false, hooks_enforcement: false, + worker_no_handoff_idle_sec: 0L, worker_no_handoff_prod_sec: 120L, worker_no_handoff_escalate_sec: 120L, poller_heartbeat_tick_sec: 60, diff --git a/src/types/config_schema_test.mbt b/src/types/config_schema_test.mbt index 1b295cd8..eb6ab8ac 100644 --- a/src/types/config_schema_test.mbt +++ b/src/types/config_schema_test.mbt @@ -18,8 +18,8 @@ fn expected_config_schema_keys() -> Array[String] { "default_role", "project_dir", "shell_command", "listen_uds", "listen_tcp", "tcp_auth_token", "otlp_endpoint", "agent_id", "extra_mcp_servers_json", "companions_json", "plugins_json", "terminal_session", "automerge", "review_mode", "merge_strategy", "moon_verify_target", - "agy_migration_incomplete", "hooks_enforcement", "worker_no_handoff_prod_sec", - "worker_no_handoff_escalate_sec", "poller_heartbeat_tick_sec", "prenotify_checks", + "agy_migration_incomplete", "hooks_enforcement", "worker_no_handoff_idle_sec", + "poller_heartbeat_tick_sec", "prenotify_checks", "auto_pull_after_integration", "project.verify_cmd", "pr_policy.require_integration_audit", "pr_policy.skip_copilot_on_feature_branch", "pr_policy.reviewer", "auto_rebuild_after_pull", "auto_reload_after_rebuild", "pricing..rate_in", "pricing..rate_out", diff --git a/src/workspace/launch.mbt b/src/workspace/launch.mbt index 4eac26d3..11c46d12 100644 --- a/src/workspace/launch.mbt +++ b/src/workspace/launch.mbt @@ -57,6 +57,14 @@ pub fn agent_pid_file_path(project_dir : String, agent_id : String) -> String { project_dir + "/.choir/pids/" + @types.sanitize_agent_id(agent_id) } +///| +pub fn agent_pane_id_file_path( + _project_dir : String, + _agent_id : String, +) -> String { + "" +} + ///| /// Pure parse of an agent pidfile body (see [`agent_pid_file_path`]): the /// recorded setsid process-group id. Returns `None` for missing/empty, diff --git a/src/workspace/multiplexer.mbt b/src/workspace/multiplexer.mbt index d89d8f42..76afad22 100644 --- a/src/workspace/multiplexer.mbt +++ b/src/workspace/multiplexer.mbt @@ -152,6 +152,14 @@ pub fn zellij_list_panes_json_command(session : String) -> Command { } } +///| +pub fn pane_id_from_list_panes_diff( + _before : String, + _after : String, +) -> String? { + None +} + ///| /// Layout loaded into choir-managed zellij sessions. Opt-in: only loads when /// the layout file is present under the user's zellij config dir diff --git a/src/workspace/workspace_test.mbt b/src/workspace/workspace_test.mbt index b188bb66..e667c599 100644 --- a/src/workspace/workspace_test.mbt +++ b/src/workspace/workspace_test.mbt @@ -425,6 +425,49 @@ test "agent_pid_file_path sanitizes agent ids as flat filenames" { ) } +///| +test "agent_pane_id_file_path mirrors pid sidecar sanitization" { + assert_eq( + @workspace.agent_pane_id_file_path( + "/project", "../../evil\\agent\u0000pane", + ), + "/project/.choir/pane-ids/.._.._evil_agent_pane", + ) +} + +///| +test "pane_id_from_list_panes_diff returns single new terminal pane id" { + let before = "PANE_ID TYPE TITLE\nterminal_1 terminal root\nplugin_2 plugin status\n" + let after = before + "terminal_4 terminal worker-a\n" + + @debug.assert_eq( + @workspace.pane_id_from_list_panes_diff(before, after), + Some("terminal_4"), + ) +} + +///| +test "pane_id_from_list_panes_diff rejects zero ambiguous and non-terminal diffs" { + let before = "PANE_ID TYPE TITLE\nterminal_1 terminal root\n" + let ambiguous_after = before + + "terminal_2 terminal a\nterminal_3 terminal b\n" + let non_terminal_after = before + + "plugin_4 plugin status\npane_5 terminal malformed\n" + + @debug.assert_eq( + @workspace.pane_id_from_list_panes_diff(before, before), + None, + ) + @debug.assert_eq( + @workspace.pane_id_from_list_panes_diff(before, ambiguous_after), + None, + ) + @debug.assert_eq( + @workspace.pane_id_from_list_panes_diff(before, non_terminal_after), + None, + ) +} + ///| test "agy_stderr_log_path sanitizes agent ids as flat filenames" { assert_eq( From 08b124420e37f04cfe9fb524675e46aa0f4cdea1 Mon Sep 17 00:00:00 2001 From: Justin <9146678+brickfrog@users.noreply.github.com> Date: Sat, 13 Jun 2026 06:53:15 -0500 Subject: [PATCH 2/2] fix: revive pane watcher no-handoff watchdog --- src/bin/choir/init.mbt | 5 +- src/bin/choir/uds_server.mbt | 24 +++- src/config/config.mbt | 22 +-- src/runtime/pane_watch.mbt | 121 +++++----------- src/runtime/pane_watch_test.mbt | 16 +-- src/server/handler_disconnect.mbt | 178 +++++++---------------- src/server/handler_test.mbt | 219 ++++++++++++----------------- src/server/state.mbt | 8 ++ src/server/worker_handoff.mbt | 33 ++--- src/server/worker_handoff_test.mbt | 5 - src/sys/io.mbt | 24 ---- src/sys/io_stub.mbt | 9 -- src/sys/stub.c | 109 -------------- src/tools/dispatch_helpers.mbt | 12 +- src/tools/fork_wave.mbt | 4 + src/tools/fork_wave_test.mbt | 9 ++ src/tools/spawn.mbt | 8 ++ src/tools/spawn_diagnostics.mbt | 61 ++++++++ src/types/config.mbt | 8 +- src/types/config_schema.mbt | 11 +- src/types/config_schema_test.mbt | 8 +- src/workspace/launch.mbt | 6 +- src/workspace/multiplexer.mbt | 95 ++++++++++++- 23 files changed, 413 insertions(+), 582 deletions(-) diff --git a/src/bin/choir/init.mbt b/src/bin/choir/init.mbt index b3afa503..c6415bce 100644 --- a/src/bin/choir/init.mbt +++ b/src/bin/choir/init.mbt @@ -471,8 +471,7 @@ fn init_default_terminal_session(project_dir : String) -> String { ///| fn init_config_toml_content() -> String { - "worker_no_handoff_prod_sec = 120\n" + - "worker_no_handoff_escalate_sec = 120\n" + + "worker_no_handoff_idle_sec = 120\n" + "auto_pull_after_integration = true\n" + "auto_rebuild_after_pull = false\n" + "auto_reload_after_rebuild = false\n" + @@ -492,7 +491,7 @@ test "init config.toml scaffold omits machine local fields" { assert_false(content.contains("project_dir")) assert_false(content.contains("listen_uds")) assert_false(content.contains("terminal_session")) - assert_true(content.contains("worker_no_handoff_prod_sec = 120")) + assert_true(content.contains("worker_no_handoff_idle_sec = 120")) assert_true(content.contains("[pr_policy]")) } diff --git a/src/bin/choir/uds_server.mbt b/src/bin/choir/uds_server.mbt index 6c30e115..636d864e 100644 --- a/src/bin/choir/uds_server.mbt +++ b/src/bin/choir/uds_server.mbt @@ -20,6 +20,23 @@ fn grow_connection_buffer(buf : FixedArray[Byte]) -> FixedArray[Byte] { next } +///| +async fn serve_dump_pane_screen(session : String, pane_id : String) -> String { + let (status, output) = @exec.capture_command_stdout( + @workspace.zellij_dump_screen_command(session, pane_id), + ) + if status == 0 { + output + } else { + "" + } +} + +///| +fn serve_pane_watcher() -> @runtime.PaneWatcher { + @runtime.PaneWatcher::with_host({ dump_pane_screen: serve_dump_pane_screen }) +} + ///| struct ServeUDSClient { fd : Int @@ -1368,7 +1385,9 @@ fn serve_run() -> Unit { } } let provider = @server.make_recovery_provider() - let state = @server.ServerState::new(config, provider) + let state = @server.ServerState::new(config, provider).with_pane_watcher( + serve_pane_watcher(), + ) @moontrace.set_global_field("service", "choir".to_json()) @moontrace.set_global_field("run_id", state.run_id.to_json()) @prompts.emit_prompt_drift_warnings( @@ -1433,8 +1452,7 @@ fn serve_run() -> Unit { now, 300L, @exec.run_sequence, - worker_no_handoff_prod_sec=config.worker_no_handoff_prod_sec, - worker_no_handoff_escalate_sec=config.worker_no_handoff_escalate_sec, + worker_no_handoff_idle_sec=config.worker_no_handoff_idle_sec, ) } catch { e => diff --git a/src/config/config.mbt b/src/config/config.mbt index 56ceb21f..457fab5b 100644 --- a/src/config/config.mbt +++ b/src/config/config.mbt @@ -554,23 +554,11 @@ pub fn parse_config( Ok(v) => v Err(e) => return Err(e) } - let worker_no_handoff_prod_sec = match doc.get("worker_no_handoff_prod_sec") { - None => default_cfg.worker_no_handoff_prod_sec + let worker_no_handoff_idle_sec = match doc.get("worker_no_handoff_idle_sec") { + None => default_cfg.worker_no_handoff_idle_sec Some(raw) => match - parse_nonnegative_int64_config_value("worker_no_handoff_prod_sec", raw) { - Ok(v) => v - Err(e) => return Err(e) - } - } - let worker_no_handoff_escalate_sec = match - doc.get("worker_no_handoff_escalate_sec") { - None => default_cfg.worker_no_handoff_escalate_sec - Some(raw) => - match - parse_nonnegative_int64_config_value( - "worker_no_handoff_escalate_sec", raw, - ) { + parse_nonnegative_int64_config_value("worker_no_handoff_idle_sec", raw) { Ok(v) => v Err(e) => return Err(e) } @@ -677,9 +665,7 @@ pub fn parse_config( moon_verify_target, agy_migration_incomplete, hooks_enforcement, - worker_no_handoff_idle_sec: default_cfg.worker_no_handoff_idle_sec, - worker_no_handoff_prod_sec, - worker_no_handoff_escalate_sec, + worker_no_handoff_idle_sec, poller_heartbeat_tick_sec, prenotify_checks, auto_pull_after_integration, diff --git a/src/runtime/pane_watch.mbt b/src/runtime/pane_watch.mbt index b2280d9c..d098051a 100644 --- a/src/runtime/pane_watch.mbt +++ b/src/runtime/pane_watch.mbt @@ -1,28 +1,20 @@ ///| -/// Path for the zellij pane-viewport JSON snapshot for `agent_id` (under `/tmp`). -pub fn pane_snapshot_path(agent_id : String) -> String { - "/tmp/choir-viewport-" + agent_id + ".json" +pub(all) struct PaneWatchHost { + dump_pane_screen : async (String, String) -> String } ///| -/// Host I/O for pane viewport subscribe: zellij child processes, pid cleanup, and snapshot files. -pub(all) struct PaneWatchHost { - spawn_zellij_pane_viewport_subscribe : (String, String, String) -> Int - kill_pid_best_effort : (Int) -> Unit - delete_file_sync : (String) -> Unit - read_file : (String) -> String - dump_pane_screen : (String, String) -> String +async fn native_dump_pane_screen( + _session : String, + _pane_id : String, +) -> String { + @async.sleep(0) + "" } ///| pub fn PaneWatchHost::native() -> PaneWatchHost { - { - spawn_zellij_pane_viewport_subscribe: @sys.spawn_zellij_pane_viewport_subscribe, - kill_pid_best_effort: @sys.kill_pid_best_effort, - delete_file_sync: fn(path) { ignore(@sys.delete_file_sync(path)) }, - read_file: @sys.read_file, - dump_pane_screen: fn(_, _) { "" }, - } + { dump_pane_screen: native_dump_pane_screen } } ///| @@ -36,7 +28,8 @@ pub(all) struct WatchObservation { ///| pub(all) struct WatchState { - pid : Int + session : String + pane_id : String mut observation : WatchObservation } @@ -70,19 +63,15 @@ pub fn PaneWatcher::watch_pane( now : Int64, pane_id? : String = "", ) -> Unit { - ignore(pane_id) - let snapshot_path = pane_snapshot_path(agent_id) - let pid = (self.host.spawn_zellij_pane_viewport_subscribe)( - target.session, - target.name, - snapshot_path, - ) - if pid > 0 { - self.watches.set(agent_id, { - pid, - observation: { last_change_at: now, last_hash: 0, last_len: 0 }, - }) + let resolved = pane_id.trim().to_owned() + if resolved == "" { + return } + self.watches.set(agent_id, { + session: target.session, + pane_id: resolved, + observation: { last_change_at: now, last_hash: 0, last_len: 0 }, + }) } ///| @@ -90,64 +79,23 @@ pub fn PaneWatcher::stop_subscribe( self : PaneWatcher, agent_id : String, ) -> Unit { - match self.watches.get(agent_id) { - Some(watch) => { - self.watches.remove(agent_id) - (self.host.kill_pid_best_effort)(watch.pid) - } - None => () - } - (self.host.delete_file_sync)(pane_snapshot_path(agent_id)) + self.watches.remove(agent_id) } ///| -pub fn PaneWatcher::read_viewport( +pub async fn PaneWatcher::current_screen( self : PaneWatcher, agent_id : String, -) -> String { - let snapshot_path = pane_snapshot_path(agent_id) - (self.host.read_file)(snapshot_path) -} - -///| -pub fn parse_int_simple(s : String) -> Int { - let mut result = 0 - for c in s { - let d = c.to_int() - '0'.to_int() - if d >= 0 && d <= 9 { - result = result * 10 + d - } - } - result -} - -///| -/// Extract viewport text from a zellij subscribe JSON snapshot. -/// Format: {"event":"pane_update","viewport":["line1","line2",...]} -pub fn extract_viewport_text(json_str : String) -> String { - if json_str == "" { - return "" +) -> String? { + let watch = match self.watches.get(agent_id) { + Some(watch) => watch + None => return None } - let parsed : Json = @json.parse(json_str) catch { _ => return json_str } - match parsed { - Object(obj) => - match obj.get("viewport") { - Some(Array(lines)) => { - let buf = StringBuilder::new() - for line in lines { - match line { - String(s) => { - buf.write_string(s) - buf.write_char('\n') - } - _ => () - } - } - buf.to_string() - } - _ => json_str - } - _ => json_str + let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id) + if screen.trim().to_owned() == "" { + None + } else { + Some(screen) } } @@ -186,7 +134,7 @@ fn PaneWatcher::record_observation( /// Checks if the agent's pane has been idle. Returns `Some(screen_text)` if idle, `None` if not idle or not being watched. /// This is the pane-watch idle/no-heartbeat proxy signal; it emits no hard /// disconnect notification by itself. -pub fn PaneWatcher::observe_idle( +pub async fn PaneWatcher::observe_idle( self : PaneWatcher, agent_id : String, now : Int64, @@ -195,11 +143,10 @@ pub fn PaneWatcher::observe_idle( Some(watch) => watch None => return None } - let snapshot = self.read_viewport(agent_id) - if snapshot.trim().to_owned() == "" { + let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id) + if screen.trim().to_owned() == "" { return None } - let screen = extract_viewport_text(snapshot) let hash = simple_hash(screen) let len = screen.length() let observation = watch.observation @@ -217,7 +164,7 @@ pub fn PaneWatcher::observe_idle( /// Checks if the agent's pane has been idle. Returns `Some(screen_text)` if idle, `None` if not idle or not being watched. /// This is the pane-watch idle/no-heartbeat proxy signal; it emits no hard /// disconnect notification by itself. -pub fn PaneWatcher::check_idle( +pub async fn PaneWatcher::check_idle( self : PaneWatcher, agent_id : String, now : Int64, diff --git a/src/runtime/pane_watch_test.mbt b/src/runtime/pane_watch_test.mbt index 13072ace..ab6ab673 100644 --- a/src/runtime/pane_watch_test.mbt +++ b/src/runtime/pane_watch_test.mbt @@ -14,11 +14,8 @@ fn pane_watch_dump_host( ) -> PaneWatchHost { let mut dump_count = 0 { - spawn_zellij_pane_viewport_subscribe: fn(_, _, _) { 101 }, - kill_pid_best_effort: fn(_) { () }, - delete_file_sync: fn(_) { () }, - read_file: fn(_) { "" }, - dump_pane_screen: fn(session, pane_id) { + dump_pane_screen: async fn(session, pane_id) { + @async.sleep(0) dump_args.push(session) dump_args.push(pane_id) if screens.length() == 0 { @@ -43,7 +40,7 @@ test "PaneWatcher::new" { } ///| -test "PaneWatcher::observe_idle dumps the resolved pane id through host spy" { +async test "PaneWatcher::observe_idle dumps the resolved pane id through host spy" { let dump_args : Array[String] = [] let pw = PaneWatcher::with_host( pane_watch_dump_host(["worker ready"], dump_args), @@ -56,7 +53,7 @@ test "PaneWatcher::observe_idle dumps the resolved pane id through host spy" { } ///| -test "PaneWatcher::observe_idle reports unchanged dump-screen idle and resets on change" { +async test "PaneWatcher::observe_idle reports unchanged dump-screen idle and resets on change" { let dump_args : Array[String] = [] let pw = PaneWatcher::with_host( pane_watch_dump_host( @@ -84,13 +81,12 @@ test "PaneWatcher::observe_idle reports unchanged dump-screen idle and resets on None => fail("expected unchanged new dump-screen to report idle") } @debug.assert_eq(dump_args, [ - "sess", "terminal_9", "sess", "terminal_9", "sess", "terminal_9", "sess", - "terminal_9", + "sess", "terminal_9", "sess", "terminal_9", "sess", "terminal_9", "sess", "terminal_9", ]) } ///| -test "PaneWatcher::watch_pane skips unresolved pane ids" { +async test "PaneWatcher::watch_pane skips unresolved pane ids" { let dump_args : Array[String] = [] let pw = PaneWatcher::with_host( pane_watch_dump_host(["should not be read"], dump_args), diff --git a/src/server/handler_disconnect.mbt b/src/server/handler_disconnect.mbt index fa04d31b..e9cba7d3 100644 --- a/src/server/handler_disconnect.mbt +++ b/src/server/handler_disconnect.mbt @@ -1,14 +1,19 @@ ///| /// Register a pane for idle watchdog monitoring. -/// Starts a `zellij subscribe` process that streams pane viewport updates -/// to a snapshot file, replacing the old dump-screen polling approach. +/// Reads the agent's persisted zellij pane id and records it for dump-screen +/// polling. Missing sidecars leave the agent unwatched. pub fn ServerState::watch_pane(self : ServerState, agent_id : String) -> Unit { let agent = self.registry.get(agent_id) catch { _ => return } let target = match agent.terminal_target { Some(t) => t None => return } - self.pane_watcher.watch_pane(agent_id, target, @poller.now_sec()) + let pane_id = (self.read_file)( + @workspace.agent_pane_id_file_path(self.config.project_dir, agent_id), + ) + .trim() + .to_owned() + self.pane_watcher.watch_pane(agent_id, target, @poller.now_sec(), pane_id~) } ///| @@ -115,6 +120,11 @@ fn ServerState::clear_agent_runtime_tracking( agent_id : String, ) -> Unit { self.stop_subscribe(agent_id) + ignore( + (self.delete_file)( + @workspace.agent_pane_id_file_path(self.config.project_dir, agent_id), + ), + ) self.pending_disconnects.remove(agent_id) self.agent_pids.remove(agent_id) self.disconnect_contexts.remove(agent_id) @@ -768,33 +778,6 @@ fn ServerState::mark_transport_reconnected( } } -///| -async fn ServerState::prod_leaf_no_handoff( - self : ServerState, - agent : @types.Agent, - now : Int64, - runner : async (Array[@workspace.Command]) -> Array[Int], -) -> Unit { - let agent_id = agent.agent_id.to_string() - let state_key = no_handoff_watchdog_state_key(agent_id, LeafPostFixNoHandoff) - if self.worker_no_handoff_prodded_at.contains(state_key) || - self.handoff_was_delivered(agent_id) { - return - } - self.worker_no_handoff_prodded_at.set(state_key, now) - ignore( - self.durable_deliver_to_agent( - agent, - "watchdog", - leaf_stalled_no_handoff_prod_message(@phase.extract_slug(agent_id)), - @types.Status::Info, - runner, - ), - ) catch { - _ => () - } -} - ///| async fn ServerState::prod_leaf_pre_tdd_stall( self : ServerState, @@ -888,37 +871,6 @@ async fn ServerState::emit_leaf_stalled_no_handoff( } } -///| -async fn ServerState::prod_worker_no_handoff( - self : ServerState, - agent : @types.Agent, - now : Int64, - runner : async (Array[@workspace.Command]) -> Array[Int], -) -> Unit { - let agent_id = agent.agent_id.to_string() - let state_key = no_handoff_watchdog_state_key(agent_id, WorkerNoHandoff) - if self.worker_no_handoff_prodded_at.contains(state_key) || - self.handoff_was_delivered(agent_id) { - return - } - let target = match agent.terminal_target { - Some(t) => t - None => return - } - self.worker_no_handoff_prodded_at.set(state_key, now) - ignore( - @tools.execute_delivery( - @message.DeliveryMethod::LocalTerminalInput( - target, worker_no_handoff_prod_text, - ), - runner, - project_dir=self.config.project_dir, - ), - ) catch { - _ => () - } -} - ///| async fn ServerState::emit_worker_stalled_no_handoff( self : ServerState, @@ -970,7 +922,10 @@ async fn ServerState::emit_worker_exited_without_notify_if_needed( let screen_text = match screen { Some(s) => s None => - @runtime.extract_viewport_text(self.pane_watcher.read_viewport(agent_id)) + match self.pane_watcher.current_screen(agent_id) { + Some(s) => s + None => "" + } } let message = worker_exited_without_notify_message( @phase.extract_slug(agent_id), @@ -1030,25 +985,6 @@ async fn ServerState::check_leaf_no_handoff( agent_id, LeafPostFixNoHandoff, ) - let post_fix_prodded_at = self.worker_no_handoff_prodded_at.get(post_fix_key) - let post_fix_prodded = post_fix_prodded_at != None - let post_fix_idle_timeout = if post_fix_prodded { - escalate_sec - } else { - prod_sec - } - let post_fix_decision_idle_sec = match post_fix_prodded_at { - Some(t) => { - let elapsed_since_prod = nonnegative_elapsed(now, t) - let post_prod_idle_sec = if observation.idle_sec < elapsed_since_prod { - observation.idle_sec - } else { - elapsed_since_prod - } - prod_sec + post_prod_idle_sec - } - None => observation.idle_sec - } let tdd_phase = self.leaf_tdd_phase_for_agent(agent_id) if pre_tdd_idle_timeout > 0L && observation.idle_sec >= pre_tdd_idle_timeout { match @@ -1076,21 +1012,19 @@ async fn ServerState::check_leaf_no_handoff( WorkerNoHandoffDecision::DoNothing => () } } - if post_fix_idle_timeout > 0L && observation.idle_sec >= post_fix_idle_timeout { + if prod_sec > 0L && observation.idle_sec >= prod_sec { match leaf_no_handoff_decision( agent.role, lifecycle, gate_ready, - post_fix_decision_idle_sec, + observation.idle_sec, self.handoff_was_delivered(agent_id), - post_fix_prodded, + false, self.worker_no_handoff_stalled_emitted.contains(post_fix_key), prod_sec, - escalate_sec, + 0L, ) { - WorkerNoHandoffDecision::Prod => - self.prod_leaf_no_handoff(agent, now, runner) WorkerNoHandoffDecision::EmitStalled => self.emit_leaf_stalled_no_handoff( agent, @@ -1099,6 +1033,7 @@ async fn ServerState::check_leaf_no_handoff( runner, ) WorkerNoHandoffDecision::DoNothing => () + WorkerNoHandoffDecision::Prod => () } } } @@ -1107,44 +1042,30 @@ async fn ServerState::check_leaf_no_handoff( async fn ServerState::check_worker_no_handoff( self : ServerState, agent : @types.Agent, - now : Int64, observation : @runtime.PaneIdleObservation, - prod_sec : Int64, - escalate_sec : Int64, + idle_sec : Int64, runner : async (Array[@workspace.Command]) -> Array[Int], ) -> Unit { let agent_id = agent.agent_id.to_string() if agent.role != @types.Role::Worker || self.handoff_was_delivered(agent_id) || - prod_sec <= 0L { + idle_sec <= 0L { return } let state_key = no_handoff_watchdog_state_key(agent_id, WorkerNoHandoff) - let prodded_at = self.worker_no_handoff_prodded_at.get(state_key) - let prodded = prodded_at != None - let idle_timeout = if prodded { escalate_sec } else { prod_sec } - if idle_timeout <= 0L { - return - } - if observation.idle_sec < idle_timeout { + if observation.idle_sec < idle_sec { return } - let decision_idle_sec = match prodded_at { - Some(t) => nonnegative_elapsed(now, t) - None => observation.idle_sec - } match worker_no_handoff_decision( agent.role, - decision_idle_sec, + observation.idle_sec, self.handoff_was_delivered(agent_id), - prodded, + false, self.worker_no_handoff_stalled_emitted.contains(state_key), - prod_sec, - escalate_sec, + idle_sec, + 0L, ) { - WorkerNoHandoffDecision::Prod => - self.prod_worker_no_handoff(agent, now, runner) WorkerNoHandoffDecision::EmitStalled => self.emit_worker_stalled_no_handoff( agent, @@ -1153,6 +1074,7 @@ async fn ServerState::check_worker_no_handoff( runner, ) WorkerNoHandoffDecision::DoNothing => () + WorkerNoHandoffDecision::Prod => () } } @@ -1164,8 +1086,7 @@ pub async fn ServerState::check_idle_agents( now : Int64, timeout : Int64, runner : async (Array[@workspace.Command]) -> Array[Int], - worker_no_handoff_prod_sec? : Int64 = 120L, - worker_no_handoff_escalate_sec? : Int64 = 120L, + worker_no_handoff_idle_sec? : Int64 = 120L, ) -> Unit { let agent_ids = self.pane_watcher.watched_agents() for agent_id in agent_ids { @@ -1201,16 +1122,15 @@ pub async fn ServerState::check_idle_agents( match self.pane_watcher.observe_idle(agent_id, now) { Some(observation) => { self.check_worker_no_handoff( - agent, now, observation, worker_no_handoff_prod_sec, worker_no_handoff_escalate_sec, - runner, + agent, observation, worker_no_handoff_idle_sec, runner, ) self.check_leaf_no_handoff( agent, lifecycle.workflow, now, observation, - worker_no_handoff_prod_sec, - worker_no_handoff_escalate_sec, + worker_no_handoff_idle_sec, + worker_no_handoff_idle_sec, runner, ) if watchdog_exempt { @@ -1268,21 +1188,11 @@ async test "check_idle_agents skips supervisor roles before reading panes" { "-" + @env.now().reinterpret_as_int64().to_string() ignore(@sys.create_dir_all(project_dir + "/.choir/kv")) + ignore(@sys.create_dir_all(project_dir + "/.choir/pane-ids")) let reads : Array[String] = [] - let spawned : Array[String] = [] - let killed : Array[Int] = [] let host : @runtime.PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_session, name, _snapshot_path) { - spawned.push(name) - 100 - }, - kill_pid_best_effort: fn(pid) { killed.push(pid) }, - delete_file_sync: fn(_path) { }, - read_file: fn(path) { - reads.push(path) - "{\"event\":\"pane_update\",\"viewport\":[\"idle\"]}" - }, - dump_pane_screen: fn(_session, _pane_id) { + dump_pane_screen: async fn(_session, _pane_id) { + @async.sleep(0) reads.push(_session + ":" + _pane_id) "idle" }, @@ -1318,6 +1228,12 @@ async test "check_idle_agents skips supervisor roles before reading panes" { state: @types.AgentState::Active, }), ) + ignore( + @sys.write_file_sync( + @workspace.agent_pane_id_file_path(project_dir, agent_id), + "terminal_" + agent_id.length().to_string(), + ), + ) state.watch_pane(agent_id) } let calls : Array[@workspace.Command] = [] @@ -1328,9 +1244,7 @@ async test "check_idle_agents skips supervisor roles before reading panes" { } [0] }) - assert_eq(spawned.length(), 2) @debug.assert_eq(reads, []) - @debug.assert_eq(killed, []) @debug.assert_eq(calls, []) assert_eq(state.registry.get("root").state, @types.AgentState::Active) assert_eq(state.registry.get("main.tl").state, @types.AgentState::Active) @@ -2070,8 +1984,10 @@ async fn ServerState::handle_disconnect_confirming_with_runner( None, runner, ) - let snapshot = self.pane_watcher.read_viewport(agent_id) - let screen = @runtime.extract_viewport_text(snapshot) + let screen = match self.pane_watcher.current_screen(agent_id) { + Some(s) => s + None => "" + } let (reason, msg) = if screen != "" { let classified = @phase.classify_failure(screen) match classified { diff --git a/src/server/handler_test.mbt b/src/server/handler_test.mbt index 9cd35291..bfb4e7e3 100644 --- a/src/server/handler_test.mbt +++ b/src/server/handler_test.mbt @@ -7683,13 +7683,11 @@ async test "soft transport disconnect still escalates when idle watchdog expires let parent_id = "main.tl" let agent_id = "main.watchdog-leaf" let pane_name = "watchdog-leaf-pane" - let snapshot = "{\"event\":\"pane_update\",\"viewport\":[\"still thinking\"]}" let host : @runtime.PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_session, _name, _path) { 101 }, - kill_pid_best_effort: fn(_pid) { }, - delete_file_sync: fn(_path) { }, - read_file: fn(_path) { snapshot }, - dump_pane_screen: fn(_session, _pane_id) { "still thinking" }, + dump_pane_screen: async fn(_session, _pane_id) { + @async.sleep(0) + "still thinking" + }, } let state = ServerState::new( { @@ -7699,6 +7697,13 @@ async test "soft transport disconnect still escalates when idle watchdog expires }, disconnect_confirmation_provider([], [pane_name]), ).with_pane_watcher(@runtime.PaneWatcher::with_host(host)) + ignore(@sys.create_dir_all(project_dir + "/.choir/pane-ids")) + ignore( + @sys.write_file_sync( + @workspace.agent_pane_id_file_path(project_dir, agent_id), + "terminal_4", + ), + ) ignore(state.register_agent(disconnect_confirmation_parent(parent_id))) ignore( state.register_agent({ @@ -7721,8 +7726,7 @@ async test "soft transport disconnect still escalates when idle watchdog expires start, 100L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) state.schedule_disconnect(agent_id, start) state.process_pending_disconnects_with_runner(start + 61L, 60L, runner=async fn( @@ -7745,8 +7749,7 @@ async test "soft transport disconnect still escalates when idle watchdog expires start + 101L, 100L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) assert_eq(state.registry.get(agent_id).state, @types.AgentState::Failed) @@ -8628,24 +8631,26 @@ fn worker_handoff_test_state(label : String, snapshot : String) -> ServerState { ///| fn worker_handoff_test_state_with_read_log( label : String, - snapshot : String, + screen : String, reads : Array[String], ) -> ServerState { let base = handler_auto_close_test_state(label) let host : @runtime.PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_session, _name, _path) { 101 }, - kill_pid_best_effort: fn(_pid) { }, - delete_file_sync: fn(_path) { }, - read_file: fn(path) { - reads.push(path) - snapshot - }, - dump_pane_screen: fn(_session, _pane_id) { + dump_pane_screen: async fn(_session, _pane_id) { + @async.sleep(0) reads.push(_session + ":" + _pane_id) - snapshot + screen }, } - base.with_pane_watcher(@runtime.PaneWatcher::with_host(host)) + base + .with_pane_watcher(@runtime.PaneWatcher::with_host(host)) + .with_read_file(fn(path : String) { + if path.contains("/.choir/pane-ids/") { + "terminal_test" + } else { + @sys.read_file(path) + } + }) } ///| @@ -8656,23 +8661,8 @@ fn worker_handoff_test_state_with_snapshots( let base = handler_auto_close_test_state(label) let mut read_count = 0 let host : @runtime.PaneWatchHost = { - spawn_zellij_pane_viewport_subscribe: fn(_session, _name, _path) { 101 }, - kill_pid_best_effort: fn(_pid) { }, - delete_file_sync: fn(_path) { }, - read_file: fn(_path) { - if snapshots.length() == 0 { - "" - } else { - let idx = if read_count < snapshots.length() { - read_count - } else { - snapshots.length() - 1 - } - read_count = read_count + 1 - snapshots[idx] - } - }, - dump_pane_screen: fn(_session, _pane_id) { + dump_pane_screen: async fn(_session, _pane_id) { + @async.sleep(0) if snapshots.length() == 0 { "" } else { @@ -8686,7 +8676,15 @@ fn worker_handoff_test_state_with_snapshots( } }, } - base.with_pane_watcher(@runtime.PaneWatcher::with_host(host)) + base + .with_pane_watcher(@runtime.PaneWatcher::with_host(host)) + .with_read_file(fn(path : String) { + if path.contains("/.choir/pane-ids/") { + "terminal_test" + } else { + @sys.read_file(path) + } + }) } ///| @@ -9448,22 +9446,19 @@ async test "check_idle_agents warns once at seventy percent before idle kill" { start, 100L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) state.check_idle_agents( start + 70L, 100L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) state.check_idle_agents( start + 80L, 100L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) }) assert_eq(handler_idle_warning_count(captured), 1) @@ -9515,37 +9510,33 @@ async test "check_idle_agents emits stalled parent event for merge-ready ReviewO start, 180L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 120L, 180L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 121L, 180L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH] dev-ready"), - 0, + 1, ) state.check_idle_agents( start + 240L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF POST-FIX-PUSH HANDOFF] dev-ready"), - 1, + 0, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH] dev-ready"), @@ -9598,15 +9589,13 @@ async test "watchdog kill records merge-ready dev leaf without notify as failure start, 180L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 181L, 180L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) let lifecycle = @phase.read_lifecycle(state.config.project_dir, leaf_id) match lifecycle { @@ -9674,37 +9663,33 @@ async test "check_idle_agents does not stall prodded leaf after fresh pane outpu start, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 120L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 230L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 241L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF POST-FIX-PUSH HANDOFF] dev-active"), - 1, + 0, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH] dev-active"), - 0, + 1, ) } @@ -9734,15 +9719,13 @@ async test "check_idle_agents does not prod non-ReviewOwned dev leaf" { start, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 240L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq(worker_handoff_count_sent(sent, "[LEAF POST-FIX-PUSH HANDOFF]"), 0) assert_eq(worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH]"), 0) @@ -9781,15 +9764,13 @@ async test "check_idle_agents prods working dev leaf before TDD handoff then not start, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 120L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq(worker_handoff_count_sent(sent, "tdd_intent"), 1) assert_true( @@ -9800,8 +9781,7 @@ async test "check_idle_agents prods working dev leaf before TDD handoff then not start + 240L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent( @@ -9845,15 +9825,13 @@ async test "check_idle_agents keeps pre-TDD and post-fix leaf watchdog state sep start, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 120L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq(worker_handoff_count_sent(sent, "tdd_intent"), 1) assert_true( @@ -9888,18 +9866,17 @@ async test "check_idle_agents keeps pre-TDD and post-fix leaf watchdog state sep start + 240L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF POST-FIX-PUSH HANDOFF] dev-split"), - 1, + 0, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH] dev-split"), - 0, + 1, ) - assert_true( + assert_false( state.worker_no_handoff_prodded_at.contains( "leaf-post-fix-no-handoff:" + leaf_id, ), @@ -9909,8 +9886,7 @@ async test "check_idle_agents keeps pre-TDD and post-fix leaf watchdog state sep start + 360L, 999L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "[LEAF STALLED AFTER FIX-PUSH] dev-split"), @@ -9938,15 +9914,13 @@ async test "check_idle_agents emits worker stalled directly without terminal pro start, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 121L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "printed output is not the handoff"), @@ -9963,15 +9937,13 @@ async test "check_idle_agents emits worker stalled directly without terminal pro start + 242L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 250L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent( @@ -9986,10 +9958,8 @@ async test "check_idle_agents resets worker no-handoff idle when pane changes" { let state = worker_handoff_test_state_with_snapshots( "worker-no-handoff-change-reset", [ - "{\"event\":\"pane_update\",\"viewport\":[\"first tail\"]}", - "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", - "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", - "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", + "{\"event\":\"pane_update\",\"viewport\":[\"first tail\"]}", "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", + "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", "{\"event\":\"pane_update\",\"viewport\":[\"changed tail\"]}", ], ) let parent_id = "main.parent" @@ -10008,15 +9978,13 @@ async test "check_idle_agents resets worker no-handoff idle when pane changes" { start, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 121L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent( @@ -10029,15 +9997,13 @@ async test "check_idle_agents resets worker no-handoff idle when pane changes" { start + 242L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 250L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "printed output is not the handoff"), @@ -10052,11 +10018,11 @@ async test "check_idle_agents resets worker no-handoff idle when pane changes" { } ///| -async test "check_idle_agents reuses one pane snapshot for handoff and watchdog checks" { - let snapshot = "{\"event\":\"pane_update\",\"viewport\":[\"done\",\"tail\"]}" +async test "check_idle_agents reuses one pane dump for handoff and watchdog checks" { + let screen = "done\ntail" let reads : Array[String] = [] let state = worker_handoff_test_state_with_read_log( - "worker-handoff-single-read", snapshot, reads, + "worker-handoff-single-read", screen, reads, ) let parent_id = "main.parent" let worker_id = "main.single-read" @@ -10074,20 +10040,17 @@ async test "check_idle_agents reuses one pane snapshot for handoff and watchdog start, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 301L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) @debug.assert_eq(reads, [ - @runtime.pane_snapshot_path(worker_id), - @runtime.pane_snapshot_path(worker_id), + "test-session:terminal_test", "test-session:terminal_test", ]) } @@ -10127,15 +10090,13 @@ async test "notify_parent marks handoff delivered and suppresses worker prod" { start, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) state.check_idle_agents( start + 121L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=120L, - worker_no_handoff_escalate_sec=120L, + worker_no_handoff_idle_sec=120L, ) assert_eq( worker_handoff_count_sent(sent, "printed output is not the handoff"), @@ -10163,15 +10124,13 @@ async test "watchdog kill emits worker exited without notify only before handoff start, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) state.check_idle_agents( start + 301L, 300L, async fn(cmds) { worker_handoff_capture_runner(sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) assert_eq( worker_handoff_count_sent(sent, "[WORKER EXITED WITHOUT NOTIFY] worker-a"), @@ -10198,15 +10157,13 @@ async test "watchdog kill emits worker exited without notify only before handoff delivered_start, 300L, async fn(cmds) { worker_handoff_capture_runner(delivered_sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) delivered.check_idle_agents( delivered_start + 301L, 300L, async fn(cmds) { worker_handoff_capture_runner(delivered_sent, cmds) }, - worker_no_handoff_prod_sec=0L, - worker_no_handoff_escalate_sec=0L, + worker_no_handoff_idle_sec=0L, ) assert_eq( worker_handoff_count_sent(delivered_sent, "[WORKER EXITED WITHOUT NOTIFY]"), diff --git a/src/server/state.mbt b/src/server/state.mbt index e81aa5cd..52794ffa 100644 --- a/src/server/state.mbt +++ b/src/server/state.mbt @@ -645,6 +645,14 @@ pub fn ServerState::with_pane_watcher( { ..self, pane_watcher, } } +///| +pub fn ServerState::with_read_file( + self : ServerState, + read_file : (String) -> String, +) -> ServerState { + { ..self, read_file, } +} + ///| test "ServerState::new restores persisted agent task contracts from cold boot state" { let project_dir = "/tmp/choir-agent-contract-boot-" + diff --git a/src/server/worker_handoff.mbt b/src/server/worker_handoff.mbt index c417f671..744c8fff 100644 --- a/src/server/worker_handoff.mbt +++ b/src/server/worker_handoff.mbt @@ -1,6 +1,3 @@ -///| -pub let worker_no_handoff_prod_text : String = "You appear finished but have not called `notify_parent` — your printed output is not the handoff. Call `notify_parent --status success|failure ''` now, then `shutdown`." - ///| pub let leaf_pre_tdd_stall_prod_text : String = "You are idle before completing the TDD/file_pr handoff. If implementation is complete, drive the gate now: call `tdd_intent`, `tdd_red_check`, and `tdd_green_check` until the phase is `done`, then call `file_pr`. If you are blocked, call `notify_parent --status failure ''`. Do not park silently." @@ -37,16 +34,15 @@ pub fn worker_no_handoff_decision( prod_sec : Int64, escalate_sec : Int64, ) -> WorkerNoHandoffDecision { - if role != @types.Role::Worker || handoff_delivered || prod_sec <= 0L { + if role != @types.Role::Worker || + handoff_delivered || + stalled_emitted || + prod_sec <= 0L { return DoNothing } - if !prodded { - if idle_sec >= prod_sec { - Prod - } else { - DoNothing - } - } else if escalate_sec > 0L && !stalled_emitted && idle_sec >= escalate_sec { + ignore(prodded) + ignore(escalate_sec) + if idle_sec >= prod_sec { EmitStalled } else { DoNothing @@ -128,11 +124,9 @@ pub fn leaf_no_handoff_decision( idle_sec < prod_sec { return DoNothing } - if !prodded { - Prod - } else if escalate_sec > 0L && - !stalled_emitted && - idle_sec >= prod_sec + escalate_sec { + ignore(prodded) + ignore(escalate_sec) + if !stalled_emitted { EmitStalled } else { DoNothing @@ -197,13 +191,6 @@ pub fn leaf_pre_tdd_stalled_parent_message( worker_no_handoff_tail_or_placeholder(pane_tail) } -///| -pub fn leaf_stalled_no_handoff_prod_message(slug : String) -> String { - "[LEAF POST-FIX-PUSH HANDOFF] " + - slug + - ": You pushed fixes and the merge gate is ready. Call `notify_parent --status success` once with a short summary, then stand down. Do not idle waiting for more events." -} - ///| pub fn leaf_stalled_no_handoff_parent_message( slug : String, diff --git a/src/server/worker_handoff_test.mbt b/src/server/worker_handoff_test.mbt index 13a11cdb..767b87b3 100644 --- a/src/server/worker_handoff_test.mbt +++ b/src/server/worker_handoff_test.mbt @@ -393,11 +393,6 @@ test "leaf no-handoff messages include fix-push headers and pane tail" { assert_true(pre_tdd_stalled.contains("TDD phase is green")) assert_true(pre_tdd_stalled.contains("Pane tail:\nline 1\nline 2")) - let prod = leaf_stalled_no_handoff_prod_message("leaf-a") - assert_true(prod.contains("[LEAF POST-FIX-PUSH HANDOFF] leaf-a")) - assert_true(prod.contains("notify_parent --status success")) - assert_true(prod.contains("Do not idle waiting for more events")) - let stalled = leaf_stalled_no_handoff_parent_message( "leaf-a", 240L, "line 1\nline 2", ) diff --git a/src/sys/io.mbt b/src/sys/io.mbt index 5e089a2c..72b0fb98 100644 --- a/src/sys/io.mbt +++ b/src/sys/io.mbt @@ -435,14 +435,6 @@ extern "C" fn c_chmod_mode(path : Bytes, mode : Int) -> Int = "choir_chmod_mode" #borrow(target, linkpath) extern "C" fn c_symlink_force(target : Bytes, linkpath : Bytes) -> Int = "choir_symlink_force" -///| -#borrow(session, pane_id, snapshot_path) -extern "C" fn c_spawn_zellij_pane_viewport_subscribe( - session : Bytes, - pane_id : Bytes, - snapshot_path : Bytes, -) -> Int = "choir_spawn_zellij_pane_viewport_subscribe" - ///| pub fn write_file_sync(path : String, content : String) -> Bool { let path_bytes = string_to_null_terminated_bytes(path) @@ -600,22 +592,6 @@ pub fn symlink_force(target : String, linkpath : String) -> Bool { 0 } -///| -/// Fork a supervisor that runs `zellij subscribe pane-viewport --format json`, mirrors each stdout -/// line into `snapshot_path`, and returns the supervisor PID for `kill_pid_best_effort`, or a -/// non-positive value on failure. -pub fn spawn_zellij_pane_viewport_subscribe( - session : String, - pane_id : String, - snapshot_path : String, -) -> Int { - c_spawn_zellij_pane_viewport_subscribe( - string_to_null_terminated_bytes(session), - string_to_null_terminated_bytes(pane_id), - string_to_null_terminated_bytes(snapshot_path), - ) -} - ///| #borrow(pd, ws, br) extern "C" fn c_worktree_bootstrap_cleanup(pd : Bytes, ws : Bytes, br : Bytes) = "choir_worktree_bootstrap_cleanup" diff --git a/src/sys/io_stub.mbt b/src/sys/io_stub.mbt index 889efca1..a1117251 100644 --- a/src/sys/io_stub.mbt +++ b/src/sys/io_stub.mbt @@ -113,15 +113,6 @@ pub fn delete_file_sync(_path : String) -> Bool { false } -///| -pub fn spawn_zellij_pane_viewport_subscribe( - _session : String, - _pane_id : String, - _snapshot_path : String, -) -> Int { - -1 -} - ///| pub fn register_cleanup(_cmd : String) -> Unit { diff --git a/src/sys/stub.c b/src/sys/stub.c index 56284b94..6b05c26e 100644 --- a/src/sys/stub.c +++ b/src/sys/stub.c @@ -1033,115 +1033,6 @@ int choir_symlink_force(const char* target, const char* linkpath) { return symlink(target, linkpath); } -static volatile pid_t choir_zellij_subscribe_child = 0; - -static void choir_zellij_subscribe_on_term(int sig) { - (void)sig; - pid_t z = choir_zellij_subscribe_child; - if (z > 0) { - kill(z, SIGTERM); - } - _exit(0); -} - -/** - * Fork a supervisor that runs `zellij --session ... subscribe pane-viewport --pane-id ... --format json`, - * discards stderr, and overwrites snapshot_path with each complete line read from zellij stdout. - * Returns the supervisor PID for best-effort teardown via kill_pid_best_effort, or -1 on failure. - */ -int choir_spawn_zellij_pane_viewport_subscribe( - const char* session, - const char* pane_id, - const char* snapshot_path) { - int pipefd[2]; - if (pipe(pipefd) != 0) { - return -1; - } - pid_t sup = fork(); - if (sup < 0) { - close(pipefd[0]); - close(pipefd[1]); - return -1; - } - if (sup > 0) { - close(pipefd[0]); - close(pipefd[1]); - return (int)sup; - } - - /* Supervisor: read zellij stdout and mirror each line to snapshot_path. */ - struct sigaction sa; - memset(&sa, 0, sizeof(sa)); - sa.sa_handler = choir_zellij_subscribe_on_term; - sigemptyset(&sa.sa_mask); - sa.sa_flags = 0; - sigaction(SIGTERM, &sa, NULL); - sigaction(SIGINT, &sa, NULL); - - pid_t zj = fork(); - if (zj < 0) { - close(pipefd[0]); - _exit(1); - } - if (zj == 0) { - close(pipefd[0]); - int devnull = open("/dev/null", O_WRONLY); - if (devnull >= 0) { - dup2(devnull, STDERR_FILENO); - close(devnull); - } - dup2(pipefd[1], STDOUT_FILENO); - close(pipefd[1]); - execlp( - "zellij", - "zellij", - "--session", - session, - "subscribe", - "pane-viewport", - "--pane-id", - pane_id, - "--format", - "json", - (char*)NULL); - _exit(127); - } - - choir_zellij_subscribe_child = zj; - close(pipefd[1]); - - FILE* in = fdopen(pipefd[0], "r"); - if (!in) { - kill(zj, SIGTERM); - waitpid(zj, NULL, 0); - _exit(1); - } - - char* line = NULL; - size_t linecap = 0; - for (;;) { - errno = 0; - ssize_t n = getline(&line, &linecap, in); - if (n < 0) { - if (errno == EINTR) { - continue; - } - break; - } - if (n > 0 && line[n - 1] == '\n') { - line[n - 1] = '\0'; - n--; - } - choir_write_file_sync(snapshot_path, line, (int)n); - } - - free(line); - fclose(in); - kill(zj, SIGTERM); - waitpid(zj, NULL, 0); - _exit(0); -} - static int choir_spawn_wait_stderr_null(char **argv) { pid_t pid = fork(); if (pid < 0) { diff --git a/src/tools/dispatch_helpers.mbt b/src/tools/dispatch_helpers.mbt index a009369d..d8b0c8b4 100644 --- a/src/tools/dispatch_helpers.mbt +++ b/src/tools/dispatch_helpers.mbt @@ -46,9 +46,19 @@ pub async fn run_strict( record_spawn_launch_diagnostic? : (SpawnLaunchDiagnostic) -> Unit = fn(_) { () }, + capture_pane_id? : Bool = false, + record_spawn_pane_id? : async (String, String) -> Unit = async fn(_, _) { + @async.sleep(0) + }, ) -> Result[Unit, @types.ChoirError] { let (statuses, spawn_launch_captured_outputs) = run_commands_with_spawn_launch_capture( - spawn_launch_bindings, cmds, runner, capture, record_spawn_launch_diagnostic, + spawn_launch_bindings, + cmds, + runner, + capture, + record_spawn_launch_diagnostic, + capture_pane_id~, + record_spawn_pane_id~, ) catch { e => return Err(@types.ChoirError::transport_error(e.to_string())) } diff --git a/src/tools/fork_wave.mbt b/src/tools/fork_wave.mbt index d09a6368..a2622456 100644 --- a/src/tools/fork_wave.mbt +++ b/src/tools/fork_wave.mbt @@ -690,6 +690,10 @@ pub async fn interpret_fork_wave_spawn_effect_phase( runner, capture, record_spawn_launch_diagnostic, + capture_pane_id=true, + record_spawn_pane_id=async fn(agent_id, pane_id) { + record_spawn_pane_id_sidecar(project_dir, runner, agent_id, pane_id) + }, ) for index, output in captured_outputs { spawn_launch_captured_outputs.set(index, output) diff --git a/src/tools/fork_wave_test.mbt b/src/tools/fork_wave_test.mbt index 04d8cd3c..fcb56cd2 100644 --- a/src/tools/fork_wave_test.mbt +++ b/src/tools/fork_wave_test.mbt @@ -1395,6 +1395,9 @@ async fn mock_capture_fork_wave_must_not_run( cmd : @workspace.Command, ) -> (Int, String) { @async.sleep(0) + if cmd.program == "env" && cmd.args.contains("list-panes") { + return (0, "PANE_ID TYPE TITLE\nterminal_1 terminal root\n") + } if cmd.program == "zellij" && (cmd.args.contains("new-tab") || cmd.args.contains("run")) { return (0, "") @@ -1652,6 +1655,9 @@ async test "interpret_fork_wave_spawn_effect_phase seeds agy shared cache once b (cmd.args.contains("new-tab") || cmd.args.contains("run")) { return (0, "") } + if cmd.program == "env" && cmd.args.contains("list-panes") { + return (0, "PANE_ID TYPE TITLE\nterminal_1 terminal root\n") + } fail("unexpected capture: " + cmd.to_string()) (0, "") } @@ -1745,6 +1751,9 @@ async test "interpret_fork_wave_spawn_effect_phase warns and spawns when agy see (cmd.args.contains("new-tab") || cmd.args.contains("run")) { return (0, "") } + if cmd.program == "env" && cmd.args.contains("list-panes") { + return (0, "PANE_ID TYPE TITLE\nterminal_1 terminal root\n") + } fail("unexpected capture: " + cmd.to_string()) (0, "") } diff --git a/src/tools/spawn.mbt b/src/tools/spawn.mbt index f22999f7..8158dd47 100644 --- a/src/tools/spawn.mbt +++ b/src/tools/spawn.mbt @@ -588,6 +588,10 @@ pub async fn interpret_spawn_worker( capture~, spawn_launch_bindings~, record_spawn_launch_diagnostic~, + capture_pane_id=true, + record_spawn_pane_id=async fn(agent_id, pane_id) { + record_spawn_pane_id_sidecar(project_dir, runner, agent_id, pane_id) + }, ) { Ok(_) => { match registry { @@ -836,6 +840,10 @@ pub async fn interpret_spawn_gemini( capture~, spawn_launch_bindings~, record_spawn_launch_diagnostic~, + capture_pane_id=true, + record_spawn_pane_id=async fn(agent_id, pane_id) { + record_spawn_pane_id_sidecar(project_dir, runner, agent_id, pane_id) + }, ) { Ok(_) => match seed_warning { diff --git a/src/tools/spawn_diagnostics.mbt b/src/tools/spawn_diagnostics.mbt index 79fba472..c815d493 100644 --- a/src/tools/spawn_diagnostics.mbt +++ b/src/tools/spawn_diagnostics.mbt @@ -19,6 +19,30 @@ pub(all) enum SpawnLaunchDiagnostic { SpawnCommandsSucceeded(agent_id~ : String, launch_count~ : Int) } derive(Debug, Eq) +///| +async fn noop_record_spawn_pane_id( + _agent_id : String, + _pane_id : String, +) -> Unit { + @async.sleep(0) +} + +///| +pub async fn record_spawn_pane_id_sidecar( + project_dir : String, + runner : async (Array[@workspace.Command]) -> Array[Int], + agent_id : String, + pane_id : String, +) -> Unit { + ignore( + runner([ + @workspace.write_agent_pane_id_command(project_dir, agent_id, pane_id), + ]) catch { + _ => [] + }, + ) +} + ///| fn spawn_diag_command_arg_value(args : Array[String], flag : String) -> String? { for i = 0; i + 1 < args.length(); i = i + 1 { @@ -265,6 +289,8 @@ pub async fn run_commands_with_spawn_launch_capture( runner : async (Array[@workspace.Command]) -> Array[Int], capture : async (@workspace.Command) -> (Int, String), record : (SpawnLaunchDiagnostic) -> Unit, + capture_pane_id? : Bool = false, + record_spawn_pane_id? : async (String, String) -> Unit = noop_record_spawn_pane_id, ) -> (Array[Int], Map[Int, String]) { if bindings.length() == 0 { return (runner(cmds), {}) @@ -292,11 +318,46 @@ pub async fn run_commands_with_spawn_launch_capture( return (statuses, captured_outputs) } } + let before_list_panes = match + ( + capture_pane_id, + spawn_diag_command_arg_value(cmds[i].args, "--session"), + ) { + (true, Some(session)) => { + let (before_status, before_output) = capture( + @workspace.zellij_list_panes_command(session), + ) + if before_status == 0 { + Some((session, before_output)) + } else { + None + } + } + _ => None + } let (status, output) = capture(cmds[i]) statuses.push(status) let redacted_output = spawn_launch_sanitized_stderr_tail(output) captured_outputs.set(i, redacted_output) if status == 0 { + match before_list_panes { + Some((session, before_output)) => { + let (after_status, after_output) = capture( + @workspace.zellij_list_panes_command(session), + ) + if after_status == 0 { + match + @workspace.pane_id_from_list_panes_diff( + before_output, after_output, + ) { + Some(pane_id) => + record_spawn_pane_id(binding.agent_id, pane_id) + None => () + } + } + } + None => () + } success_counts.set( binding.agent_id, success_counts.get(binding.agent_id).unwrap_or(0) + 1, diff --git a/src/types/config.mbt b/src/types/config.mbt index 5153c66a..10c2812d 100644 --- a/src/types/config.mbt +++ b/src/types/config.mbt @@ -102,10 +102,6 @@ pub(all) struct Config { hooks_enforcement : Bool /// Worker/leaf no-handoff idle threshold in seconds; 0 disables stalled event. worker_no_handoff_idle_sec : Int64 - /// Worker-only no-handoff prod threshold in seconds; 0 disables prod/stall. - worker_no_handoff_prod_sec : Int64 - /// Worker-only no-handoff stalled-event grace after prod in seconds; 0 disables stalled event. - worker_no_handoff_escalate_sec : Int64 /// Poller heartbeat cadence in seconds; 0 disables heartbeat ticks. poller_heartbeat_tick_sec : Int /// Local checks that must pass before a Dev success `notify_parent` handoff. @@ -198,9 +194,7 @@ pub fn Config::default() -> Config { moon_verify_target: Native, agy_migration_incomplete: false, hooks_enforcement: false, - worker_no_handoff_idle_sec: 0L, - worker_no_handoff_prod_sec: 120L, - worker_no_handoff_escalate_sec: 120L, + worker_no_handoff_idle_sec: 120L, poller_heartbeat_tick_sec: 60, prenotify_checks: [], auto_pull_after_integration: true, diff --git a/src/types/config_schema.mbt b/src/types/config_schema.mbt index 89901126..69abdbb4 100644 --- a/src/types/config_schema.mbt +++ b/src/types/config_schema.mbt @@ -180,17 +180,10 @@ pub fn config_schema_entries() -> Array[ConfigSchemaEntry] { ), schema_entry( ConfigToml, - "worker_no_handoff_prod_sec", + "worker_no_handoff_idle_sec", "Int64", "120", - "Worker-only no-handoff prod threshold in seconds; 0 disables prod/stall.", - ), - schema_entry( - ConfigToml, - "worker_no_handoff_escalate_sec", - "Int64", - "120", - "Worker-only no-handoff stalled-event grace after prod in seconds; 0 disables stalled event.", + "Worker/leaf no-handoff idle threshold in seconds; 0 disables stalled event.", ), schema_entry( ConfigToml, diff --git a/src/types/config_schema_test.mbt b/src/types/config_schema_test.mbt index eb6ab8ac..dad96d3d 100644 --- a/src/types/config_schema_test.mbt +++ b/src/types/config_schema_test.mbt @@ -19,10 +19,10 @@ fn expected_config_schema_keys() -> Array[String] { "otlp_endpoint", "agent_id", "extra_mcp_servers_json", "companions_json", "plugins_json", "terminal_session", "automerge", "review_mode", "merge_strategy", "moon_verify_target", "agy_migration_incomplete", "hooks_enforcement", "worker_no_handoff_idle_sec", - "poller_heartbeat_tick_sec", "prenotify_checks", - "auto_pull_after_integration", "project.verify_cmd", "pr_policy.require_integration_audit", - "pr_policy.skip_copilot_on_feature_branch", "pr_policy.reviewer", "auto_rebuild_after_pull", - "auto_reload_after_rebuild", "pricing..rate_in", "pricing..rate_out", + "poller_heartbeat_tick_sec", "prenotify_checks", "auto_pull_after_integration", + "project.verify_cmd", "pr_policy.require_integration_audit", "pr_policy.skip_copilot_on_feature_branch", + "pr_policy.reviewer", "auto_rebuild_after_pull", "auto_reload_after_rebuild", + "pricing..rate_in", "pricing..rate_out", ] } diff --git a/src/workspace/launch.mbt b/src/workspace/launch.mbt index 11c46d12..9b938dad 100644 --- a/src/workspace/launch.mbt +++ b/src/workspace/launch.mbt @@ -59,10 +59,10 @@ pub fn agent_pid_file_path(project_dir : String, agent_id : String) -> String { ///| pub fn agent_pane_id_file_path( - _project_dir : String, - _agent_id : String, + project_dir : String, + agent_id : String, ) -> String { - "" + project_dir + "/.choir/pane-ids/" + @types.sanitize_agent_id(agent_id) } ///| diff --git a/src/workspace/multiplexer.mbt b/src/workspace/multiplexer.mbt index 76afad22..d6f4c8d9 100644 --- a/src/workspace/multiplexer.mbt +++ b/src/workspace/multiplexer.mbt @@ -153,11 +153,96 @@ pub fn zellij_list_panes_json_command(session : String) -> Command { } ///| -pub fn pane_id_from_list_panes_diff( - _before : String, - _after : String, -) -> String? { - None +pub fn zellij_list_panes_command(session : String) -> Command { + { + program: "env", + args: ["NO_COLOR=1", "zellij", "--session", session, "action", "list-panes"], + workdir: None, + } +} + +///| +pub fn zellij_dump_screen_command( + session : String, + pane_id : String, +) -> Command { + { + program: "zellij", + args: [ + "--session", session, "action", "dump-screen", "--pane-id", pane_id, "--full", + ], + workdir: None, + } +} + +///| +pub fn write_agent_pane_id_command( + project_dir : String, + agent_id : String, + pane_id : String, +) -> Command { + { + program: "sh", + args: [ + "-c", + "mkdir -p \"$1\" && printf '%s' \"$2\" > \"$3\"", + "choir-write-pane-id", + project_dir + "/.choir/pane-ids", + pane_id, + agent_pane_id_file_path(project_dir, agent_id), + ], + workdir: None, + } +} + +///| +fn list_panes_whitespace_fields(line : String) -> Array[String] { + let fields : Array[String] = [] + let trimmed = line.trim().to_owned() + let mut start = 0 + let mut i = 0 + while i <= trimmed.length() { + if i == trimmed.length() || trimmed[i] == ' ' || trimmed[i] == '\t' { + if i > start { + fields.push(trimmed[start:i].to_owned()) + } + start = i + 1 + } + i = i + 1 + } + fields +} + +///| +fn terminal_pane_ids_from_list_panes(output : String) -> Map[String, Bool] { + let ids : Map[String, Bool] = {} + let text = strip_ansi_csi_sgr(output) + for raw in text.split("\n") { + let fields = list_panes_whitespace_fields(raw.to_owned()) + if fields.length() >= 2 && + fields[0].has_prefix("terminal_") && + fields[1] == "terminal" { + ids.set(fields[0], true) + } + } + ids +} + +///| +pub fn pane_id_from_list_panes_diff(before : String, after : String) -> String? { + let before_ids = terminal_pane_ids_from_list_panes(before) + let after_ids = terminal_pane_ids_from_list_panes(after) + let diff : Array[String] = [] + for id, _ in after_ids { + if !before_ids.contains(id) { + diff.push(id) + } + } + if diff.length() == 1 { + Some(diff[0]) + } else { + None + } } ///|