From 94aad647d1604702ae833e45d6d29751df03a470 Mon Sep 17 00:00:00 2001 From: yui-stingray Date: Wed, 24 Jun 2026 10:34:11 +0900 Subject: [PATCH] fix(codex): bound bridge app-server stalls --- scripts/drivers/types/codex/codex-bridge.js | 232 +++++++++-- tests/test_codex_bridge.bats | 425 +++++++++++++++++++- 2 files changed, 626 insertions(+), 31 deletions(-) diff --git a/scripts/drivers/types/codex/codex-bridge.js b/scripts/drivers/types/codex/codex-bridge.js index 91130e4..411aab2 100755 --- a/scripts/drivers/types/codex/codex-bridge.js +++ b/scripts/drivers/types/codex/codex-bridge.js @@ -33,6 +33,12 @@ Options: --interval watch-once poll interval (default: 2). --max-wakes Stop after n wakeups, useful for tests. --stale-wake-limit Stop after n repeated unchanged wakeups (default: 1). + --connect-timeout-ms + Max wait for direct app-server connect/upgrade (default: 10000). + --request-timeout-ms + Max wait for each app-server request (default: 30000). + --watch-failure-limit + Stop after n consecutive watch-once failures; 0 disables (default: 3). --app-server Connect through an existing app-server endpoint. Supports unix://PATH or ws://host:port over WebSocket. --thread @@ -59,6 +65,9 @@ function parseArgs(argv) { interval: Number(process.env.AGMSG_WATCH_ONCE_INTERVAL || 2), maxWakes: 0, staleWakeLimit: Number(process.env.AGMSG_CODEX_BRIDGE_STALE_WAKE_LIMIT || 1), + connectTimeoutMs: Number(process.env.AGMSG_CODEX_BRIDGE_CONNECT_TIMEOUT_MS || 10000), + requestTimeoutMs: Number(process.env.AGMSG_CODEX_BRIDGE_REQUEST_TIMEOUT_MS || 30000), + watchFailureLimit: Number(process.env.AGMSG_CODEX_BRIDGE_WATCH_FAILURE_LIMIT || 3), inlineInbox: false, turnTimeout: Number(process.env.AGMSG_CODEX_BRIDGE_TURN_TIMEOUT || 60), }; @@ -85,6 +94,12 @@ function parseArgs(argv) { opts.maxWakes = Number(argv[++i]); } else if (arg === "--stale-wake-limit") { opts.staleWakeLimit = Number(argv[++i]); + } else if (arg === "--connect-timeout-ms") { + opts.connectTimeoutMs = Number(argv[++i]); + } else if (arg === "--request-timeout-ms") { + opts.requestTimeoutMs = Number(argv[++i]); + } else if (arg === "--watch-failure-limit") { + opts.watchFailureLimit = Number(argv[++i]); } else if (arg === "--turn-timeout") { opts.turnTimeout = Number(argv[++i]); } else if (arg === "--app-server") { @@ -108,6 +123,15 @@ function parseArgs(argv) { if (!Number.isFinite(opts.staleWakeLimit) || opts.staleWakeLimit < 0) { die("--stale-wake-limit must be a non-negative number"); } + if (!Number.isFinite(opts.connectTimeoutMs) || opts.connectTimeoutMs < 0) { + die("--connect-timeout-ms must be a non-negative number"); + } + if (!Number.isFinite(opts.requestTimeoutMs) || opts.requestTimeoutMs < 0) { + die("--request-timeout-ms must be a non-negative number"); + } + if (!Number.isFinite(opts.watchFailureLimit) || opts.watchFailureLimit < 0) { + die("--watch-failure-limit must be a non-negative number"); + } if (!Number.isFinite(opts.turnTimeout) || opts.turnTimeout < 0) { die("--turn-timeout must be a non-negative number"); } @@ -165,9 +189,10 @@ function resolveIdentity(opts) { } class AppServerClient { - constructor(command, cwd) { + constructor(command, cwd, opts = {}) { this.command = command; this.cwd = cwd; + this.requestTimeoutMs = opts.requestTimeoutMs || 0; this.nextId = 1; this.pending = new Map(); this.handlers = new Map(); @@ -231,7 +256,7 @@ class AppServerClient { } if (message.method && this.handlers.has(message.method)) { - this.handlers.get(message.method)(message.params || {}); + this.dispatch(message.method, message.params || {}); } } @@ -239,11 +264,35 @@ class AppServerClient { const id = this.nextId++; const payload = { jsonrpc: "2.0", id, method, params }; return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); + let timer = null; + const clear = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + }; + const pending = { + resolve: (value) => { + clear(); + resolve(value); + }, + reject: (error) => { + clear(); + reject(error); + }, + }; + if (this.requestTimeoutMs > 0) { + timer = setTimeout(() => { + if (!this.pending.delete(id)) return; + reject(new Error(`app-server request '${method}' timed out after ${this.requestTimeoutMs}ms`)); + }, this.requestTimeoutMs); + if (timer.unref) timer.unref(); + } + this.pending.set(id, pending); this.child.stdin.write(`${JSON.stringify(payload)}\n`, (error) => { if (error) { this.pending.delete(id); - reject(error); + pending.reject(error); } }); }); @@ -253,6 +302,16 @@ class AppServerClient { this.child.stdin.write(`${JSON.stringify({ jsonrpc: "2.0", method, params })}\n`); } + dispatch(method, params) { + try { + Promise.resolve(this.handlers.get(method)(params)).catch((error) => { + console.error(`codex-bridge: ${method} handler failed: ${error.message}`); + }); + } catch (error) { + console.error(`codex-bridge: ${method} handler failed: ${error.message}`); + } + } + stop() { if (this.child && !this.child.killed) { this.child.kill("SIGTERM"); @@ -266,9 +325,11 @@ class AppServerClient { // `--app-server ws://host:port` (codex 0.141+ accepts only ws:// for `--remote`, // see #170). class WebSocketAppServerClient { - constructor(connectOptions, label) { + constructor(connectOptions, label, opts = {}) { this.connectOptions = connectOptions; this.label = label || "app-server"; + this.connectTimeoutMs = opts.connectTimeoutMs || 0; + this.requestTimeoutMs = opts.requestTimeoutMs || 0; this.nextId = 1; this.pending = new Map(); this.handlers = new Map(); @@ -282,12 +343,39 @@ class WebSocketAppServerClient { start() { this.startPromise = new Promise((resolve, reject) => { + let settled = false; + let timer = null; + const finish = (error) => { + if (settled) return; + settled = true; + if (timer) { + clearTimeout(timer); + timer = null; + } + if (error) { + reject(error); + } else { + resolve(); + } + }; const key = crypto.randomBytes(16).toString("base64"); this.expectedAccept = crypto .createHash("sha1") .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`) .digest("base64"); + if (this.connectTimeoutMs > 0) { + timer = setTimeout(() => { + const error = new Error( + `app-server websocket handshake timed out after ${this.connectTimeoutMs}ms (${this.label})`, + ); + this.rejectAll(error); + finish(error); + this.stop(); + }, this.connectTimeoutMs); + if (timer.unref) timer.unref(); + } + this.socket = net.createConnection(this.connectOptions); this.socket.on("connect", () => { this.socket.write( @@ -303,13 +391,15 @@ class WebSocketAppServerClient { ].join("\r\n"), ); }); - this.socket.on("data", (chunk) => this.handleData(chunk, resolve, reject)); + this.socket.on("data", (chunk) => this.handleData(chunk, () => finish(), finish)); this.socket.on("error", (error) => { this.rejectAll(error); - reject(error); + finish(error); }); this.socket.on("close", () => { - this.rejectAll(new Error(`app-server connection closed (${this.label})`)); + const error = new Error(`app-server connection closed (${this.label})`); + this.rejectAll(error); + if (!this.handshakeComplete) finish(error); }); }); } @@ -430,7 +520,7 @@ class WebSocketAppServerClient { return; } if (message.method && this.handlers.has(message.method)) { - this.handlers.get(message.method)(message.params || {}); + this.dispatch(message.method, message.params || {}); } } @@ -438,11 +528,35 @@ class WebSocketAppServerClient { const id = this.nextId++; const payload = { jsonrpc: "2.0", id, method, params }; return new Promise((resolve, reject) => { - this.pending.set(id, { resolve, reject }); + let timer = null; + const clear = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + }; + const pending = { + resolve: (value) => { + clear(); + resolve(value); + }, + reject: (error) => { + clear(); + reject(error); + }, + }; + if (this.requestTimeoutMs > 0) { + timer = setTimeout(() => { + if (!this.pending.delete(id)) return; + reject(new Error(`app-server request '${method}' timed out after ${this.requestTimeoutMs}ms`)); + }, this.requestTimeoutMs); + if (timer.unref) timer.unref(); + } + this.pending.set(id, pending); this.sendJson(payload, (error) => { if (error) { this.pending.delete(id); - reject(error); + pending.reject(error); } }); }); @@ -452,6 +566,16 @@ class WebSocketAppServerClient { this.sendJson({ jsonrpc: "2.0", method, params }); } + dispatch(method, params) { + try { + Promise.resolve(this.handlers.get(method)(params)).catch((error) => { + console.error(`codex-bridge: ${method} handler failed: ${error.message}`); + }); + } catch (error) { + console.error(`codex-bridge: ${method} handler failed: ${error.message}`); + } + } + sendJson(value, callback = () => {}) { if (!this.connected) { callback(new Error("app-server websocket is not connected")); @@ -514,6 +638,8 @@ class CodexBridge { this.wakeCount = 0; this.lastWakeMaxId = 0; this.staleWakeCount = 0; + this.watchFailureCount = 0; + this.watchRearmTimer = null; this.inlineInboxText = ""; this.stopping = false; this.pidfile = path.join(RUN_DIR, `codex-bridge.${identity.team}.${identity.name}.pid`); @@ -525,16 +651,16 @@ class CodexBridge { this.ensureSingleInstance(); this.writeMeta(); this.installSignals(); - this.client.on("process/exited", (params) => this.onProcessExited(params)); - this.client.on("error", (params) => this.onServerError(params)); - this.client.on("item/agentMessage/delta", (params) => this.onAgentMessageDelta(params)); - this.client.on("thread/status/changed", (params) => this.onThreadStatus(params)); - this.client.on("turn/started", () => { + this.client.on("process/exited", this.clientHandler("process/exited", (params) => this.onProcessExited(params))); + this.client.on("error", this.clientHandler("error", (params) => this.onServerError(params))); + this.client.on("item/agentMessage/delta", this.clientHandler("item/agentMessage/delta", (params) => this.onAgentMessageDelta(params))); + this.client.on("thread/status/changed", this.clientHandler("thread/status/changed", (params) => this.onThreadStatus(params))); + this.client.on("turn/started", this.clientHandler("turn/started", () => { this.turnActive = true; this.threadIdle = false; - }); - this.client.on("turn/completed", (params) => this.onTurnCompleted(params)); - this.client.on("turn/failed", () => this.onTurnCompleted()); + })); + this.client.on("turn/completed", this.clientHandler("turn/completed", (params) => this.onTurnCompleted(params))); + this.client.on("turn/failed", this.clientHandler("turn/failed", () => this.onTurnCompleted())); this.client.start(); await this.client.ready?.(); @@ -543,6 +669,21 @@ class CodexBridge { await this.armWatch(); } + clientHandler(method, handler) { + return (params) => { + try { + Promise.resolve(handler(params)).catch((error) => this.failClientHandler(method, error)); + } catch (error) { + this.failClientHandler(method, error); + } + }; + } + + failClientHandler(method, error) { + console.error(`codex-bridge: ${method} handler failed: ${error.message}`); + this.shutdown().finally(() => process.exit(1)); + } + writeMeta() { fs.writeFileSync(this.pidfile, `${process.pid}\n`); fs.writeFileSync( @@ -640,6 +781,7 @@ class CodexBridge { } async armWatch() { + this.clearWatchRearmTimer(); if (this.stopping || this.watchHandle) return; const handle = `agmsg-watch-${Date.now()}-${Math.random().toString(36).slice(2)}`; this.watchHandle = handle; @@ -656,13 +798,18 @@ class CodexBridge { "--interval", String(this.opts.interval), ]; - await this.client.request("process/spawn", { - command, - processHandle: handle, - cwd: this.opts.project, - outputBytesCap: 8192, - timeoutMs: (this.opts.timeout + this.opts.interval + 10) * 1000, - }); + try { + await this.client.request("process/spawn", { + command, + processHandle: handle, + cwd: this.opts.project, + outputBytesCap: 8192, + timeoutMs: (this.opts.timeout + this.opts.interval + 10) * 1000, + }); + } catch (error) { + if (this.watchHandle === handle) this.watchHandle = null; + throw error; + } console.error(`codex-bridge: armed ${this.identity.team}/${this.identity.name}`); } @@ -671,6 +818,7 @@ class CodexBridge { this.watchHandle = null; if (params.exitCode === 0) { + this.watchFailureCount = 0; const maxId = parseMaxId(params.stdout); if (this.isStaleWake(maxId)) { await this.shutdown(); @@ -684,13 +832,36 @@ class CodexBridge { } if (params.exitCode === 2) { + this.watchFailureCount = 0; await this.armWatch(); return; } + this.watchFailureCount += 1; const detail = [params.stderr, params.stdout].filter(Boolean).join("\n").trim(); console.error(`codex-bridge: watch-once failed with exit ${params.exitCode}${detail ? `: ${detail}` : ""}`); - setTimeout(() => this.armWatch().catch((error) => console.error(`codex-bridge: re-arm failed: ${error.message}`)), 5000); + if (this.opts.watchFailureLimit > 0 && this.watchFailureCount >= this.opts.watchFailureLimit) { + console.error( + `codex-bridge: stopping after ${this.watchFailureCount} consecutive watch-once failure(s)`, + ); + await this.shutdown(); + process.exit(1); + } + this.scheduleWatchRearm(); + } + + scheduleWatchRearm() { + if (this.stopping || this.watchHandle || this.watchRearmTimer) return; + this.watchRearmTimer = setTimeout(() => { + this.watchRearmTimer = null; + this.armWatch().catch((error) => this.failClientHandler("process/exited", error)); + }, 5000); + } + + clearWatchRearmTimer() { + if (!this.watchRearmTimer) return; + clearTimeout(this.watchRearmTimer); + this.watchRearmTimer = null; } onThreadStatus(params) { @@ -856,6 +1027,7 @@ class CodexBridge { async shutdown() { if (this.stopping) return; this.stopping = true; + this.clearWatchRearmTimer(); this.clearTurnWatchdog(); if (this.watchHandle) { try { @@ -970,13 +1142,13 @@ function createAppServerClient(opts) { const rawSocketPath = opts.appServer.slice("unix://".length); if (!rawSocketPath) die("--app-server unix:// requires a socket path"); const socketPath = path.isAbsolute(rawSocketPath) ? rawSocketPath : path.resolve(process.cwd(), rawSocketPath); - return new WebSocketAppServerClient({ path: socketPath }, `unix://${socketPath}`); + return new WebSocketAppServerClient({ path: socketPath }, `unix://${socketPath}`, opts); } if (opts.appServer && opts.appServer.startsWith("ws://")) { const target = parseWsTarget(opts.appServer); - return new WebSocketAppServerClient(target, opts.appServer); + return new WebSocketAppServerClient(target, opts.appServer, opts); } - return new AppServerClient(appServerCommand(opts), opts.project); + return new AppServerClient(appServerCommand(opts), opts.project, opts); } function readVersion() { diff --git a/tests/test_codex_bridge.bats b/tests/test_codex_bridge.bats index fb9226c..1d3b205 100644 --- a/tests/test_codex_bridge.bats +++ b/tests/test_codex_bridge.bats @@ -4,7 +4,7 @@ load test_helper setup() { setup_test_env - export PROJ="/tmp/agmsg-codex-bridge-proj" + export PROJ="$TEST_SKILL_DIR/proj" mkdir -p "$PROJ" bash "$SCRIPTS/join.sh" team alice codex "$PROJ" >/dev/null bash "$SCRIPTS/join.sh" team bob codex "$PROJ" >/dev/null @@ -15,6 +15,46 @@ teardown() { teardown_test_env } +write_bridge_timeout_runner() { + local runner="$TEST_SKILL_DIR/run-with-timeout.js" + cat >"$runner" <<'EOF' +const { spawn } = require("child_process"); + +const timeoutMs = Number(process.argv[2]); +const command = process.argv[3]; +const args = process.argv.slice(4); +let timedOut = false; +let exited = false; +let stdout = ""; +let stderr = ""; + +const child = spawn(command, args, { env: process.env, stdio: ["ignore", "pipe", "pipe"] }); +child.stdout.on("data", (chunk) => { stdout += chunk; }); +child.stderr.on("data", (chunk) => { stderr += chunk; }); +child.on("error", (error) => { + process.stderr.write(error.message); + process.exit(127); +}); + +const timer = setTimeout(() => { + timedOut = true; + child.kill("SIGTERM"); + setTimeout(() => { + if (!exited) child.kill("SIGKILL"); + }, 250).unref(); +}, timeoutMs); + +child.on("close", (code) => { + exited = true; + clearTimeout(timer); + process.stdout.write(stdout); + process.stderr.write(stderr); + process.exit(timedOut ? 124 : (code ?? 1)); +}); +EOF + printf '%s\n' "$runner" +} + @test "codex-bridge: help exits successfully" { run node "$TYPES/codex/codex-bridge.js" --help [ "$status" -eq 0 ] @@ -328,6 +368,163 @@ EOF grep -q "thread/resume" "$log" } +@test "codex-bridge: times out when a websocket upgrade never completes" { + skip_on_windows "codex bridge identity resolution on Windows (#182)" + run node -e 'const net = require("net"); if (!net) process.exit(1);' + if [ "$status" -ne 0 ]; then + skip "node net module is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-ws-handshake-stall.js" + local portfile="$TEST_SKILL_DIR/fake-ws-handshake-stall.port" + local log="$TEST_SKILL_DIR/fake-ws-handshake-stall.log" + rm -f "$portfile" + cat >"$fake" <<'EOF' +const fs = require("fs"); +const net = require("net"); + +const portfile = process.argv[2]; +const log = process.argv[3]; + +const server = net.createServer((socket) => { + fs.appendFileSync(log, "accepted\n"); + socket.on("data", () => { + fs.appendFileSync(log, "received-handshake\n"); + }); + socket.on("close", () => server.close(() => process.exit(0))); +}); + +server.listen(0, "127.0.0.1", () => { + fs.writeFileSync(portfile, String(server.address().port)); +}); +EOF + + node "$fake" "$portfile" "$log" & + local server_pid="$!" + for _ in {1..50}; do + [ -s "$portfile" ] && break + sleep 0.1 + done + local port + port="$(cat "$portfile")" + local runner + runner="$(write_bridge_timeout_runner)" + + run node "$runner" 3000 node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --thread thread-existing \ + --app-server "ws://127.0.0.1:$port" --connect-timeout-ms 100 --request-timeout-ms 1000 \ + --timeout 1 --interval 1 + + kill "$server_pid" 2>/dev/null || true + + [ "$status" -eq 1 ] + [[ "$output" =~ "websocket handshake timed out" ]] + grep -q "accepted" "$log" + grep -q "received-handshake" "$log" +} + +@test "codex-bridge: times out when an app-server request never responds" { + skip_on_windows "codex bridge identity resolution on Windows (#182)" + run node -e 'const net = require("net"); const crypto = require("crypto"); if (!net || !crypto) process.exit(1);' + if [ "$status" -ne 0 ]; then + skip "node net/crypto modules are not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-ws-request-stall.js" + local portfile="$TEST_SKILL_DIR/fake-ws-request-stall.port" + local log="$TEST_SKILL_DIR/fake-ws-request-stall.log" + rm -f "$portfile" + cat >"$fake" <<'EOF' +const crypto = require("crypto"); +const fs = require("fs"); +const net = require("net"); + +const portfile = process.argv[2]; +const log = process.argv[3]; + +const server = net.createServer((socket) => { + const state = { upgraded: false, header: Buffer.alloc(0) }; + socket.on("data", (chunk) => { + if (!state.upgraded) { + state.header = Buffer.concat([state.header, chunk]); + const end = state.header.indexOf("\r\n\r\n"); + if (end === -1) return; + const header = state.header.slice(0, end).toString("utf8"); + const key = (header.match(/Sec-WebSocket-Key: (.*)\r\n/i) || [])[1].trim(); + const accept = crypto.createHash("sha1").update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`).digest("base64"); + socket.write(["HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", "Connection: Upgrade", `Sec-WebSocket-Accept: ${accept}`, "", ""].join("\r\n")); + fs.appendFileSync(log, "upgraded\n"); + state.upgraded = true; + return; + } + fs.appendFileSync(log, "ignored-frame\n"); + }); + socket.on("close", () => server.close(() => process.exit(0))); +}); + +server.listen(0, "127.0.0.1", () => { + fs.writeFileSync(portfile, String(server.address().port)); +}); +EOF + + node "$fake" "$portfile" "$log" & + local server_pid="$!" + for _ in {1..50}; do + [ -s "$portfile" ] && break + sleep 0.1 + done + local port + port="$(cat "$portfile")" + local runner + runner="$(write_bridge_timeout_runner)" + + run node "$runner" 3000 node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --thread thread-existing \ + --app-server "ws://127.0.0.1:$port" --connect-timeout-ms 1000 --request-timeout-ms 100 \ + --timeout 1 --interval 1 + + kill "$server_pid" 2>/dev/null || true + + [ "$status" -eq 1 ] + [[ "$output" =~ "app-server request 'initialize' timed out" ]] + grep -q "upgraded" "$log" + grep -q "ignored-frame" "$log" +} + +@test "codex-bridge: times out when a stdio app-server request never responds" { + run node -e 'const r = require("child_process").spawnSync("/bin/sh", ["-c", "true"]); if (r.error) { console.error(r.error.message); process.exit(1); }' + if [ "$status" -ne 0 ]; then + skip "node child_process.spawn is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-stdio-request-stall.js" + local log="$TEST_SKILL_DIR/fake-stdio-request-stall.log" + cat >"$fake" <<'EOF' +const fs = require("fs"); +const readline = require("readline"); + +const log = process.argv[2]; +const rl = readline.createInterface({ input: process.stdin }); + +rl.on("line", (line) => { + const message = JSON.parse(line); + fs.appendFileSync(log, `${message.method}\n`); + // Deliberately keep the process alive and never answer initialize. +}); +EOF + + local runner + runner="$(write_bridge_timeout_runner)" + + AGMSG_CODEX_APP_SERVER_CMD="node $fake $log" run node "$runner" 3000 node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --thread thread-existing \ + --request-timeout-ms 500 --timeout 1 --interval 1 + + [ "$status" -eq 1 ] + [[ "$output" =~ "app-server request 'initialize' timed out" ]] + grep -q "initialize" "$log" +} + @test "codex-bridge: refuses when the same identity already has a live bridge" { skip_on_windows "codex bridge identity resolution on Windows (#182)" mkdir -p "$TEST_SKILL_DIR/run" @@ -661,6 +858,232 @@ EOF [[ "$output" =~ "stopping to avoid a repeated wakeup loop" ]] } +@test "codex-bridge: stops after the configured watch-once failure limit" { + run node -e 'const r = require("child_process").spawnSync("/bin/sh", ["-c", "true"]); if (r.error) { console.error(r.error.message); process.exit(1); }' + if [ "$status" -ne 0 ]; then + skip "node child_process.spawn is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-app-server-watch-fail.js" + cat >"$fake" <<'EOF' +const readline = require("readline"); +const rl = readline.createInterface({ input: process.stdin }); + +function send(value) { + process.stdout.write(`${JSON.stringify(value)}\n`); +} + +rl.on("line", (line) => { + const message = JSON.parse(line); + if (message.method === "initialize") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } else if (message.method === "thread/start") { + send({ jsonrpc: "2.0", id: message.id, result: { thread: { id: "thread-1", status: { type: "idle" } } } }); + } else if (message.method === "process/spawn") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + setTimeout(() => { + send({ + jsonrpc: "2.0", + method: "process/exited", + params: { + processHandle: message.params.processHandle, + exitCode: 1, + stdout: "", + stderr: "fake watch failure", + }, + }); + }, 10); + } else if (message.method === "process/kill") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } +}); +EOF + + AGMSG_CODEX_APP_SERVER_CMD="node $fake" run node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --timeout 1 --interval 1 \ + --request-timeout-ms 1000 --watch-failure-limit 1 + + [ "$status" -eq 1 ] + [[ "$output" =~ "watch-once failed with exit 1: fake watch failure" ]] + [[ "$output" =~ "stopping after 1 consecutive watch-once failure" ]] +} + +@test "codex-bridge: watch-once timeout exit does not count toward failure limit" { + run node -e 'const r = require("child_process").spawnSync("/bin/sh", ["-c", "true"]); if (r.error) { console.error(r.error.message); process.exit(1); }' + if [ "$status" -ne 0 ]; then + skip "node child_process.spawn is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-app-server-watch-timeout-then-wake.js" + cat >"$fake" <<'EOF' +const readline = require("readline"); +const rl = readline.createInterface({ input: process.stdin }); +let spawns = 0; + +function send(value) { + process.stdout.write(`${JSON.stringify(value)}\n`); +} + +rl.on("line", (line) => { + const message = JSON.parse(line); + if (message.method === "initialize") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } else if (message.method === "thread/start") { + send({ jsonrpc: "2.0", id: message.id, result: { thread: { id: "thread-1", status: { type: "idle" } } } }); + } else if (message.method === "process/spawn") { + spawns += 1; + send({ jsonrpc: "2.0", id: message.id, result: {} }); + setTimeout(() => { + if (spawns === 1) { + send({ + jsonrpc: "2.0", + method: "process/exited", + params: { processHandle: message.params.processHandle, exitCode: 2, stdout: "", stderr: "" }, + }); + return; + } + send({ + jsonrpc: "2.0", + method: "process/exited", + params: { + processHandle: message.params.processHandle, + exitCode: 0, + stdout: "status=pending count=1 max_id=1\n", + stderr: "", + }, + }); + }, 10); + } else if (message.method === "turn/start") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + setTimeout(() => { + send({ jsonrpc: "2.0", method: "turn/completed", params: { threadId: message.params.threadId, turn: { id: "turn-1" } } }); + }, 10); + } else if (message.method === "process/kill") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } +}); +EOF + + AGMSG_CODEX_APP_SERVER_CMD="node $fake" run node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --timeout 1 --interval 1 \ + --request-timeout-ms 1000 --watch-failure-limit 1 --max-wakes 1 + + [ "$status" -eq 0 ] + [[ "$output" =~ "wakeup 1" ]] + [[ "$output" != *"stopping after"* ]] +} + +@test "codex-bridge: re-arm spawn request timeout exits without a phantom watch" { + run node -e 'const r = require("child_process").spawnSync("/bin/sh", ["-c", "true"]); if (r.error) { console.error(r.error.message); process.exit(1); }' + if [ "$status" -ne 0 ]; then + skip "node child_process.spawn is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-app-server-rearm-spawn-stall.js" + cat >"$fake" <<'EOF' +const readline = require("readline"); +const rl = readline.createInterface({ input: process.stdin }); +let spawns = 0; + +function send(value) { + process.stdout.write(`${JSON.stringify(value)}\n`); +} + +rl.on("line", (line) => { + const message = JSON.parse(line); + if (message.method === "initialize") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } else if (message.method === "thread/start") { + send({ jsonrpc: "2.0", id: message.id, result: { thread: { id: "thread-1", status: { type: "idle" } } } }); + } else if (message.method === "process/spawn") { + spawns += 1; + if (spawns === 1) { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + setTimeout(() => { + send({ + jsonrpc: "2.0", + method: "process/exited", + params: { processHandle: message.params.processHandle, exitCode: 2, stdout: "", stderr: "" }, + }); + }, 10); + } + // The second process/spawn deliberately never receives a response. + } else if (message.method === "process/kill") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } +}); +EOF + + local runner + runner="$(write_bridge_timeout_runner)" + + AGMSG_CODEX_APP_SERVER_CMD="node $fake" run node "$runner" 3000 node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --timeout 1 --interval 1 \ + --request-timeout-ms 500 --watch-failure-limit 1 + + [ "$status" -eq 1 ] + [[ "$output" =~ "process/exited handler failed: app-server request 'process/spawn' timed out" ]] +} + +@test "codex-bridge: delayed re-arm after sub-limit watch failure times out fatally" { + run node -e 'const r = require("child_process").spawnSync("/bin/sh", ["-c", "true"]); if (r.error) { console.error(r.error.message); process.exit(1); }' + if [ "$status" -ne 0 ]; then + skip "node child_process.spawn is not available in this sandbox" + fi + + local fake="$TEST_SKILL_DIR/fake-app-server-delayed-rearm-stall.js" + cat >"$fake" <<'EOF' +const readline = require("readline"); +const rl = readline.createInterface({ input: process.stdin }); +let spawns = 0; + +function send(value) { + process.stdout.write(`${JSON.stringify(value)}\n`); +} + +rl.on("line", (line) => { + const message = JSON.parse(line); + if (message.method === "initialize") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } else if (message.method === "thread/start") { + send({ jsonrpc: "2.0", id: message.id, result: { thread: { id: "thread-1", status: { type: "idle" } } } }); + } else if (message.method === "process/spawn") { + spawns += 1; + if (spawns === 1) { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + setTimeout(() => { + send({ + jsonrpc: "2.0", + method: "process/exited", + params: { + processHandle: message.params.processHandle, + exitCode: 1, + stdout: "", + stderr: "fake transient watch failure", + }, + }); + }, 10); + } + // The delayed re-arm process/spawn deliberately never receives a response. + } else if (message.method === "process/kill") { + send({ jsonrpc: "2.0", id: message.id, result: {} }); + } +}); +EOF + + local runner + runner="$(write_bridge_timeout_runner)" + + AGMSG_CODEX_APP_SERVER_CMD="node $fake" run node "$runner" 10000 node "$TYPES/codex/codex-bridge.js" \ + --project "$PROJ" --team team --name alice --timeout 1 --interval 1 \ + --request-timeout-ms 3000 --watch-failure-limit 2 + + [ "$status" -eq 1 ] + [[ "$output" =~ "watch-once failed with exit 1: fake transient watch failure" ]] + [[ "$output" =~ "process/exited handler failed: app-server request 'process/spawn' timed out" ]] + [[ "$output" != *"stopping after 2 consecutive watch-once failure"* ]] +} + # --- re-arm regression (#41): real app-server may never send turn/completed --- @test "codex-bridge: re-arms after a turn via the watchdog when no turn/completed arrives" {