From 4f09e9a0b13c7541b5962abc1abcc67b1896a684 Mon Sep 17 00:00:00 2001 From: Gautam Kumar Date: Sat, 20 Jun 2026 18:18:14 +0530 Subject: [PATCH] fix: prevent runaway status/event loop in multi-agent workspaces Add queue safety limits to BaseAdapter to prevent the self-amplifying event loop that occurs when agents restart into busy historical workspaces. Key changes: - Add max queue size per channel (10 messages); on overflow the oldest queued message is dropped - Add 5-minute TTL for queued messages to expire stale work - Add startup reconciliation to clear stale queues from prior lifecycles - Emit terminal states (expired) for dropped/expired queued messages - Add a 10-second age check (STARTUP_GRACE_MS) that discards incoming messages already carrying a stale queue stamp, to suppress replay of stale queued work after restart These guards prevent the scenario where transient status/thinking/todos messages accumulate unboundedly, exhaust the DB connection pool, and render the workspace unusable after restart. Signed-off-by: Gautam Kumar --- packages/agent-connector/src/adapters/base.js | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/packages/agent-connector/src/adapters/base.js b/packages/agent-connector/src/adapters/base.js index 18005fda8..b06226ac3 100644 --- a/packages/agent-connector/src/adapters/base.js +++ b/packages/agent-connector/src/adapters/base.js @@ -23,6 +23,11 @@ const { defaultAgentWorkdir } = require('../paths'); const DEFAULT_ENDPOINT = 'https://workspace-endpoint.openagents.org'; +// Queue safety limits +const MAX_QUEUE_SIZE = 10; // Max queued messages per channel +const QUEUE_ITEM_TTL_MS = 5 * 60 * 1000; // 5 minutes — queued messages older than this are expired +const STARTUP_GRACE_MS = 10 * 1000; // 10 seconds after startup: suppress replay of stale queued work + class BaseAdapter { /** * @param {object} opts @@ -77,6 +82,9 @@ class BaseAdapter { async run() { this._running = true; + // Clear any stale channel queues from a prior lifecycle + this._clearStaleQueues(); + // Announce agent to workspace try { const joinResult = await 
this.client.joinNetwork(this.agentName, this.token, { @@ -157,6 +165,25 @@ class BaseAdapter { } } + // ------------------------------------------------------------------ + // Startup queue reconciliation + // ------------------------------------------------------------------ + + /** + * Discard any leftover channel queues from a previous adapter lifecycle. + * On restart, _skipExistingEvents jumps the cursor to head, so any queued + * messages from the old lifecycle are orphans — replaying them would + * generate duplicate transient status/thinking messages and potentially + * trigger the runaway event loop described in issue #492. + */ + _clearStaleQueues() { + const queueCount = Object.keys(this._channelQueues).length; + if (queueCount > 0) { + this._log(`Clearing ${queueCount} stale channel queue(s) from prior lifecycle`); + this._channelQueues = {}; + } + } + // ------------------------------------------------------------------ // Heartbeat // ------------------------------------------------------------------ @@ -612,10 +639,35 @@ class BaseAdapter { channel = msg.sessionId; } + // Startup grace: suppress stale queued work that would replay after restart. + // On restart, _skipExistingEvents jumps the cursor to head, so any leftover + // queue entries from a prior lifecycle are orphans that would generate + // duplicate transient status messages. Drop them silently. 
+ if (msg._queueId && msg._queuedAt && (Date.now() - msg._queuedAt > STARTUP_GRACE_MS)) { + this._log(`Discarding stale queued message ${msg._queueId} (age > ${STARTUP_GRACE_MS / 1000}s)`); + return; + } + if (this._channelBusy.has(channel)) { if (!this._channelQueues[channel]) this._channelQueues[channel] = []; + + // Enforce max queue size — drop oldest if full + if (this._channelQueues[channel].length >= MAX_QUEUE_SIZE) { + const dropped = this._channelQueues[channel].shift(); + if (dropped && dropped._queueId) { + this._log(`Queue overflow in ${channel}: dropped ${dropped._queueId}`); + try { + await this.sendStatus(channel, 'message dropped — queue full', { + queue_id: dropped._queueId, + queue_status: 'expired', + }); + } catch {} + } + } + const queueId = `q-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; msg._queueId = queueId; + msg._queuedAt = Date.now(); this._channelQueues[channel].push(msg); try { await this.sendStatus(channel, 'message queued — will process after current task', { @@ -650,11 +702,21 @@ class BaseAdapter { try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {} } - // Drain queue + // Drain queue — expire entries older than TTL while (true) { const queue = this._channelQueues[channel]; if (!queue || queue.length === 0) break; const nextMsg = queue.shift(); + + // Check TTL: discard stale queued messages + if (nextMsg._queuedAt && (Date.now() - nextMsg._queuedAt > QUEUE_ITEM_TTL_MS)) { + this._log(`Expired queued message ${nextMsg._queueId || 'unknown'} in ${channel} (age > ${QUEUE_ITEM_TTL_MS / 1000}s)`); + if (nextMsg._queueId) { + try { await this.sendStatus(channel, 'queued message expired', { queue_id: nextMsg._queueId, queue_status: 'expired' }); } catch {} + } + continue; // skip processing, check next + } + if (nextMsg._queueId) { try { await this.sendStatus(channel, 'processing queued message', { queue_id: nextMsg._queueId, queue_status: 'processed' }); } catch {} }