Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/bin/choir/init.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,7 @@ fn init_default_terminal_session(project_dir : String) -> String {

///|
fn init_config_toml_content() -> String {
"worker_no_handoff_prod_sec = 120\n" +
"worker_no_handoff_escalate_sec = 120\n" +
"worker_no_handoff_idle_sec = 120\n" +
"auto_pull_after_integration = true\n" +
"auto_rebuild_after_pull = false\n" +
"auto_reload_after_rebuild = false\n" +
Expand All @@ -492,7 +491,7 @@ test "init config.toml scaffold omits machine local fields" {
assert_false(content.contains("project_dir"))
assert_false(content.contains("listen_uds"))
assert_false(content.contains("terminal_session"))
assert_true(content.contains("worker_no_handoff_prod_sec = 120"))
assert_true(content.contains("worker_no_handoff_idle_sec = 120"))
assert_true(content.contains("[pr_policy]"))
}

Expand Down
24 changes: 21 additions & 3 deletions src/bin/choir/uds_server.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,23 @@ fn grow_connection_buffer(buf : FixedArray[Byte]) -> FixedArray[Byte] {
next
}

///|
async fn serve_dump_pane_screen(session : String, pane_id : String) -> String {
let (status, output) = @exec.capture_command_stdout(
@workspace.zellij_dump_screen_command(session, pane_id),
)
if status == 0 {
output
} else {
""
}
}

///|
fn serve_pane_watcher() -> @runtime.PaneWatcher {
@runtime.PaneWatcher::with_host({ dump_pane_screen: serve_dump_pane_screen })
}

///|
struct ServeUDSClient {
fd : Int
Expand Down Expand Up @@ -1368,7 +1385,9 @@ fn serve_run() -> Unit {
}
}
let provider = @server.make_recovery_provider()
let state = @server.ServerState::new(config, provider)
let state = @server.ServerState::new(config, provider).with_pane_watcher(
serve_pane_watcher(),
)
@moontrace.set_global_field("service", "choir".to_json())
@moontrace.set_global_field("run_id", state.run_id.to_json())
@prompts.emit_prompt_drift_warnings(
Expand Down Expand Up @@ -1433,8 +1452,7 @@ fn serve_run() -> Unit {
now,
300L,
@exec.run_sequence,
worker_no_handoff_prod_sec=config.worker_no_handoff_prod_sec,
worker_no_handoff_escalate_sec=config.worker_no_handoff_escalate_sec,
worker_no_handoff_idle_sec=config.worker_no_handoff_idle_sec,
)
} catch {
e =>
Expand Down
21 changes: 4 additions & 17 deletions src/config/config.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -554,23 +554,11 @@ pub fn parse_config(
Ok(v) => v
Err(e) => return Err(e)
}
let worker_no_handoff_prod_sec = match doc.get("worker_no_handoff_prod_sec") {
None => default_cfg.worker_no_handoff_prod_sec
let worker_no_handoff_idle_sec = match doc.get("worker_no_handoff_idle_sec") {
None => default_cfg.worker_no_handoff_idle_sec
Some(raw) =>
match
parse_nonnegative_int64_config_value("worker_no_handoff_prod_sec", raw) {
Ok(v) => v
Err(e) => return Err(e)
}
}
let worker_no_handoff_escalate_sec = match
doc.get("worker_no_handoff_escalate_sec") {
None => default_cfg.worker_no_handoff_escalate_sec
Some(raw) =>
match
parse_nonnegative_int64_config_value(
"worker_no_handoff_escalate_sec", raw,
) {
parse_nonnegative_int64_config_value("worker_no_handoff_idle_sec", raw) {
Ok(v) => v
Err(e) => return Err(e)
}
Expand Down Expand Up @@ -677,8 +665,7 @@ pub fn parse_config(
moon_verify_target,
agy_migration_incomplete,
hooks_enforcement,
worker_no_handoff_prod_sec,
worker_no_handoff_escalate_sec,
worker_no_handoff_idle_sec,
poller_heartbeat_tick_sec,
prenotify_checks,
auto_pull_after_integration,
Expand Down
29 changes: 12 additions & 17 deletions src/config/config_test.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ test "parse_config - full" {
#|companions_json = '[{"name":"monitor","role":"worker","command":"claude","task":"Monitor the build"}]'
#|plugins_json = '[{"name":"plugin","command":"./plugin","tools":"search"}]'
#|hooks_enforcement = true
#|worker_no_handoff_prod_sec = 90
#|worker_no_handoff_escalate_sec = 45
#|worker_no_handoff_idle_sec = 90
#|poller_heartbeat_tick_sec = 10
#|prenotify_checks = ["moon check --target native", "CHOIR_TEST_NO_EXEC=1 moon test --target native", "CHOIR_TEST_REAL_EXEC=1 moon test --target native src/exec", "moon run src/bin/choir_lint --target native"]
#|moon_verify_target = "js"
Expand Down Expand Up @@ -53,8 +52,7 @@ test "parse_config - full" {
"[{\"name\":\"plugin\",\"command\":\"./plugin\",\"tools\":\"search\"}]",
),
)
assert_eq(cfg.worker_no_handoff_prod_sec, 90L)
assert_eq(cfg.worker_no_handoff_escalate_sec, 45L)
assert_eq(cfg.worker_no_handoff_idle_sec, 90L)
assert_eq(cfg.hooks_enforcement, true)
assert_eq(cfg.poller_heartbeat_tick_sec, 10)
@debug.assert_eq(cfg.prenotify_checks, [
Expand Down Expand Up @@ -149,8 +147,7 @@ test "parse_config - minimal" {
assert_eq(cfg.automerge, false)
assert_eq(cfg.review_mode, @types.ReviewMode::SinglePass)
assert_eq(cfg.merge_strategy, @types.MergeStrategy::Merge)
assert_eq(cfg.worker_no_handoff_prod_sec, 120L)
assert_eq(cfg.worker_no_handoff_escalate_sec, 120L)
assert_eq(cfg.worker_no_handoff_idle_sec, 120L)
assert_eq(cfg.hooks_enforcement, false)
assert_eq(cfg.poller_heartbeat_tick_sec, 60)
@debug.assert_eq(cfg.prenotify_checks, [])
Expand Down Expand Up @@ -336,32 +333,30 @@ test "parse_config pr_policy reviewer explicit copilot" {
}

///|
test "parse_config worker no-handoff knobs accept zero disable" {
test "parse_config worker no-handoff idle knob accepts zero disable" {
let cfg = parse_config(
(
#|worker_no_handoff_prod_sec = 0
#|worker_no_handoff_escalate_sec = 0
#|worker_no_handoff_idle_sec = 0
#|poller_heartbeat_tick_sec = 0
),
).unwrap()
assert_eq(cfg.worker_no_handoff_prod_sec, 0L)
assert_eq(cfg.worker_no_handoff_escalate_sec, 0L)
assert_eq(cfg.worker_no_handoff_idle_sec, 0L)
assert_eq(cfg.poller_heartbeat_tick_sec, 0)
}

///|
test "parse_config worker no-handoff knobs reject negatives" {
match parse_config("worker_no_handoff_prod_sec = -1") {
test "parse_config worker no-handoff idle knob rejects negatives" {
match parse_config("worker_no_handoff_idle_sec = -1") {
Err(e) => assert_true(e.message().contains("expected >= 0"))
Ok(_) => fail("expected negative prod config to fail")
Ok(_) => fail("expected negative idle config to fail")
}
}

///|
test "parse_config worker no-handoff knobs reject fractional values" {
match parse_config("worker_no_handoff_escalate_sec = 1.5") {
test "parse_config worker no-handoff idle knob rejects fractional values" {
match parse_config("worker_no_handoff_idle_sec = 1.5") {
Err(e) => assert_true(e.message().contains("expected integer"))
Ok(_) => fail("expected fractional escalate config to fail")
Ok(_) => fail("expected fractional idle config to fail")
}
}

Expand Down
119 changes: 35 additions & 84 deletions src/runtime/pane_watch.mbt
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
///|
/// Path for the zellij pane-viewport JSON snapshot for `agent_id` (under `/tmp`).
pub fn pane_snapshot_path(agent_id : String) -> String {
"/tmp/choir-viewport-" + agent_id + ".json"
pub(all) struct PaneWatchHost {
dump_pane_screen : async (String, String) -> String
}

///|
/// Host I/O for pane viewport subscribe: zellij child processes, pid cleanup, and snapshot files.
pub(all) struct PaneWatchHost {
spawn_zellij_pane_viewport_subscribe : (String, String, String) -> Int
kill_pid_best_effort : (Int) -> Unit
delete_file_sync : (String) -> Unit
read_file : (String) -> String
async fn native_dump_pane_screen(
_session : String,
_pane_id : String,
) -> String {
@async.sleep(0)
""
}

///|
pub fn PaneWatchHost::native() -> PaneWatchHost {
{
spawn_zellij_pane_viewport_subscribe: @sys.spawn_zellij_pane_viewport_subscribe,
kill_pid_best_effort: @sys.kill_pid_best_effort,
delete_file_sync: fn(path) { ignore(@sys.delete_file_sync(path)) },
read_file: @sys.read_file,
}
{ dump_pane_screen: native_dump_pane_screen }
}

///|
Expand All @@ -34,7 +28,8 @@ pub(all) struct WatchObservation {

///|
pub(all) struct WatchState {
pid : Int
session : String
pane_id : String
mut observation : WatchObservation
}

Expand Down Expand Up @@ -66,84 +61,41 @@ pub fn PaneWatcher::watch_pane(
agent_id : String,
target : @types.LocalTarget,
now : Int64,
pane_id? : String = "",
) -> Unit {
let snapshot_path = pane_snapshot_path(agent_id)
let pid = (self.host.spawn_zellij_pane_viewport_subscribe)(
target.session,
target.name,
snapshot_path,
)
if pid > 0 {
self.watches.set(agent_id, {
pid,
observation: { last_change_at: now, last_hash: 0, last_len: 0 },
})
let resolved = pane_id.trim().to_owned()
if resolved == "" {
return
}
self.watches.set(agent_id, {
session: target.session,
pane_id: resolved,
observation: { last_change_at: now, last_hash: 0, last_len: 0 },
})
}

///|
pub fn PaneWatcher::stop_subscribe(
self : PaneWatcher,
agent_id : String,
) -> Unit {
match self.watches.get(agent_id) {
Some(watch) => {
self.watches.remove(agent_id)
(self.host.kill_pid_best_effort)(watch.pid)
}
None => ()
}
(self.host.delete_file_sync)(pane_snapshot_path(agent_id))
self.watches.remove(agent_id)
}

///|
pub fn PaneWatcher::read_viewport(
pub async fn PaneWatcher::current_screen(
self : PaneWatcher,
agent_id : String,
) -> String {
let snapshot_path = pane_snapshot_path(agent_id)
(self.host.read_file)(snapshot_path)
}

///|
pub fn parse_int_simple(s : String) -> Int {
let mut result = 0
for c in s {
let d = c.to_int() - '0'.to_int()
if d >= 0 && d <= 9 {
result = result * 10 + d
}
}
result
}

///|
/// Extract viewport text from a zellij subscribe JSON snapshot.
/// Format: {"event":"pane_update","viewport":["line1","line2",...]}
pub fn extract_viewport_text(json_str : String) -> String {
if json_str == "" {
return ""
) -> String? {
let watch = match self.watches.get(agent_id) {
Some(watch) => watch
None => return None
}
let parsed : Json = @json.parse(json_str) catch { _ => return json_str }
match parsed {
Object(obj) =>
match obj.get("viewport") {
Some(Array(lines)) => {
let buf = StringBuilder::new()
for line in lines {
match line {
String(s) => {
buf.write_string(s)
buf.write_char('\n')
}
_ => ()
}
}
buf.to_string()
}
_ => json_str
}
_ => json_str
let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id)
if screen.trim().to_owned() == "" {
None
} else {
Some(screen)
}
}

Expand Down Expand Up @@ -182,7 +134,7 @@ fn PaneWatcher::record_observation(
/// Checks if the agent's pane has been idle. Returns `Some(screen_text)` if idle, `None` if not idle or not being watched.
/// This is the pane-watch idle/no-heartbeat proxy signal; it emits no hard
/// disconnect notification by itself.
pub fn PaneWatcher::observe_idle(
pub async fn PaneWatcher::observe_idle(
self : PaneWatcher,
agent_id : String,
now : Int64,
Expand All @@ -191,11 +143,10 @@ pub fn PaneWatcher::observe_idle(
Some(watch) => watch
None => return None
}
let snapshot = self.read_viewport(agent_id)
if snapshot.trim().to_owned() == "" {
let screen = (self.host.dump_pane_screen)(watch.session, watch.pane_id)
if screen.trim().to_owned() == "" {
return None
}
let screen = extract_viewport_text(snapshot)
let hash = simple_hash(screen)
let len = screen.length()
let observation = watch.observation
Expand All @@ -213,7 +164,7 @@ pub fn PaneWatcher::observe_idle(
/// Checks if the agent's pane has been idle. Returns `Some(screen_text)` if idle, `None` if not idle or not being watched.
/// This is the pane-watch idle/no-heartbeat proxy signal; it emits no hard
/// disconnect notification by itself.
pub fn PaneWatcher::check_idle(
pub async fn PaneWatcher::check_idle(
self : PaneWatcher,
agent_id : String,
now : Int64,
Expand Down
Loading
Loading