From 0f83f3aeaee70a45ad5dbfef0cfa3eefebf0981a Mon Sep 17 00:00:00 2001 From: QuanCheng <915158214@qq.com> Date: Sat, 27 Jun 2026 08:32:12 +0000 Subject: [PATCH] reconnect workspace adapters after liveness loss --- packages/agent-connector/src/adapters/base.js | 435 ++++++++++- .../agent-connector/src/workspace-client.js | 47 +- .../test/adapter-reconnect.test.js | 728 ++++++++++++++++++ 3 files changed, 1194 insertions(+), 16 deletions(-) create mode 100644 packages/agent-connector/test/adapter-reconnect.test.js diff --git a/packages/agent-connector/src/adapters/base.js b/packages/agent-connector/src/adapters/base.js index 18005fda8..df146695c 100644 --- a/packages/agent-connector/src/adapters/base.js +++ b/packages/agent-connector/src/adapters/base.js @@ -23,6 +23,25 @@ const { defaultAgentWorkdir } = require('../paths'); const DEFAULT_ENDPOINT = 'https://workspace-endpoint.openagents.org'; +// ── Connection self-healing (V1) tunables ── +// Heartbeat cadence (unchanged from legacy setInterval value). +const HEARTBEAT_INTERVAL_MS = 30000; +// A reconnect is only triggered when BOTH conditions hold (AND, not OR): +// • time since last confirmed liveness ≥ RECONNECT_AFTER_MS (we are almost +// certainly past the server's AGENT_TIMEOUT, so it considers us offline), and +// • ≥ RECONNECT_FAIL_THRESHOLD consecutive heartbeat failures. +// This avoids re-joining (and rotating sessions) on brief blips the server +// hasn't yet timed out. +const RECONNECT_AFTER_MS = 90000; +const RECONNECT_FAIL_THRESHOLD = 3; +// 404 / missing-session_id / unknown join outcomes are "ambiguous": retried a +// bounded number of times, then treated as terminal to avoid an infinite spin. +const JOIN_MAX_ATTEMPTS_AMBIGUOUS = 5; +// Join backoff: 2s → 4s → 8s → 16s → 30s cap, with ±20% jitter to spread a +// fleet of agents recovering at once. +const JOIN_BACKOFF_BASE_MS = 2000; +const JOIN_BACKOFF_MAX_MS = 30000; + class BaseAdapter { /** * @param {object} opts @@ -59,6 +78,37 @@ class BaseAdapter { // workspace flag must reconnect/restart to pick up the change (matches // the Python adapter behavior in workspace_prompt.py). this._browserEnabledCache = null; + // ── Connection self-healing state (V1) ── + // Kill-switch: OA_ADAPTER_RECONNECT=0 → run the exact pre-patch lifecycle. + this._reconnectEnabled = process.env.OA_ADAPTER_RECONNECT !== '0'; + // join attempt generation: bumped at the start of every (re)connect attempt. + this._attemptSeq = 0; + // active session generation: the attempt number of the live session, or + // null when no session is active (incl. the whole reconnect window). Async + // callbacks (heartbeat/poll/sleep) may only mutate connection state when + // their captured gen === _activeGen. + this._activeGen = null; + // Two distinct reconnect states (never conflated): + // _reconnectPending → threshold met but an active task is running, so + // the real reconnect is deferred (session stays live). + // _reconnectInProgress → reconnect has formally begun; _activeGen is null. + this._reconnectPending = false; + this._reconnectInProgress = false; + // ms timestamp of last confirmed liveness (successful join OR heartbeat). + this._lastLivenessOkAt = 0; + this._consecutiveHeartbeatFailures = 0; + // Terminal reason that must STOP the adapter without auto-reconnect: + // 'session_revoked' | 'auth' | 'join_failed' | null. + this._terminalReason = null; + // Coarse connection state, for logs/internal judgement only (NOT exposed to + // daemon status). 'idle'|'connecting'|'connected'|'reconnecting'|'disconnected'|'session_revoked'. + this._connState = 'idle'; + this._firstConnect = true; + // All interruptible-sleep wakers; _wakeAll() resolves every pending sleep. + this._waiters = new Set(); + // Injectable clock/jitter for deterministic tests (default real impls). + this._clock = null; + this._jitter = null; // Wall-clock timestamp of adapter init, used by the `status` control // action to report uptime back to the channel. Reset on reinstantiation // (e.g. after a `restart` IPC bounce) so uptime tracks "time since last @@ -75,6 +125,17 @@ class BaseAdapter { // ------------------------------------------------------------------ async run() { + // Kill-switch: when disabled, run the EXACT pre-patch lifecycle so the + // behavior is provably identical to before this patch. + if (!this._reconnectEnabled) return this._runLegacyLifecycle(); + return this._runSupervisedLifecycle(); + } + + // ------------------------------------------------------------------ + // Legacy lifecycle (OA_ADAPTER_RECONNECT=0) — verbatim pre-patch run() + // ------------------------------------------------------------------ + + async _runLegacyLifecycle() { this._running = true; // Announce agent to workspace @@ -136,8 +197,283 @@ class BaseAdapter { } } + // ------------------------------------------------------------------ + // Supervised lifecycle (default) — join-with-retry + heartbeat self-heal + // ------------------------------------------------------------------ + + async _runSupervisedLifecycle() { + this._running = true; + this._firstConnect = true; + try { + while (this._running) { + this._connState = this._firstConnect ? 'connecting' : 'reconnecting'; + const outcome = await this._joinWithRetry(); // 'ok' | 'terminal' | 'stopped' + if (!this._running) break; + if (outcome !== 'ok') { this._connState = 'disconnected'; break; } + + const gen = this._activeGen; // set by _joinWithRetry on success + this._log(`event=joined agent=${this.agentName}${this._sessionId ? ` session=${this._sessionId.slice(0, 8)}` : ''} reconnect=${!this._firstConnect}`); + + const reason = await this._runConnectedSession(gen); // 'reconnect' | 'terminal' | 'stopped' + this._teardownLocal(gen); + this._firstConnect = false; + + if (reason === 'reconnect') { + this._reconnectInProgress = false; + this._reconnectPending = false; + continue; + } + break; // 'terminal' or 'stopped' + } + } finally { + this._running = false; + this._activeGen = null; + this._wakeAll(); + this._wakeControlPoller(); + // NOTE (V1): the supervised path intentionally does NOT call + // client.disconnect() here. POST /v1/leave marks the member offline by + // agent_name WITHOUT validating session_id, so calling it on any + // automatic/terminal exit can knock a newer session (that already took + // over this agent name) offline. Going-away presence is instead handled + // by the server's heartbeat AGENT_TIMEOUT. See design §7. + } + } + + /** + * Join the workspace, retrying transient failures with backoff. A join only + * counts as success when the server returns a non-empty string session_id; + * on success the new active session generation is activated. Returns: + * 'ok' — joined, _activeGen set, _sessionId valid + * 'terminal' — auth failure / ambiguous-cap reached (_terminalReason set) + * 'stopped' — _running cleared or attempt superseded (stop/Disconnect) + */ + async _joinWithRetry() { + const attempt = ++this._attemptSeq; + let backoffN = 0; + let ambiguous = 0; + + while (this._running && attempt === this._attemptSeq) { + let res = null, err = null; + try { + res = await this.client.joinNetwork(this.agentName, this.token, { + network: this.workspaceId, + agentType: this.agentType || 'agent', + serverHost: require('os').hostname(), + workingDir: this.workingDir || defaultAgentWorkdir(this.agentName), + }); + } catch (e) { + err = e; + } + // Superseded (stop / newer attempt) while the request was in flight. + if (!this._running || attempt !== this._attemptSeq) return 'stopped'; + + if (!err) { + const sid = res && res.session_id; + if (typeof sid === 'string' && sid.length > 0) { + this._sessionId = sid; + this._lastLivenessOkAt = this._now(); + this._consecutiveHeartbeatFailures = 0; + this._terminalReason = null; + this._connState = 'connected'; + this._activeGen = attempt; // ★ activate new session + return 'ok'; + } + // 2xx but no usable session_id — anomalous; bounded retry. + ambiguous++; + this._log(`event=join_no_session agent=${this.agentName} attempt=${ambiguous}`); + if (ambiguous >= JOIN_MAX_ATTEMPTS_AMBIGUOUS) { + this._terminalReason = 'join_failed'; + this._connState = 'disconnected'; + return 'terminal'; + } + } else { + const cls = this._classifyError(err); + if (cls === 'terminal' || cls === 'revoked') { + this._terminalReason = cls === 'revoked' ? 'session_revoked' : 'auth'; + this._connState = 'disconnected'; + this._log(`event=join_terminal agent=${this.agentName} reason=${this._errLabel(err)}`); + return 'terminal'; + } + if (cls === 'ambiguous') { + ambiguous++; + if (ambiguous >= JOIN_MAX_ATTEMPTS_AMBIGUOUS) { + this._terminalReason = 'join_failed'; + this._connState = 'disconnected'; + this._log(`event=join_giveup agent=${this.agentName} reason=${this._errLabel(err)}`); + return 'terminal'; + } + } + this._log(`event=join_failed agent=${this.agentName} attempt=${backoffN + 1} err=${this._errLabel(err)}`); + } + + await this._interruptibleSleep(this._backoff(backoffN++)); + } + return 'stopped'; + } + + /** + * Run one connected session: skill sync, cursor setup, heartbeat loop, + * control poller, and the message poll loop. Returns when the session must + * end: 'reconnect' (heartbeat lapse), 'terminal' (revoked/auth), or + * 'stopped' (_running cleared). Does NOT perform any remote leave. + */ + async _runConnectedSession(gen) { + // Skill sync (best-effort) — only meaningful right after a join. + await this._syncSkillsFromWorkspace(); + // Skip stale control events each session so /restart etc. don't replay. + await this._skipExistingControlEvents(); + // Jump the message cursor to head ONLY on the first connect. On reconnect + // we resume from _lastEventId so messages queued during the outage are + // still delivered (and _processedIds guards against double-dispatch). + if (this._firstConnect) { + try { await this._skipExistingEvents(); } catch (e) { + this._log(`skip existing events failed (non-fatal): ${this._errLabel(e)}`); + } + } + + const heartbeatLoop = this._heartbeatLoop(gen); + const controlPoller = this._controlPollerLoop(); + this._log('Starting poll loop...'); + try { + await this._pollLoop(gen); + } finally { + // Tear down this session's background loops locally (no remote leave). + this._wakeAll(); + this._wakeControlPoller(); + try { await heartbeatLoop; } catch {} + try { await controlPoller; } catch {} + } + + if (this._terminalReason) return 'terminal'; + if (this._reconnectInProgress) return 'reconnect'; + return 'stopped'; + } + + /** Best-effort sync of workspace-managed skills (mirrors legacy block). */ + async _syncSkillsFromWorkspace() { + try { + const agents = await Promise.race([ + this.client.getAgents(this.workspaceId, this.token), + new Promise((_, reject) => setTimeout(() => reject(new Error('skill sync timed out (10s)')), 10000)), + ]); + const self = agents.find((a) => a.agentName === this.agentName); + if (self && self.enabledSkills) { + const { skillsToDisabledModules } = require('../skill-catalog'); + this.disabledModules = skillsToDisabledModules(self.enabledSkills); + this._log(`Synced skills from workspace: disabled=[${[...this.disabledModules].join(',')}]`); + } + } catch (e) { + this._log(`Warning: skill sync failed (non-fatal): ${e.message}`); + } + } + + // ------------------------------------------------------------------ + // Heartbeat — single-flight await loop (no overlapping requests) + // ------------------------------------------------------------------ + + async _heartbeatLoop(gen) { + // Initial heartbeat right after join. + await this._heartbeatOnce(gen); + while (this._running && gen === this._activeGen) { + await this._interruptibleSleep(HEARTBEAT_INTERVAL_MS); + if (!this._running || gen !== this._activeGen) break; + await this._heartbeatOnce(gen); // awaited → never overlaps with the next + } + } + + async _heartbeatOnce(gen) { + let err = null; + try { + await this.client.heartbeat(this.workspaceId, this.agentName, this.token, this._sessionId); + } catch (e) { + err = e; + } + // Stale callback from a superseded session — must not touch new state. + if (gen !== this._activeGen) return; + + if (!err) { + this._lastLivenessOkAt = this._now(); + this._consecutiveHeartbeatFailures = 0; + if (this._connState !== 'connected') this._connState = 'connected'; // soft recovery + if (this._reconnectPending) { + this._reconnectPending = false; + this._log(`event=reconnect_pending_cleared agent=${this.agentName}`); + } + return; + } + + const cls = this._classifyError(err); + if (cls === 'revoked') { this._onSessionRevoked(); return; } + if (cls === 'terminal') { + // 401/403 → token is bad; reconnect won't help. Terminal immediately, + // without waiting out the 90s freshness window. + this._terminalReason = 'auth'; + this._connState = 'disconnected'; + this._log(`event=heartbeat_auth_terminal agent=${this.agentName} reason=${this._errLabel(err)}`); + this._activeGen = null; + this._running = false; + this._wakeAll(); + return; + } + // retryable / ambiguous / unknown → count and evaluate reconnect. + this._consecutiveHeartbeatFailures++; + this._log(`event=heartbeat_failed agent=${this.agentName} consecutive=${this._consecutiveHeartbeatFailures} err=${this._errLabel(err)}`); + this._maybeTriggerReconnect(); + } + + /** + * Decide whether a heartbeat lapse warrants reconnect. Triggers only when + * BOTH the freshness window AND the consecutive-failure threshold are met. + * If a task is in flight, the reconnect is DEFERRED (_reconnectPending) and + * re-evaluated on the next heartbeat — the active session is NOT torn down. + */ + _maybeTriggerReconnect() { + const lapsed = this._now() - this._lastLivenessOkAt; + const meets = lapsed >= RECONNECT_AFTER_MS && + this._consecutiveHeartbeatFailures >= RECONNECT_FAIL_THRESHOLD; + if (!meets) return; + + if (this._hasActiveWork()) { + if (!this._reconnectPending) { + this._reconnectPending = true; + this._log(`event=reconnect_deferred_active_work agent=${this.agentName} lapsed_ms=${lapsed}`); + } + return; // keep current session + heartbeat alive; do not invalidate gen + } + this._beginReconnect(); + } + + /** + * Formally begin reconnect: invalidate the active session IMMEDIATELY (before + * a new join) so any in-flight heartbeat/poll/sleep from the old generation + * becomes a no-op, then wake the loops so they exit and the supervisor re-joins. + */ + _beginReconnect() { + if (this._activeGen === null) return; // already reconnecting / not live + const lapsed = this._now() - this._lastLivenessOkAt; + this._log(`event=reconnect_begin agent=${this.agentName} lapsed_ms=${lapsed}`); + this._activeGen = null; // ★ invalidate old session NOW + this._reconnectInProgress = true; + this._reconnectPending = false; + this._connState = 'reconnecting'; + this._wakeAll(); + } + + /** Local-only teardown of a finished session's loops. No remote leave. */ + _teardownLocal(_gen) { + this._wakeAll(); + this._wakeControlPoller(); + } + stop() { this._running = false; + // Invalidate any live session and wake every interruptible sleep so the + // supervised loops (join backoff, heartbeat, poll) exit promptly instead + // of waiting out their timers. Harmless in legacy mode (_waiters empty, + // _activeGen unused). + this._activeGen = null; + this._wakeAll(); + this._wakeControlPoller(); } // ------------------------------------------------------------------ @@ -492,11 +828,15 @@ class BaseAdapter { // Poll loop // ------------------------------------------------------------------ - async _pollLoop() { + async _pollLoop(gen) { + // Legacy (gen === undefined) keeps the non-interruptible _sleep so its + // timing is byte-identical to pre-patch. Supervised uses interruptible + // sleeps so reconnect/stop can wake the loop immediately. + const sleep = (ms) => (gen === undefined ? this._sleep(ms) : this._interruptibleSleep(ms)); let idleCount = 0; let pollCount = 0; - while (this._running) { + while (this._running && (gen === undefined || gen === this._activeGen)) { pollCount++; let messages, rawCursor, composingActive = false; try { @@ -512,12 +852,18 @@ class BaseAdapter { } } catch (e) { this._log(`Poll #${pollCount} failed: ${e.message} \nStack: ${e.stack}`); - await this._sleep(5000); + // Poll failures never trigger reconnect (only heartbeat freshness does); + // keep the original retry-after-5s behavior. + await sleep(5000); continue; } if (rawCursor) this._lastEventId = rawCursor; + // Discard results from a session that was invalidated while this poll + // was in flight (reconnect began): do NOT dispatch into the new session. + if (gen !== undefined && gen !== this._activeGen) break; + // Deduplicate const incoming = []; for (const msg of messages) { @@ -597,7 +943,7 @@ class BaseAdapter { } else { delay = Math.min(WARM_INTERVAL + (idleCount - WARM_POLLS) * 1000, 15000); } - await this._sleep(delay); + await sleep(delay); } } @@ -822,7 +1168,14 @@ class BaseAdapter { _onSessionRevoked() { this._log(`SESSION REVOKED: another client joined as '${this.agentName}'. Stopping adapter.`); + // Terminal: a newer client now owns this agent name. Never auto-reconnect + // (that would fight the new session). In legacy mode the extra fields are + // unused; only _running=false matters. + this._terminalReason = 'session_revoked'; + this._connState = 'session_revoked'; + this._activeGen = null; this._running = false; + this._wakeAll(); } // ------------------------------------------------------------------ @@ -845,6 +1198,80 @@ class BaseAdapter { return new Promise((resolve) => setTimeout(resolve, ms)); } + /** Current time in ms (injectable for deterministic tests). */ + _now() { + return this._clock ? this._clock() : Date.now(); + } + + /** + * Sleep that can be woken early by _wakeAll() (called on stop / reconnect / + * teardown). Every pending sleep registers its own resolver in _waiters, so + * concurrent sleepers (join backoff + heartbeat + poll pacing) are ALL woken + * — no single-callback overwrite. Each waiter removes itself on settle. + */ + _interruptibleSleep(ms) { + return new Promise((resolve) => { + let settled = false; + const finish = () => { + if (settled) return; + settled = true; + clearTimeout(timer); + this._waiters.delete(finish); + resolve(); + }; + const timer = setTimeout(finish, ms); + this._waiters.add(finish); + if (!this._running) finish(); // already stopping → don't wait + }); + } + + /** Wake every pending interruptible sleep. Each waiter self-removes. */ + _wakeAll() { + for (const wake of [...this._waiters]) { + try { wake(); } catch { /* ignore */ } + } + } + + /** Join backoff: 2s→4s→…→30s cap, with ±20% jitter (injectable for tests). */ + _backoff(n) { + const base = Math.min(JOIN_BACKOFF_MAX_MS, JOIN_BACKOFF_BASE_MS * Math.pow(2, n)); + const rand = this._jitter ? this._jitter() : (Math.random() * 2 - 1); // [-1, 1) + return Math.max(0, Math.round(base + base * 0.2 * rand)); + } + + /** + * Classify a transport error using STRUCTURED fields only (instanceof + + * statusCode + code) — never message-string matching: + * 'revoked' → SessionRevokedError (terminal, no auto-reconnect) + * 'terminal' → 401/403 (bad credentials; retrying is pointless) + * 'ambiguous' → 404 / unknown (bounded retry, then terminal) + * 'retryable' → 5xx / network errors (timeout, conn refused, DNS, reset) + */ + _classifyError(err) { + if (!err) return 'retryable'; + if (err instanceof SessionRevokedError || err.code === 'session_revoked') return 'revoked'; + const sc = err.statusCode; + if (sc === 401 || sc === 403) return 'terminal'; + if (sc === 404) return 'ambiguous'; + if (typeof sc === 'number' && sc >= 500) return 'retryable'; + const code = err.code; + if (code && ['ETIMEDOUT', 'ECONNREFUSED', 'ENOTFOUND', 'ECONNRESET', 'EAI_AGAIN'].includes(code)) { + return 'retryable'; + } + // No structured signal (e.g. "Invalid response: ..." parse failures, or an + // unexpected 4xx) → bounded ambiguous so we never spin forever. + return 'ambiguous'; + } + + /** Compact, secret-free error label for logs (no message/URL/token leak). */ + _errLabel(err) { + if (!err) return 'unknown'; + if (err instanceof SessionRevokedError || err.code === 'session_revoked') return 'session_revoked'; + if (typeof err.statusCode === 'number') return `http_${err.statusCode}`; + if (err.code) return String(err.code); + return 'error'; + } + /** * Return whether the workspace has the Browser Fabric viewer toggle on. * Cached for the lifetime of the adapter — restart to pick up a flip. diff --git a/packages/agent-connector/src/workspace-client.js b/packages/agent-connector/src/workspace-client.js index 9a08bd14d..5739efbc1 100644 --- a/packages/agent-connector/src/workspace-client.js +++ b/packages/agent-connector/src/workspace-client.js @@ -18,6 +18,29 @@ class SessionRevokedError extends Error { } } +/** + * Build a transport error for an HTTP >= 400 response. The message text is + * left byte-for-byte identical to the legacy `new Error(message)`; this only + * ATTACHES a structured `statusCode` so callers can classify by status code + * instead of parsing message strings. No change to return shape or behavior. + */ +function httpError(message, statusCode) { + const e = new Error(message); + if (typeof statusCode === 'number') e.statusCode = statusCode; + return e; +} + +/** + * Build a request-timeout error. Message text identical to the legacy + * `new Error('Request timed out')`; only ATTACHES `code = 'ETIMEDOUT'` so + * callers can classify it as a retryable network error. + */ +function timeoutError() { + const e = new Error('Request timed out'); + e.code = 'ETIMEDOUT'; + return e; +} + /** * HTTP client for workspace API operations. * @@ -809,7 +832,7 @@ class WorkspaceClient { try { const parsed = JSON.parse(data); if (res.statusCode >= 400) { - reject(new Error(parsed.message || `HTTP ${res.statusCode}`)); + reject(httpError(parsed.message || `HTTP ${res.statusCode}`, res.statusCode)); } else { resolve(parsed); } @@ -820,7 +843,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.end(); }); } @@ -842,7 +865,7 @@ class WorkspaceClient { res.on('end', () => { const buf = Buffer.concat(chunks); if (res.statusCode >= 400) { - reject(new Error(`HTTP ${res.statusCode}: ${buf.toString('utf-8').slice(0, 200)}`)); + reject(httpError(`HTTP ${res.statusCode}: ${buf.toString('utf-8').slice(0, 200)}`, res.statusCode)); } else { resolve(buf); } @@ -850,7 +873,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.end(); }); } @@ -879,7 +902,7 @@ class WorkspaceClient { if (typeof msg === 'string' && msg.toLowerCase().includes('session_revoked')) { reject(new SessionRevokedError(msg)); } else { - reject(new Error(msg)); + reject(httpError(msg, res.statusCode)); } } else { resolve(parsed); @@ -891,7 +914,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.write(jsonBody); req.end(); }); @@ -921,7 +944,7 @@ class WorkspaceClient { if (typeof msg === 'string' && msg.toLowerCase().includes('session_revoked')) { reject(new SessionRevokedError(msg)); } else { - reject(new Error(msg)); + reject(httpError(msg, res.statusCode)); } } else { resolve(parsed); @@ -933,7 +956,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.write(jsonBody); req.end(); }); @@ -959,7 +982,7 @@ class WorkspaceClient { try { const parsed = JSON.parse(data); if (res.statusCode >= 400) { - reject(new Error(parsed.message || `HTTP ${res.statusCode}`)); + reject(httpError(parsed.message || `HTTP ${res.statusCode}`, res.statusCode)); } else { resolve(parsed); } @@ -970,7 +993,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.write(jsonBody); req.end(); }); @@ -994,7 +1017,7 @@ class WorkspaceClient { try { const parsed = JSON.parse(data); if (res.statusCode >= 400) { - reject(new Error(parsed.message || `HTTP ${res.statusCode}`)); + reject(httpError(parsed.message || `HTTP ${res.statusCode}`, res.statusCode)); } else { resolve(parsed); } @@ -1005,7 +1028,7 @@ class WorkspaceClient { }); req.on('error', reject); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timed out')); }); + req.on('timeout', () => { req.destroy(); reject(timeoutError()); }); req.end(); }); } diff --git a/packages/agent-connector/test/adapter-reconnect.test.js b/packages/agent-connector/test/adapter-reconnect.test.js new file mode 100644 index 000000000..faa5b3ea2 --- /dev/null +++ b/packages/agent-connector/test/adapter-reconnect.test.js @@ -0,0 +1,728 @@ +'use strict'; + +/** + * Deterministic tests for the BaseAdapter connection self-healing (V1). + * + * No real waiting: time is injected via adapter._clock, backoff jitter via + * adapter._jitter, and sleeps are replaced by a controllable gate that only + * resolves when the test fires them (or when _wakeAll runs on stop/reconnect). + * Heartbeat/join/poll outcomes are scripted through a mock WorkspaceClient. + * + * The final block is a protocol-level test against a real in-process HTTP + * server using the real WorkspaceClient, exercising in-flight-task → reconnect + * → completion post with server-side session validation. + */ + +const { test } = require('node:test'); +const assert = require('node:assert'); +const http = require('http'); + +const BaseAdapter = require('../src/adapters/base.js'); +const { WorkspaceClient, SessionRevokedError } = require('../src/workspace-client.js'); + +const flush = async (n = 5) => { for (let i = 0; i < n; i++) await new Promise((r) => setImmediate(r)); }; + +function deferred() { + let resolve, reject; + const promise = new Promise((res, rej) => { resolve = res; reject = rej; }); + return { promise, resolve, reject }; +} + +function httpErr(statusCode, message = `HTTP ${statusCode}`) { + const e = new Error(message); + e.statusCode = statusCode; + return e; +} +function netErr(code) { + const e = new Error(code); + e.code = code; + return e; +} + +class TestAdapter extends BaseAdapter { + constructor(opts) { + super(opts); + this.dispatched = []; + } + async _handleMessage(msg) { + this.dispatched.push(msg.id || msg.messageId); + } +} + +/** + * Build an adapter with all external I/O mocked and all time/sleeps under + * test control. Returns helpers to drive the supervised lifecycle step by step. + */ +function makeHarness({ pollPending } = {}) { + const h = { + now: 0, + joinQueue: [], // each: deferred-like {result} or {error} + hbQueue: [], // pending heartbeat deferreds (FIFO) + sleeps: new Set(), // active interruptible sleeps (finish fns) + disconnectCalls: 0, + joinCalls: 0, + hbCalls: 0, + }; + + const client = { + async joinNetwork() { + h.joinCalls++; + const next = h.joinQueue.shift(); + if (!next) throw netErr('ETIMEDOUT'); // default: transient + if (next.error) throw next.error; + return next.result; + }, + heartbeat() { + h.hbCalls++; + const d = deferred(); + h.hbQueue.push(d); + return d.promise; + }, + async pollPending(...args) { + if (pollPending) return pollPending(...args); + return { messages: [], cursor: null, composing: false }; + }, + async pollControl() { return []; }, + async pollToolResults() { return { events: [], cursor: null }; }, + async getHeadEventId() { return null; }, + async getAgents() { return []; }, + async disconnect() { h.disconnectCalls++; }, + async sendMessage() { return {}; }, + }; + + const adapter = new TestAdapter({ + workspaceId: 'ws-1', channelName: 'general', token: 'tok', + agentName: 'claude-1', endpoint: 'http://mock', agentType: 'claude', + }); + adapter.client = client; + adapter._clock = () => h.now; + adapter._jitter = () => 0; // deterministic backoff + adapter._syncSkillsFromWorkspace = async () => {}; // avoid skill-sync timer + adapter._controlPollerLoop = async () => {}; // not under test + // Controllable sleep: registers a waiter (so real _wakeAll wakes it) but does + // NOT auto-fire on a timer — the test advances it explicitly via fireSleeps(). + adapter._interruptibleSleep = function (_ms) { + return new Promise((resolve) => { + let settled = false; + const finish = () => { + if (settled) return; + settled = true; + this._waiters.delete(finish); + h.sleeps.delete(finish); + resolve(); + }; + this._waiters.add(finish); + h.sleeps.add(finish); + if (!this._running) finish(); + }); + }; + + h.client = client; + h.adapter = adapter; + h.queueJoinSuccess = (sid) => h.joinQueue.push({ result: { session_id: sid } }); + h.queueJoinResult = (result) => h.joinQueue.push({ result }); + h.queueJoinError = (err) => h.joinQueue.push({ error: err }); + h.fireSleeps = () => { for (const f of [...h.sleeps]) f(); }; + h.settleHeartbeat = (mode, err) => { + const d = h.hbQueue.shift(); + if (!d) return false; + if (mode === 'ok') d.resolve({ status: 'online' }); else d.reject(err); + return true; + }; + h.start = () => { h.runPromise = adapter.run(); return h.runPromise; }; + h.flush = flush; + return h; +} + +// --------------------------------------------------------------------------- +// 1. First join succeeds → connected, active generation set. +// --------------------------------------------------------------------------- +test('first join succeeds → connected with active generation', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + assert.strictEqual(h.adapter._sessionId, 's1'); + assert.strictEqual(h.adapter._connState, 'connected'); + assert.notStrictEqual(h.adapter._activeGen, null); + assert.strictEqual(h.joinCalls, 1); + h.adapter.stop(); + h.settleHeartbeat('ok'); + h.fireSleeps(); + await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 2. Join returns 2xx but no session_id → not connected; ambiguous retry, +// then terminal after JOIN_MAX_ATTEMPTS_AMBIGUOUS. +// --------------------------------------------------------------------------- +test('join without session_id → never connects, becomes terminal after cap', async () => { + const h = makeHarness(); + for (let i = 0; i < 6; i++) h.queueJoinResult({ session_id: '' }); // empty → ambiguous + h.start(); + // Step through ambiguous retries by firing the backoff sleeps. + for (let i = 0; i < 8; i++) { await h.flush(); h.fireSleeps(); } + await h.runPromise; // terminal → run() returns + assert.strictEqual(h.adapter._activeGen, null); + assert.strictEqual(h.adapter._connState, 'disconnected'); + assert.strictEqual(h.adapter._terminalReason, 'join_failed'); + assert.strictEqual(h.joinCalls, 5, 'should stop at JOIN_MAX_ATTEMPTS_AMBIGUOUS'); +}); + +// --------------------------------------------------------------------------- +// 3. Join fails transiently then succeeds (workspace late / network blip). +// --------------------------------------------------------------------------- +test('join retries transient failures then connects', async () => { + const h = makeHarness(); + h.queueJoinError(netErr('ECONNREFUSED')); + h.queueJoinError(httpErr(503)); + h.queueJoinSuccess('s9'); + h.start(); + for (let i = 0; i < 4; i++) { await h.flush(); h.fireSleeps(); } + await h.flush(); + assert.strictEqual(h.adapter._connState, 'connected'); + assert.strictEqual(h.adapter._sessionId, 's9'); + assert.strictEqual(h.joinCalls, 3); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 4. Join auth failure (401) → immediate terminal, no retry. +// --------------------------------------------------------------------------- +test('join 401 → terminal immediately, no retry', async () => { + const h = makeHarness(); + h.queueJoinError(httpErr(401, 'Invalid network token')); + h.start(); + await h.runPromise; + assert.strictEqual(h.joinCalls, 1); + assert.strictEqual(h.adapter._terminalReason, 'auth'); + assert.strictEqual(h.adapter._connState, 'disconnected'); +}); + +// --------------------------------------------------------------------------- +// 5. Heartbeat single-flight: a slow heartbeat never overlaps the next. +// --------------------------------------------------------------------------- +test('heartbeat is single-flight (no overlap while in flight)', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + assert.strictEqual(h.hbCalls, 1, 'initial heartbeat fired'); + // Do NOT settle the first heartbeat; fire sleeps repeatedly. + for (let i = 0; i < 5; i++) { h.fireSleeps(); await h.flush(); } + assert.strictEqual(h.hbCalls, 1, 'no second heartbeat while first is in flight'); + // Settle it → loop sleeps → fire → second heartbeat. + h.settleHeartbeat('ok'); + await h.flush(); h.fireSleeps(); await h.flush(); + assert.strictEqual(h.hbCalls, 2); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 6. AND reconnect judgement: neither / only-time / only-count → no reconnect; +// both → reconnect (when idle). +// --------------------------------------------------------------------------- +test('reconnect requires BOTH freshness window AND failure threshold (AND, not OR)', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); // attempt 1 + h.queueJoinSuccess('s2'); // attempt 2 (after reconnect) + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + + // Only count, time still fresh (now small): 3 failures, lapsed ~0 → no reconnect. + h.now = 0; + for (let i = 0; i < 3; i++) { + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures >= 3, true); + assert.strictEqual(h.adapter._activeGen, gen1, 'count-only must NOT reconnect'); + + // Only time: jump clock past window but reset failures via a success first. + h.settleHeartbeat('ok'); await h.flush(); h.fireSleeps(); await h.flush(); + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures, 0); + h.now = 200000; // far past window, but only 1 upcoming failure + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + assert.strictEqual(h.adapter._activeGen, gen1, 'time-only (1 failure) must NOT reconnect'); + + // Both: keep failing past threshold with clock past window → reconnect. + // (The reconnect + re-join to s2 completes within a flush, so assert the + // observable OUTCOME — a re-join happened — rather than the transient null.) + let guard = 0; + while (h.adapter._activeGen === gen1 && guard++ < 6) { + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.joinCalls, 2, 'both conditions → re-join occurred'); + assert.strictEqual(h.adapter._sessionId, 's2'); + assert.notStrictEqual(h.adapter._activeGen, gen1, 'new active generation after reconnect'); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 7. Transient failures that recover before the window → soft recovery, no re-join. +// --------------------------------------------------------------------------- +test('transient failures recovering before window → soft recovery, no re-join', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + h.now = 1000; + h.settleHeartbeat('fail', httpErr(500)); await h.flush(); h.fireSleeps(); await h.flush(); + h.settleHeartbeat('fail', httpErr(500)); await h.flush(); h.fireSleeps(); await h.flush(); + h.now = 2000; + h.settleHeartbeat('ok'); await h.flush(); + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures, 0); + assert.strictEqual(h.adapter._activeGen, gen1, 'must not re-join'); + assert.strictEqual(h.joinCalls, 1); + assert.strictEqual(h.adapter._connState, 'connected'); + h.adapter.stop(); h.fireSleeps(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 8. Heartbeat 401/403 → immediate terminal disconnected (no 90s wait). +// --------------------------------------------------------------------------- +test('heartbeat 403 → terminal immediately, no reconnect', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + h.now = 1000; // fresh — proves we do NOT wait for the 90s window + h.settleHeartbeat('fail', httpErr(403, 'forbidden')); + await h.flush(); + assert.strictEqual(h.adapter._terminalReason, 'auth'); + assert.strictEqual(h.adapter._activeGen, null); + h.fireSleeps(); + await h.runPromise; // run() exits terminal + assert.strictEqual(h.joinCalls, 1, 'no auto re-join after auth-terminal'); +}); + +// --------------------------------------------------------------------------- +// 9. SessionRevokedError on heartbeat → terminal, never auto-reconnect. +// --------------------------------------------------------------------------- +test('heartbeat session_revoked → terminal, no auto re-join', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + h.settleHeartbeat('fail', new SessionRevokedError('session_revoked')); + await h.flush(); + assert.strictEqual(h.adapter._terminalReason, 'session_revoked'); + assert.strictEqual(h.adapter._connState, 'session_revoked'); + assert.strictEqual(h.adapter._activeGen, null); + h.fireSleeps(); + await h.runPromise; + assert.strictEqual(h.joinCalls, 1, 'no re-join after session_revoked'); +}); + +// --------------------------------------------------------------------------- +// 10. stop() during join-retry backoff → wakes immediately, no further join. +// --------------------------------------------------------------------------- +test('stop during join backoff → exits immediately, no further join', async () => { + const h = makeHarness(); + h.queueJoinError(netErr('ECONNREFUSED')); + h.queueJoinError(netErr('ECONNREFUSED')); + h.queueJoinSuccess('sX'); // would be used if it kept retrying + h.start(); + await h.flush(); // first join fails → now sleeping in backoff + assert.strictEqual(h.joinCalls, 1); + assert.strictEqual(h.sleeps.size >= 1, true, 'backoff sleep registered'); + h.adapter.stop(); // wakes the backoff sleep via _wakeAll + await h.runPromise; + assert.strictEqual(h.joinCalls, 1, 'must not attempt another join after stop'); +}); + +// --------------------------------------------------------------------------- +// 11. Multi-waiter: stop wakes heartbeat-sleep AND poll-sleep AND backoff +// simultaneously (Set-based, no single-callback overwrite). +// --------------------------------------------------------------------------- +test('stop wakes all concurrent interruptible sleeps', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + // Settle initial heartbeat so the hb loop proceeds to its 30s sleep; poll + // loop is already at its pacing sleep. Now multiple sleeps are registered. + h.settleHeartbeat('ok'); + await h.flush(); + assert.strictEqual(h.sleeps.size >= 2, true, 'heartbeat + poll sleeps both registered'); + const before = h.sleeps.size; + h.adapter.stop(); // real _wakeAll + await h.flush(); + assert.strictEqual(h.sleeps.size, 0, `all ${before} waiters woken and removed`); + await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 12. Stale heartbeat success after reconnect must NOT overwrite new state. +// --------------------------------------------------------------------------- +test('stale heartbeat result is ignored after generation changes', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.queueJoinSuccess('s2'); + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + // Force a reconnect: both conditions met. + h.now = 200000; + for (let i = 0; i < 3; i++) { + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + // gen1 invalidated; an OLD heartbeat from gen1 now resolves late: + const stillPending = h.hbQueue.length; + // The reconnect already began; activeGen should be null or s2's gen. + h.fireSleeps(); await h.flush(); // let re-join (s2) complete + const gen2 = h.adapter._activeGen; + assert.notStrictEqual(gen2, gen1); + assert.strictEqual(h.adapter._sessionId, 's2'); + // Any leftover gen1 heartbeat resolving now must not change consecutive count + // or session for gen2. + const failBefore = h.adapter._consecutiveHeartbeatFailures; + for (let i = 0; i < stillPending; i++) h.settleHeartbeat('ok'); + await h.flush(); + assert.strictEqual(h.adapter._sessionId, 's2', 'stale hb must not change session'); + assert.strictEqual(h.adapter._activeGen, gen2, 'stale hb must not change active gen'); + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures, failBefore); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 13. Active-task DEFER: threshold met while a task is in flight → only +// _reconnectPending; activeGen NOT invalidated; heartbeat/poll not stopped. +// Then: heartbeat recovers → pending cleared, no re-join. +// --------------------------------------------------------------------------- +test('reconnect deferred while task active; cleared on heartbeat recovery', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + // Simulate an active task. + h.adapter._channelBusy.add('general'); + assert.strictEqual(h.adapter._hasActiveWork(), true); + + h.now = 200000; + for (let i = 0; i < 3; i++) { + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.adapter._reconnectPending, true, 'threshold met → pending'); + assert.strictEqual(h.adapter._reconnectInProgress, false); + assert.strictEqual(h.adapter._activeGen, gen1, 'activeGen NOT invalidated while task active'); + assert.strictEqual(h.joinCalls, 1, 'no re-join while deferred'); + + // Heartbeat recovers before the task ends → pending cleared, no re-join. + h.settleHeartbeat('ok'); await h.flush(); + assert.strictEqual(h.adapter._reconnectPending, false, 'pending cleared on recovery'); + assert.strictEqual(h.adapter._activeGen, gen1); + assert.strictEqual(h.joinCalls, 1); + h.adapter._channelBusy.delete('general'); + h.adapter.stop(); h.fireSleeps(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 14. Active-task DEFER then task ends with heartbeat still failing → real +// reconnect begins only after work drains. +// --------------------------------------------------------------------------- +test('deferred reconnect proceeds only after active task ends', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.queueJoinSuccess('s2'); + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + h.adapter._channelBusy.add('general'); + h.now = 200000; + for (let i = 0; i < 3; i++) { + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.adapter._reconnectPending, true); + assert.strictEqual(h.adapter._activeGen, gen1); + + // Task ends; next failing heartbeat re-evaluates and now begins reconnect. + // (Begin + re-join to s2 completes within the flush — assert the outcome.) + h.adapter._channelBusy.delete('general'); + h.settleHeartbeat('fail', netErr('ETIMEDOUT')); + await h.flush(); h.fireSleeps(); await h.flush(); + assert.strictEqual(h.joinCalls, 2, 'reconnect proceeds after work drains'); + assert.strictEqual(h.adapter._sessionId, 's2'); + assert.notStrictEqual(h.adapter._activeGen, gen1); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 15. Poll failures never trigger reconnect. +// --------------------------------------------------------------------------- +test('poll failures do not trigger reconnect', async () => { + let pollCalls = 0; + const h = makeHarness({ + pollPending: async () => { pollCalls++; throw httpErr(500); }, + }); + h.queueJoinSuccess('s1'); + h.now = 500000; // even with a stale clock, poll failures alone must not reconnect + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + for (let i = 0; i < 5; i++) { h.fireSleeps(); await h.flush(); } + assert.ok(pollCalls >= 1, 'poll was attempted and failed'); + assert.strictEqual(h.adapter._activeGen, gen1, 'poll failures must not change generation'); + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures, 0, 'poll failures must not count as heartbeat failures'); + assert.strictEqual(h.joinCalls, 1); + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +}); + +// --------------------------------------------------------------------------- +// 16. Supervised path never calls remote leave (decision table: all auto exits). +// --------------------------------------------------------------------------- +test('supervised lifecycle never calls remote disconnect (leave)', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); + h.start(); + await h.flush(); + // session_revoked terminal exit + h.settleHeartbeat('fail', new SessionRevokedError('session_revoked')); + await h.flush(); h.fireSleeps(); + await h.runPromise; + assert.strictEqual(h.disconnectCalls, 0, 'no POST /v1/leave on any supervised exit'); +}); + +// --------------------------------------------------------------------------- +// 17. Backoff jitter spreads a fleet (different delays for same attempt index). +// --------------------------------------------------------------------------- +test('backoff applies jitter so a fleet does not retry in lockstep', () => { + const a = new TestAdapter({ workspaceId: 'w', channelName: 'general', token: 't', agentName: 'a', endpoint: 'http://x' }); + const b = new TestAdapter({ workspaceId: 'w', channelName: 'general', token: 't', agentName: 'b', endpoint: 'http://x' }); + // Deterministic-but-different jitter per instance. + a._jitter = () => -0.5; b._jitter = () => 0.5; + const da = a._backoff(2); // base 8000 + const db = b._backoff(2); + assert.notStrictEqual(da, db, 'jittered delays differ'); + assert.ok(da >= 6400 && da <= 9600 && db >= 6400 && db <= 9600, 'within ±20% of 8s'); + // Cap holds. + assert.ok(a._backoff(10) <= 30000 * 1.2); +}); + +// --------------------------------------------------------------------------- +// 18. Kill-switch: OA_ADAPTER_RECONNECT=0 routes to the legacy lifecycle +// (one-shot join, setInterval heartbeat, finally remote disconnect). +// --------------------------------------------------------------------------- +test('kill-switch=0 uses legacy lifecycle (one-shot join + finally leave)', async () => { + const prev = process.env.OA_ADAPTER_RECONNECT; + process.env.OA_ADAPTER_RECONNECT = '0'; + try { + let joinCalls = 0, disconnectCalls = 0, pollCalls = 0; + const adapter = new TestAdapter({ + workspaceId: 'ws', channelName: 'general', token: 't', + agentName: 'legacy-1', endpoint: 'http://mock', agentType: 'claude', + }); + assert.strictEqual(adapter._reconnectEnabled, false, 'kill-switch disables reconnect'); + adapter._sleep = () => Promise.resolve(); // avoid the real 5s legacy poll pacing wait + adapter.client = { + async joinNetwork() { joinCalls++; return { session_id: 's1' }; }, + async heartbeat() { return {}; }, + async pollControl() { return []; }, + async pollToolResults() { return { events: [], cursor: null }; }, + async getHeadEventId() { return null; }, + // Throw synchronously so legacy's `Promise.race([getAgents(), <10s timer>])` + // never constructs the 10s setTimeout (skill sync is best-effort). + getAgents() { throw new Error('skip skill sync'); }, + async disconnect() { disconnectCalls++; }, + async pollPending() { + pollCalls++; + // Stop after the first poll so the legacy loop exits deterministically. + adapter._running = false; + return { messages: [], cursor: null, composing: false }; + }, + }; + await adapter.run(); // legacy path; pollPending stops it after one iteration + assert.strictEqual(joinCalls, 1, 'legacy one-shot join'); + assert.strictEqual(disconnectCalls, 1, 'legacy finally calls remote disconnect (verbatim old behavior)'); + assert.ok(pollCalls >= 1); + // Supervised-only state must be untouched by the legacy path. + assert.strictEqual(adapter._activeGen, null); + } finally { + if (prev === undefined) delete process.env.OA_ADAPTER_RECONNECT; + else process.env.OA_ADAPTER_RECONNECT = prev; + } +}); + +// --------------------------------------------------------------------------- +// 19. PROTOCOL-LEVEL: in-flight task → reconnect → completion post. +// Real WorkspaceClient against a mock HTTP server that rotates and +// validates session_id exactly like the backend. Asserts the result post +// is accepted under the current session and a stale-session post is 401. +// --------------------------------------------------------------------------- +test('protocol: session rotation + message-post validation (real client + mock server)', async () => { + let currentSession = null; + let sessionSeq = 0; + const posted = []; + + const server = http.createServer((req, res) => { + let body = ''; + req.on('data', (c) => { body += c; }); + req.on('end', () => { + const json = body ? JSON.parse(body) : {}; + const send = (code, obj) => { res.writeHead(code, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(obj)); }; + if (req.url === '/v1/join') { + sessionSeq++; currentSession = `sess-${sessionSeq}`; + return send(200, { data: { network_id: 'w', agent_name: json.agent_name, status: 'online', session_id: currentSession } }); + } + if (req.url === '/v1/heartbeat') { + if (json.session_id && json.session_id !== currentSession) { + return send(401, { message: 'session_revoked: another client is now running as this agent' }); + } + return send(200, { data: { status: 'online' } }); + } + if (req.url === '/v1/events' && req.method === 'POST') { + // Mirrors _validate_session: claim != stored → session_revoked. + const claimed = (json.metadata && json.metadata.session_id) || json.session_id; + if (claimed && claimed !== currentSession) { + return send(401, { message: 'session_revoked: another client is now running as this agent' }); + } + posted.push({ session: claimed }); + return send(200, { data: { id: `evt-${posted.length}` } }); + } + return send(404, { message: 'Network not found' }); + }); + }); + await new Promise((r) => server.listen(0, r)); + const port = server.address().port; + const endpoint = `http://127.0.0.1:${port}`; + + try { + const client = new WorkspaceClient(endpoint); + + // 1. Initial join → session 1. + const j1 = await client.joinNetwork('claude-1', 'tok', { network: 'w' }); + assert.strictEqual(j1.session_id, 'sess-1'); + + // 2. Task arrives under session 1. Reconnect happens → session 2. + const j2 = await client.joinNetwork('claude-1', 'tok', { network: 'w' }); + assert.strictEqual(j2.session_id, 'sess-2'); + + // 3. Old task completes; adapter posts result with the CURRENT session id + // (sess-2, as base.js reads this._sessionId at send time) → accepted. + await client.sendMessage('w', 'general', 'tok', 'task done', { + senderName: 'claude-1', sessionId: 'sess-2', + }); + assert.strictEqual(posted.length, 1); + assert.strictEqual(posted[0].session, 'sess-2'); + + // 4. A genuinely stale post (sess-1, e.g. another client took over) → 401 + // SessionRevokedError, NOT silently accepted. + await assert.rejects( + client.sendMessage('w', 'general', 'tok', 'stale', { senderName: 'claude-1', sessionId: 'sess-1' }), + (e) => e instanceof SessionRevokedError, + ); + assert.strictEqual(posted.length, 1, 'stale post not persisted'); + + // 5. Heartbeat with current session ok; with stale session → revoked. + await client.heartbeat('w', 'claude-1', 'tok', 'sess-2'); + await assert.rejects( + client.heartbeat('w', 'claude-1', 'tok', 'sess-1'), + (e) => e instanceof SessionRevokedError, + ); + } finally { + await new Promise((r) => server.close(r)); + } +}); + +// --------------------------------------------------------------------------- +// 20. workspace-client error enrichment: statusCode / code attached, message +// text unchanged; SessionRevokedError still typed. +// --------------------------------------------------------------------------- +test('workspace-client attaches statusCode/code without changing message text', async () => { + const server = http.createServer((req, res) => { + let body = ''; req.on('data', (c) => { body += c; }); + req.on('end', () => { + const send = (code, obj) => { res.writeHead(code, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(obj)); }; + if (req.url === '/auth') return send(401, { message: 'Invalid network token' }); + if (req.url === '/missing') return send(404, { message: 'Network not found' }); + if (req.url === '/boom') return send(503, { message: 'upstream unavailable' }); + if (req.url === '/revoked') return send(401, { message: 'session_revoked: taken over' }); + return send(200, { data: {} }); + }); + }); + await new Promise((r) => server.listen(0, r)); + const port = server.address().port; + try { + const client = new WorkspaceClient(`http://127.0.0.1:${port}`); + await assert.rejects(client._post('/auth', {}), (e) => { + assert.strictEqual(e.statusCode, 401); + assert.strictEqual(e.message, 'Invalid network token'); // text preserved + assert.ok(!(e instanceof SessionRevokedError)); + return true; + }); + await assert.rejects(client._post('/missing', {}), (e) => { assert.strictEqual(e.statusCode, 404); return true; }); + await assert.rejects(client._post('/boom', {}), (e) => { assert.strictEqual(e.statusCode, 503); return true; }); + await assert.rejects(client._post('/revoked', {}), (e) => { assert.ok(e instanceof SessionRevokedError); return true; }); + } finally { + await new Promise((r) => server.close(r)); + } +}); + +// --------------------------------------------------------------------------- +// 21. Heartbeat 404 (member lost server-side): classified by statusCode, counts +// as a heartbeat failure, reconnects ONLY under the AND judgement, exactly +// once, resets state on rejoin, never calls remote leave, and a stale +// generation callback cannot pollute the new session. +// --------------------------------------------------------------------------- +test('heartbeat 404 → counts as failure, reconnects under AND only, no leave', async () => { + const h = makeHarness(); + h.queueJoinSuccess('s1'); // attempt 1 + h.queueJoinSuccess('s2'); // attempt 2 after reconnect + h.start(); + await h.flush(); + const gen1 = h.adapter._activeGen; + + // (a) Structured classification: 404 → 'ambiguous' via statusCode, not message. + assert.strictEqual(h.adapter._classifyError(httpErr(404, 'Network not found')), 'ambiguous'); + assert.strictEqual(h.adapter._classifyError(httpErr(404, 'literally anything else')), 'ambiguous'); + + // (b) AND not yet satisfied: 2 × 404 while the clock is fresh → no reconnect, + // but the 404s ARE counted as heartbeat failures. + h.now = 1000; + for (let i = 0; i < 2; i++) { + h.settleHeartbeat('fail', httpErr(404, 'Network not found')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.adapter._activeGen, gen1, 'no reconnect before BOTH conditions met'); + assert.ok(h.adapter._consecutiveHeartbeatFailures >= 2, '404 counted as heartbeat failures'); + assert.strictEqual(h.joinCalls, 1); + assert.strictEqual(h.disconnectCalls, 0); + + // (c) Both conditions: advance clock past the freshness window and keep 404-ing + // → exactly ONE reconnect (one extra join), then connected again. + h.now = 200000; + let guard = 0; + while (h.adapter._activeGen === gen1 && guard++ < 6) { + h.settleHeartbeat('fail', httpErr(404, 'Network not found')); + await h.flush(); h.fireSleeps(); await h.flush(); + } + assert.strictEqual(h.joinCalls, 2, 'exactly one reconnect (one additional join)'); + assert.strictEqual(h.adapter._sessionId, 's2'); + assert.notStrictEqual(h.adapter._activeGen, gen1, '_activeGen updated after rejoin'); + assert.strictEqual(h.adapter._consecutiveHeartbeatFailures, 0, 'failure count reset on rejoin'); + assert.strictEqual(h.adapter._connState, 'connected'); + assert.strictEqual(h.disconnectCalls, 0, 'reconnect never calls remote leave (/v1/leave)'); + + // (d) A stale (old-generation) heartbeat callback must NOT pollute new state. + const liveGen = h.adapter._activeGen; + const sidBefore = h.adapter._sessionId; + const livenessBefore = h.adapter._lastLivenessOkAt; + const origHb = h.client.heartbeat; + h.client.heartbeat = async () => ({ status: 'online' }); // resolves without queueing + h.now = 999999; + await h.adapter._heartbeatOnce(liveGen + 1000); // a generation that is NOT active + h.client.heartbeat = origHb; + assert.strictEqual(h.adapter._sessionId, sidBefore, 'stale-gen hb did not change session'); + assert.strictEqual(h.adapter._activeGen, liveGen, 'stale-gen hb did not change active gen'); + assert.strictEqual(h.adapter._lastLivenessOkAt, livenessBefore, 'stale-gen hb did not refresh liveness'); + + h.adapter.stop(); h.settleHeartbeat('ok'); h.fireSleeps(); await h.runPromise; +});