Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions packages/agent-connector/src/adapters/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BaseAdapter {
// Per-channel task tracking for parallel execution
this._channelBusy = new Set();
this._channelQueues = {};
this._bootstrappedChannels = new Set();
// Cached workspace.browser_enabled. Populated lazily on first read so we
// don't pay an HTTP roundtrip per message — adapters that toggle the
// workspace flag must reconnect/restart to pick up the change (matches
Expand All @@ -69,6 +70,17 @@ class BaseAdapter {
};
}

_runtimeEnv(channelName = this.channelName) {
return {
OPENAGENTS_WORKSPACE_ID: this.workspaceId,
OPENAGENTS_CHANNEL_NAME: channelName || this.channelName || 'general',
OPENAGENTS_AGENT_NAME: this.agentName,
OPENAGENTS_AGENT_TYPE: this.agentType || 'agent',
OPENAGENTS_ENDPOINT: this.endpoint,
OA_WORKSPACE_TOKEN: this.token,
};
}

// ------------------------------------------------------------------
// Lifecycle
// ------------------------------------------------------------------
Expand Down Expand Up @@ -406,6 +418,10 @@ class BaseAdapter {
for (const msg of messages) {
const msgId = msg.id || msg.messageId;
if (msgId && this._processedIds.has(msgId)) continue;
if (msg.eventType === 'workspace.agent.bootstrap') {
incoming.push(msg);
continue;
}
if (msg.messageType === 'status') continue;
// Handle queue cancellation signals from frontend
if (msg.messageType === 'queue_cancel') {
Expand All @@ -420,10 +436,16 @@ class BaseAdapter {

if (incoming.length > 0) {
idleCount = 0;
for (const msg of incoming) {
const bootstraps = incoming.filter((msg) => msg.eventType === 'workspace.agent.bootstrap');
const regularMessages = incoming.filter((msg) => msg.eventType !== 'workspace.agent.bootstrap');
for (const msg of [...bootstraps, ...regularMessages]) {
const msgId = msg.id || msg.messageId;
if (msgId) this._processedIds.add(msgId);
await this._dispatchMessage(msg);
if (msg.eventType === 'workspace.agent.bootstrap') {
await this._dispatchBootstrap(msg);
} else {
await this._dispatchMessage(msg);
}
}
// Cap dedup set
if (this._processedIds.size > 2000) {
Expand Down Expand Up @@ -514,6 +536,20 @@ class BaseAdapter {
this._wakeControlPoller();
}

async _dispatchBootstrap(msg) {
const channel = msg.sessionId || this.channelName || 'general';
const item = { ...msg, _bootstrap: true };

if (this._channelBusy.has(channel)) {
if (!this._channelQueues[channel]) this._channelQueues[channel] = [];
this._channelQueues[channel].push(item);
return;
}

this._channelWorker(channel, item);
this._wakeControlPoller();
}

_cancelQueuedMessage(channel, queueId) {
const queue = this._channelQueues[channel];
if (!queue) return false;
Expand All @@ -527,7 +563,7 @@ class BaseAdapter {
async _channelWorker(channel, msg) {
this._channelBusy.add(channel);
try {
await this._handleMessage(msg);
await this._processChannelItem(channel, msg);
} catch (e) {
this._log(`Error in channel worker for ${channel}: ${e.message}`);
try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {}
Expand All @@ -542,7 +578,7 @@ class BaseAdapter {
try { await this.sendStatus(channel, 'processing queued message', { queue_id: nextMsg._queueId, queue_status: 'processed' }); } catch {}
}
try {
await this._handleMessage(nextMsg);
await this._processChannelItem(channel, nextMsg);
} catch (e) {
this._log(`Error processing queued message in ${channel}: ${e.message}`);
try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {}
Expand All @@ -551,6 +587,25 @@ class BaseAdapter {
this._channelBusy.delete(channel);
}

async _processChannelItem(channel, item) {
if (item && item._bootstrap) {
if (!this._bootstrappedChannels.has(channel)) {
await this._bootstrapChannel(channel, item);
this._bootstrappedChannels.add(channel);
}
return;
}
if (!this._bootstrappedChannels.has(channel)) {
await this._bootstrapChannel(channel, { lazy: true });
this._bootstrappedChannels.add(channel);
}
await this._handleMessage(item);
}

async _bootstrapChannel(channel, _event) {
throw new Error(`${this.constructor.name} must implement _bootstrapChannel for ${channel}`);
}

// ------------------------------------------------------------------
// Auto-title helper
// ------------------------------------------------------------------
Expand Down
210 changes: 132 additions & 78 deletions packages/agent-connector/src/adapters/claude.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ const { formatAttachmentsForPrompt, SESSION_DEFAULT_RE, generateSessionTitle } =
const { buildClaudeSystemPrompt, buildClaudeSkillMd } = require('./workspace-prompt');

const IS_WINDOWS = process.platform === 'win32';
const CLAUDE_ENV_KEEP = new Set([
'CLAUDE_CODE_USE_VERTEX',
'CLAUDE_CODE_USE_BEDROCK',
'CLAUDE_MODEL',
'CLAUDE_API_KEY',
'CLAUDE_CODE_MAX_TURNS',
]);

class ClaudeAdapter extends BaseAdapter {
/**
Expand Down Expand Up @@ -364,33 +371,8 @@ class ClaudeAdapter extends BaseAdapter {
throw new Error('claude CLI not found. Install with: curl -fsSL https://claude.ai/install.sh | bash');
}

let systemPrompt = '\n' + buildClaudeSystemPrompt({
agentName: this.agentName,
workspaceId: this.workspaceId,
channelName,
mode: this._mode,
browserEnabled,
});

// In skills mode, replace MCP tool references with curl-based instructions
if (this.toolMode === 'skills') {
systemPrompt = systemPrompt
.replace(
'Use workspace_get_history to read previous messages.\n' +
'Use workspace_get_agents to see other agents.\n' +
'Use workspace_put_todos to track your progress. ALWAYS create a to-do list when given multiple tasks or multi-step work.\n' +
'Use workspace_create_timer to set a reminder that wakes you up later.\n' +
'Use workspace_create_routine to set up recurring scheduled tasks (e.g. daily reviews).\n',
'Use the openagents-workspace skill (Bash + curl) for workspace operations:\n' +
'reading message history, discovering agents, sharing files, browsing,\n' +
'managing to-do lists, setting timers, and creating routines.\n' +
'Refer to the skill instructions for the exact curl commands.\n'
);
}

const cmd = [claudeBin, '-p', prompt, '--output-format', 'stream-json', '--verbose'];

cmd.push('--append-system-prompt', systemPrompt);
cmd.push('--disallowedTools', 'AskUserQuestion', 'CronCreate', 'CronDelete', 'CronList', 'ScheduleWakeup');

// Resume existing conversation (skipped on retry after stale session)
Expand All @@ -408,6 +390,124 @@ class ClaudeAdapter extends BaseAdapter {
return this._buildMcpCmd(cmd, channelName);
}

_buildClaudeBootstrapPrompt(channelName, browserEnabled) {
let prompt = buildClaudeSystemPrompt({
agentName: this.agentName,
workspaceId: this.workspaceId,
channelName,
mode: this._mode,
browserEnabled,
});

if (this.toolMode === 'skills') {
prompt = prompt.replace(
'Use workspace_get_history to read previous messages.\n' +
'Use workspace_get_agents to see other agents.\n' +
'Use workspace_put_todos to track your progress. ALWAYS create a to-do list when given multiple tasks or multi-step work.\n' +
'Use workspace_create_timer to set a reminder that wakes you up later.\n' +
'Use workspace_create_routine to set up recurring scheduled tasks (e.g. daily reviews).\n',
'Use the openagents-workspace skill (Bash + curl) for workspace operations:\n' +
'reading message history, discovering agents, sharing files, browsing,\n' +
'managing to-do lists, setting timers, and creating routines.\n' +
'Refer to the skill instructions for the exact curl commands.\n'
);
}
return prompt;
}

_buildClaudeEnv(channelName) {
// Strip CLAUDE_* / AI_AGENT variables that make the spawned `claude`
// think it's running under an SDK harness (org-scoped auth path -> 403).
// Preserve provider auth and model selection vars needed by the child.
const cleanEnv = { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) };
for (const k of Object.keys(cleanEnv)) {
if ((k.startsWith('CLAUDE_') && !CLAUDE_ENV_KEEP.has(k)) || k === 'CLAUDECODE' || k === 'AI_AGENT') {
delete cleanEnv[k];
}
}
return cleanEnv;
}

_spawnClaudeProcess(cmd, channelName, env) {
// Always resolve shim/symlink to node + JS entry point.
// On Windows: .cmd shims need cmd.exe which creates visible windows.
// On macOS/Linux: #!/usr/bin/env node fails when node isn't on system PATH.
const resolved = this._resolveToNodeCmd(cmd[0]);
if (resolved) {
cmd = [resolved[0], resolved[1], ...cmd.slice(1)];
} else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) {
cmd = ['cmd.exe', '/c', ...cmd];
}

const proc = spawn(cmd[0], cmd.slice(1), {
stdio: ['ignore', 'pipe', 'pipe'],
env,
cwd: this.workingDir,
detached: !IS_WINDOWS,
windowsHide: true,
});
this._channelProcesses[channelName] = proc;
return proc;
}

async _bootstrapChannel(channelName) {
if (this._channelSessions[channelName]) return;
this._log(`Bootstrapping Claude session for ${channelName}`);
const browserEnabled = await this.getBrowserEnabled();
const built = this._buildClaudeCmd(
this._buildClaudeBootstrapPrompt(channelName, browserEnabled),
channelName,
{ browserEnabled },
);
let cmd = built.cmd;
const mcpConfigFile = built.mcpConfigFile;
const cleanEnv = this._buildClaudeEnv(channelName);

try {
await new Promise((resolve, reject) => {
const proc = this._spawnClaudeProcess(cmd, channelName, cleanEnv);

let stderrBuf = '';
let lineBuffer = '';
if (proc.stderr) proc.stderr.on('data', (chunk) => { stderrBuf += chunk.toString('utf-8'); });
const processLine = (line) => {
line = line.trim();
if (!line) return;
let event;
try { event = JSON.parse(line); } catch { return; }
if (event.type === 'result' && event.session_id) {
this._channelSessions[channelName] = event.session_id;
this._saveSessions();
}
};
proc.stdout.on('data', (chunk) => {
lineBuffer += chunk.toString('utf-8');
const lines = lineBuffer.split('\n');
lineBuffer = lines.pop();
for (const line of lines) processLine(line);
});
proc.on('exit', (code) => {
delete this._channelProcesses[channelName];
for (const line of lineBuffer.split('\n')) processLine(line);
if (code !== 0) {
const detail = stderrBuf.trim() ? `: ${stderrBuf.trim().slice(0, 500)}` : '';
reject(new Error(`Claude bootstrap failed with code ${code}${detail}`));
return;
}
resolve();
});
proc.on('error', (err) => {
delete this._channelProcesses[channelName];
reject(err);
});
});
} finally {
if (mcpConfigFile) {
try { fs.unlinkSync(mcpConfigFile); } catch {}
}
}
}

/**
* Skills mode: write a SKILL.md file and allow Bash + curl for workspace ops.
*/
Expand All @@ -420,11 +520,11 @@ class ClaudeAdapter extends BaseAdapter {
cmd.push('--allowedTools', 'Read', 'Write', 'Edit', 'Bash', 'Glob', 'Grep');
}

// Write SKILL.md to .claude/skills/ in the working directory
// Write SKILL.md to .claude/skills/openagents-workspace/ in the working directory
const workDir = this.workingDir || process.cwd();
const skillDir = path.join(workDir, '.claude', 'skills');
const skillDir = path.join(workDir, '.claude', 'skills', 'openagents-workspace');
fs.mkdirSync(skillDir, { recursive: true });
const skillFile = path.join(skillDir, 'openagents-workspace.md');
const skillFile = path.join(skillDir, 'SKILL.md');

const skillContent = buildClaudeSkillMd({
endpoint: this.endpoint,
Expand Down Expand Up @@ -614,44 +714,16 @@ class ClaudeAdapter extends BaseAdapter {
let mcpConfigFile = null;
let cmd;

// Clean env: strip CLAUDE_* / AI_AGENT variables that make the spawned
// `claude` think it's running under an SDK harness (org-scoped auth
// path → 403). But preserve config vars the child needs for cloud
// provider auth (Vertex, Bedrock) and model selection.
const CLAUDE_ENV_KEEP = new Set([
'CLAUDE_CODE_USE_VERTEX',
'CLAUDE_CODE_USE_BEDROCK',
'CLAUDE_MODEL',
'CLAUDE_API_KEY',
'CLAUDE_CODE_MAX_TURNS',
]);
const cleanEnv = { ...(this.agentEnv || process.env) };
for (const k of Object.keys(cleanEnv)) {
if ((k.startsWith('CLAUDE_') && !CLAUDE_ENV_KEEP.has(k)) || k === 'CLAUDECODE' || k === 'AI_AGENT') {
delete cleanEnv[k];
}
}
const cleanEnv = this._buildClaudeEnv(msgChannel);

// Run up to 2 attempts: first with session resume, then fresh if stale session detected
let _shouldRetry = false;
let effectiveContent = content;
for (let attempt = 0; attempt < 2; attempt++) {
if (mcpConfigFile) { try { fs.unlinkSync(mcpConfigFile); } catch {} mcpConfigFile = null; }

// On the retry pass after a stale --resume, the spawned `claude`
// starts a brand-new session with no memory of prior turns. Replay
// the channel's recent chat history so the agent at least has a
// recap instead of saying "I don't see any previous messages."
if (attempt > 0) {
try {
const recap = await this._buildChannelRecap(msgChannel, content);
if (recap) effectiveContent = `${recap}\n\n---\n\n${content}`;
} catch {}
}

try {
const browserEnabled = await this.getBrowserEnabled();
const built = this._buildClaudeCmd(effectiveContent, msgChannel, {
const built = this._buildClaudeCmd(content, msgChannel, {
skipResume: attempt > 0,
browserEnabled,
});
Expand All @@ -663,24 +735,7 @@ class ClaudeAdapter extends BaseAdapter {
}

try {
// Always resolve shim/symlink to node + JS entry point.
// On Windows: .cmd shims need cmd.exe which creates visible windows.
// On macOS/Linux: #!/usr/bin/env node fails when node isn't on system PATH.
const resolved = this._resolveToNodeCmd(cmd[0]);
if (resolved) {
cmd = [resolved[0], resolved[1], ...cmd.slice(1)];
} else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) {
cmd = ['cmd.exe', '/c', ...cmd];
}

const proc = spawn(cmd[0], cmd.slice(1), {
stdio: ['ignore', 'pipe', 'pipe'],
env: cleanEnv,
cwd: this.workingDir,
detached: !IS_WINDOWS,
windowsHide: true,
});
this._channelProcesses[msgChannel] = proc;
const proc = this._spawnClaudeProcess(cmd, msgChannel, cleanEnv);

const lastResponseText = [];
let hasToolUseSinceLastText = false;
Expand Down Expand Up @@ -883,8 +938,7 @@ class ClaudeAdapter extends BaseAdapter {
if (lastResponseText.length > 0) {
const fullResponse = lastResponseText.join('\n').trim();
// "Prompt is too long" means the resumed session's context
// exceeded the model's limit. Clear the session and retry
// fresh with a bounded recap instead of the full history.
// exceeded the model's limit. Clear the stale session and retry.
if (/prompt is too long/i.test(fullResponse) && this._channelSessions[msgChannel]) {
this._log(`Prompt too long with resumed session for ${msgChannel}, clearing and retrying`);
delete this._channelSessions[msgChannel];
Expand Down
Loading
Loading