Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion packages/agent-connector/src/adapters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ const { defaultAgentWorkdir } = require('../paths');

const DEFAULT_ENDPOINT = 'https://workspace-endpoint.openagents.org';

// Queue safety limits
const MAX_QUEUE_SIZE = 10; // Max queued messages per channel
const QUEUE_ITEM_TTL_MS = 5 * 60 * 1000; // 5 minutes — queued messages older than this are expired
const STARTUP_GRACE_MS = 10 * 1000; // 10 seconds after startup: suppress replay of stale queued work

class BaseAdapter {
/**
* @param {object} opts
Expand Down Expand Up @@ -77,6 +82,9 @@ class BaseAdapter {
async run() {
this._running = true;

// Clear any stale channel queues from a prior lifecycle
this._clearStaleQueues();

// Announce agent to workspace
try {
const joinResult = await this.client.joinNetwork(this.agentName, this.token, {
Expand Down Expand Up @@ -157,6 +165,25 @@ class BaseAdapter {
}
}

// ------------------------------------------------------------------
// Startup queue reconciliation
// ------------------------------------------------------------------

/**
* Discard any leftover channel queues from a previous adapter lifecycle.
* On restart, _skipExistingEvents jumps the cursor to head, so any queued
* messages from the old lifecycle are orphans — replaying them would
* generate duplicate transient status/thinking messages and potentially
* trigger the runaway event loop described in issue #492.
*/
_clearStaleQueues() {
const queueCount = Object.keys(this._channelQueues).length;
if (queueCount > 0) {
this._log(`Clearing ${queueCount} stale channel queue(s) from prior lifecycle`);
this._channelQueues = {};
}
}

// ------------------------------------------------------------------
// Heartbeat
// ------------------------------------------------------------------
Expand Down Expand Up @@ -612,10 +639,35 @@ class BaseAdapter {
channel = msg.sessionId;
}

// Startup grace: suppress stale queued work that would replay after restart.
// On restart, _skipExistingEvents jumps the cursor to head, so any leftover
// queue entries from a prior lifecycle are orphans that would generate
// duplicate transient status messages. Drop them silently.
if (msg._queueId && msg._queuedAt && (Date.now() - msg._queuedAt > STARTUP_GRACE_MS)) {
this._log(`Discarding stale queued message ${msg._queueId} (age > ${STARTUP_GRACE_MS / 1000}s)`);
return;
}

if (this._channelBusy.has(channel)) {
if (!this._channelQueues[channel]) this._channelQueues[channel] = [];

// Enforce max queue size — drop oldest if full
if (this._channelQueues[channel].length >= MAX_QUEUE_SIZE) {
const dropped = this._channelQueues[channel].shift();
if (dropped && dropped._queueId) {
this._log(`Queue overflow in ${channel}: dropped ${dropped._queueId}`);
try {
await this.sendStatus(channel, 'message dropped — queue full', {
queue_id: dropped._queueId,
queue_status: 'expired',
});
} catch {}
}
}

const queueId = `q-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
msg._queueId = queueId;
msg._queuedAt = Date.now();
this._channelQueues[channel].push(msg);
try {
await this.sendStatus(channel, 'message queued — will process after current task', {
Expand Down Expand Up @@ -650,11 +702,21 @@ class BaseAdapter {
try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {}
}

// Drain queue
// Drain queue — expire entries older than TTL
while (true) {
const queue = this._channelQueues[channel];
if (!queue || queue.length === 0) break;
const nextMsg = queue.shift();

// Check TTL: discard stale queued messages
if (nextMsg._queuedAt && (Date.now() - nextMsg._queuedAt > QUEUE_ITEM_TTL_MS)) {
this._log(`Expired queued message ${nextMsg._queueId || 'unknown'} in ${channel} (age > ${QUEUE_ITEM_TTL_MS / 1000}s)`);
if (nextMsg._queueId) {
try { await this.sendStatus(channel, 'queued message expired', { queue_id: nextMsg._queueId, queue_status: 'expired' }); } catch {}
}
continue; // skip processing, check next
}

if (nextMsg._queueId) {
try { await this.sendStatus(channel, 'processing queued message', { queue_id: nextMsg._queueId, queue_status: 'processed' }); } catch {}
}
Expand Down
Loading