diff --git a/packages/agent-connector/src/adapters/base.js b/packages/agent-connector/src/adapters/base.js index 7d85b579b..3a2b4c6b6 100644 --- a/packages/agent-connector/src/adapters/base.js +++ b/packages/agent-connector/src/adapters/base.js @@ -53,6 +53,7 @@ class BaseAdapter { // Per-channel task tracking for parallel execution this._channelBusy = new Set(); this._channelQueues = {}; + this._bootstrappedChannels = new Set(); // Cached workspace.browser_enabled. Populated lazily on first read so we // don't pay an HTTP roundtrip per message — adapters that toggle the // workspace flag must reconnect/restart to pick up the change (matches @@ -69,6 +70,17 @@ class BaseAdapter { }; } + _runtimeEnv(channelName = this.channelName) { + return { + OPENAGENTS_WORKSPACE_ID: this.workspaceId, + OPENAGENTS_CHANNEL_NAME: channelName || this.channelName || 'general', + OPENAGENTS_AGENT_NAME: this.agentName, + OPENAGENTS_AGENT_TYPE: this.agentType || 'agent', + OPENAGENTS_ENDPOINT: this.endpoint, + OA_WORKSPACE_TOKEN: this.token, + }; + } + // ------------------------------------------------------------------ // Lifecycle // ------------------------------------------------------------------ @@ -406,6 +418,10 @@ class BaseAdapter { for (const msg of messages) { const msgId = msg.id || msg.messageId; if (msgId && this._processedIds.has(msgId)) continue; + if (msg.eventType === 'workspace.agent.bootstrap') { + incoming.push(msg); + continue; + } if (msg.messageType === 'status') continue; // Handle queue cancellation signals from frontend if (msg.messageType === 'queue_cancel') { @@ -420,10 +436,16 @@ class BaseAdapter { if (incoming.length > 0) { idleCount = 0; - for (const msg of incoming) { + const bootstraps = incoming.filter((msg) => msg.eventType === 'workspace.agent.bootstrap'); + const regularMessages = incoming.filter((msg) => msg.eventType !== 'workspace.agent.bootstrap'); + for (const msg of [...bootstraps, ...regularMessages]) { const msgId = msg.id || msg.messageId; if (msgId) this._processedIds.add(msgId); - await this._dispatchMessage(msg); + if (msg.eventType === 'workspace.agent.bootstrap') { + await this._dispatchBootstrap(msg); + } else { + await this._dispatchMessage(msg); + } } // Cap dedup set if (this._processedIds.size > 2000) { @@ -514,6 +536,20 @@ class BaseAdapter { this._wakeControlPoller(); } + async _dispatchBootstrap(msg) { + const channel = msg.sessionId || this.channelName || 'general'; + const item = { ...msg, _bootstrap: true }; + + if (this._channelBusy.has(channel)) { + if (!this._channelQueues[channel]) this._channelQueues[channel] = []; + this._channelQueues[channel].push(item); + return; + } + + this._channelWorker(channel, item); + this._wakeControlPoller(); + } + _cancelQueuedMessage(channel, queueId) { const queue = this._channelQueues[channel]; if (!queue) return false; @@ -527,7 +563,7 @@ class BaseAdapter { async _channelWorker(channel, msg) { this._channelBusy.add(channel); try { - await this._handleMessage(msg); + await this._processChannelItem(channel, msg); } catch (e) { this._log(`Error in channel worker for ${channel}: ${e.message}`); try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {} @@ -542,7 +578,7 @@ class BaseAdapter { try { await this.sendStatus(channel, 'processing queued message', { queue_id: nextMsg._queueId, queue_status: 'processed' }); } catch {} } try { - await this._handleMessage(nextMsg); + await this._processChannelItem(channel, nextMsg); } catch (e) { this._log(`Error processing queued message in ${channel}: ${e.message}`); try { await this.sendError(channel, `Agent error: ${e.message}`); } catch {} @@ -551,6 +587,25 @@ class BaseAdapter { this._channelBusy.delete(channel); } + async _processChannelItem(channel, item) { + if (item && item._bootstrap) { + if (!this._bootstrappedChannels.has(channel)) { + await this._bootstrapChannel(channel, item); + this._bootstrappedChannels.add(channel); + } + return; + } + if (!this._bootstrappedChannels.has(channel)) { + await this._bootstrapChannel(channel, { lazy: true }); + this._bootstrappedChannels.add(channel); + } + await this._handleMessage(item); + } + + async _bootstrapChannel(channel, _event) { + throw new Error(`${this.constructor.name} must implement _bootstrapChannel for ${channel}`); + } + // ------------------------------------------------------------------ // Auto-title helper // ------------------------------------------------------------------ diff --git a/packages/agent-connector/src/adapters/claude.js b/packages/agent-connector/src/adapters/claude.js index d70bd7eab..14bc63bfe 100644 --- a/packages/agent-connector/src/adapters/claude.js +++ b/packages/agent-connector/src/adapters/claude.js @@ -21,6 +21,13 @@ const { formatAttachmentsForPrompt, SESSION_DEFAULT_RE, generateSessionTitle } = const { buildClaudeSystemPrompt, buildClaudeSkillMd } = require('./workspace-prompt'); const IS_WINDOWS = process.platform === 'win32'; +const CLAUDE_ENV_KEEP = new Set([ + 'CLAUDE_CODE_USE_VERTEX', + 'CLAUDE_CODE_USE_BEDROCK', + 'CLAUDE_MODEL', + 'CLAUDE_API_KEY', + 'CLAUDE_CODE_MAX_TURNS', +]); class ClaudeAdapter extends BaseAdapter { /** @@ -364,33 +371,8 @@ class ClaudeAdapter extends BaseAdapter { throw new Error('claude CLI not found. Install with: curl -fsSL https://claude.ai/install.sh | bash'); } - let systemPrompt = '\n' + buildClaudeSystemPrompt({ - agentName: this.agentName, - workspaceId: this.workspaceId, - channelName, - mode: this._mode, - browserEnabled, - }); - - // In skills mode, replace MCP tool references with curl-based instructions - if (this.toolMode === 'skills') { - systemPrompt = systemPrompt - .replace( - 'Use workspace_get_history to read previous messages.\n' + - 'Use workspace_get_agents to see other agents.\n' + - 'Use workspace_put_todos to track your progress. ALWAYS create a to-do list when given multiple tasks or multi-step work.\n' + - 'Use workspace_create_timer to set a reminder that wakes you up later.\n' + - 'Use workspace_create_routine to set up recurring scheduled tasks (e.g. daily reviews).\n', - 'Use the openagents-workspace skill (Bash + curl) for workspace operations:\n' + - 'reading message history, discovering agents, sharing files, browsing,\n' + - 'managing to-do lists, setting timers, and creating routines.\n' + - 'Refer to the skill instructions for the exact curl commands.\n' - ); - } - const cmd = [claudeBin, '-p', prompt, '--output-format', 'stream-json', '--verbose']; - cmd.push('--append-system-prompt', systemPrompt); cmd.push('--disallowedTools', 'AskUserQuestion', 'CronCreate', 'CronDelete', 'CronList', 'ScheduleWakeup'); // Resume existing conversation (skipped on retry after stale session) @@ -408,6 +390,124 @@ class ClaudeAdapter extends BaseAdapter { return this._buildMcpCmd(cmd, channelName); } + _buildClaudeBootstrapPrompt(channelName, browserEnabled) { + let prompt = buildClaudeSystemPrompt({ + agentName: this.agentName, + workspaceId: this.workspaceId, + channelName, + mode: this._mode, + browserEnabled, + }); + + if (this.toolMode === 'skills') { + prompt = prompt.replace( + 'Use workspace_get_history to read previous messages.\n' + + 'Use workspace_get_agents to see other agents.\n' + + 'Use workspace_put_todos to track your progress. ALWAYS create a to-do list when given multiple tasks or multi-step work.\n' + + 'Use workspace_create_timer to set a reminder that wakes you up later.\n' + + 'Use workspace_create_routine to set up recurring scheduled tasks (e.g. daily reviews).\n', + 'Use the openagents-workspace skill (Bash + curl) for workspace operations:\n' + + 'reading message history, discovering agents, sharing files, browsing,\n' + + 'managing to-do lists, setting timers, and creating routines.\n' + + 'Refer to the skill instructions for the exact curl commands.\n' + ); + } + return prompt; + } + + _buildClaudeEnv(channelName) { + // Strip CLAUDE_* / AI_AGENT variables that make the spawned `claude` + // think it's running under an SDK harness (org-scoped auth path -> 403). + // Preserve provider auth and model selection vars needed by the child. + const cleanEnv = { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) }; + for (const k of Object.keys(cleanEnv)) { + if ((k.startsWith('CLAUDE_') && !CLAUDE_ENV_KEEP.has(k)) || k === 'CLAUDECODE' || k === 'AI_AGENT') { + delete cleanEnv[k]; + } + } + return cleanEnv; + } + + _spawnClaudeProcess(cmd, channelName, env) { + // Always resolve shim/symlink to node + JS entry point. + // On Windows: .cmd shims need cmd.exe which creates visible windows. + // On macOS/Linux: #!/usr/bin/env node fails when node isn't on system PATH. + const resolved = this._resolveToNodeCmd(cmd[0]); + if (resolved) { + cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; + } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { + cmd = ['cmd.exe', '/c', ...cmd]; + } + + const proc = spawn(cmd[0], cmd.slice(1), { + stdio: ['ignore', 'pipe', 'pipe'], + env, + cwd: this.workingDir, + detached: !IS_WINDOWS, + windowsHide: true, + }); + this._channelProcesses[channelName] = proc; + return proc; + } + + async _bootstrapChannel(channelName) { + if (this._channelSessions[channelName]) return; + this._log(`Bootstrapping Claude session for ${channelName}`); + const browserEnabled = await this.getBrowserEnabled(); + const built = this._buildClaudeCmd( + this._buildClaudeBootstrapPrompt(channelName, browserEnabled), + channelName, + { browserEnabled }, + ); + let cmd = built.cmd; + const mcpConfigFile = built.mcpConfigFile; + const cleanEnv = this._buildClaudeEnv(channelName); + + try { + await new Promise((resolve, reject) => { + const proc = this._spawnClaudeProcess(cmd, channelName, cleanEnv); + + let stderrBuf = ''; + let lineBuffer = ''; + if (proc.stderr) proc.stderr.on('data', (chunk) => { stderrBuf += chunk.toString('utf-8'); }); + const processLine = (line) => { + line = line.trim(); + if (!line) return; + let event; + try { event = JSON.parse(line); } catch { return; } + if (event.type === 'result' && event.session_id) { + this._channelSessions[channelName] = event.session_id; + this._saveSessions(); + } + }; + proc.stdout.on('data', (chunk) => { + lineBuffer += chunk.toString('utf-8'); + const lines = lineBuffer.split('\n'); + lineBuffer = lines.pop(); + for (const line of lines) processLine(line); + }); + proc.on('exit', (code) => { + delete this._channelProcesses[channelName]; + for (const line of lineBuffer.split('\n')) processLine(line); + if (code !== 0) { + const detail = stderrBuf.trim() ? `: ${stderrBuf.trim().slice(0, 500)}` : ''; + reject(new Error(`Claude bootstrap failed with code ${code}${detail}`)); + return; + } + resolve(); + }); + proc.on('error', (err) => { + delete this._channelProcesses[channelName]; + reject(err); + }); + }); + } finally { + if (mcpConfigFile) { + try { fs.unlinkSync(mcpConfigFile); } catch {} + } + } + } + /** * Skills mode: write a SKILL.md file and allow Bash + curl for workspace ops. */ @@ -420,11 +520,11 @@ class ClaudeAdapter extends BaseAdapter { cmd.push('--allowedTools', 'Read', 'Write', 'Edit', 'Bash', 'Glob', 'Grep'); } - // Write SKILL.md to .claude/skills/ in the working directory + // Write SKILL.md to .claude/skills/openagents-workspace/ in the working directory const workDir = this.workingDir || process.cwd(); - const skillDir = path.join(workDir, '.claude', 'skills'); + const skillDir = path.join(workDir, '.claude', 'skills', 'openagents-workspace'); fs.mkdirSync(skillDir, { recursive: true }); - const skillFile = path.join(skillDir, 'openagents-workspace.md'); + const skillFile = path.join(skillDir, 'SKILL.md'); const skillContent = buildClaudeSkillMd({ endpoint: this.endpoint, @@ -614,44 +714,16 @@ class ClaudeAdapter extends BaseAdapter { let mcpConfigFile = null; let cmd; - // Clean env: strip CLAUDE_* / AI_AGENT variables that make the spawned - // `claude` think it's running under an SDK harness (org-scoped auth - // path → 403). But preserve config vars the child needs for cloud - // provider auth (Vertex, Bedrock) and model selection. - const CLAUDE_ENV_KEEP = new Set([ - 'CLAUDE_CODE_USE_VERTEX', - 'CLAUDE_CODE_USE_BEDROCK', - 'CLAUDE_MODEL', - 'CLAUDE_API_KEY', - 'CLAUDE_CODE_MAX_TURNS', - ]); - const cleanEnv = { ...(this.agentEnv || process.env) }; - for (const k of Object.keys(cleanEnv)) { - if ((k.startsWith('CLAUDE_') && !CLAUDE_ENV_KEEP.has(k)) || k === 'CLAUDECODE' || k === 'AI_AGENT') { - delete cleanEnv[k]; - } - } + const cleanEnv = this._buildClaudeEnv(msgChannel); // Run up to 2 attempts: first with session resume, then fresh if stale session detected let _shouldRetry = false; - let effectiveContent = content; for (let attempt = 0; attempt < 2; attempt++) { if (mcpConfigFile) { try { fs.unlinkSync(mcpConfigFile); } catch {} mcpConfigFile = null; } - // On the retry pass after a stale --resume, the spawned `claude` - // starts a brand-new session with no memory of prior turns. Replay - // the channel's recent chat history so the agent at least has a - // recap instead of saying "I don't see any previous messages." - if (attempt > 0) { - try { - const recap = await this._buildChannelRecap(msgChannel, content); - if (recap) effectiveContent = `${recap}\n\n---\n\n${content}`; - } catch {} - } - try { const browserEnabled = await this.getBrowserEnabled(); - const built = this._buildClaudeCmd(effectiveContent, msgChannel, { + const built = this._buildClaudeCmd(content, msgChannel, { skipResume: attempt > 0, browserEnabled, }); @@ -663,24 +735,7 @@ class ClaudeAdapter extends BaseAdapter { } try { - // Always resolve shim/symlink to node + JS entry point. - // On Windows: .cmd shims need cmd.exe which creates visible windows. - // On macOS/Linux: #!/usr/bin/env node fails when node isn't on system PATH. - const resolved = this._resolveToNodeCmd(cmd[0]); - if (resolved) { - cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; - } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { - cmd = ['cmd.exe', '/c', ...cmd]; - } - - const proc = spawn(cmd[0], cmd.slice(1), { - stdio: ['ignore', 'pipe', 'pipe'], - env: cleanEnv, - cwd: this.workingDir, - detached: !IS_WINDOWS, - windowsHide: true, - }); - this._channelProcesses[msgChannel] = proc; + const proc = this._spawnClaudeProcess(cmd, msgChannel, cleanEnv); const lastResponseText = []; let hasToolUseSinceLastText = false; @@ -883,8 +938,7 @@ class ClaudeAdapter extends BaseAdapter { if (lastResponseText.length > 0) { const fullResponse = lastResponseText.join('\n').trim(); // "Prompt is too long" means the resumed session's context - // exceeded the model's limit. Clear the session and retry - // fresh with a bounded recap instead of the full history. + // exceeded the model's limit. Clear the stale session and retry. if (/prompt is too long/i.test(fullResponse) && this._channelSessions[msgChannel]) { this._log(`Prompt too long with resumed session for ${msgChannel}, clearing and retrying`); delete this._channelSessions[msgChannel]; diff --git a/packages/agent-connector/src/adapters/codex.js b/packages/agent-connector/src/adapters/codex.js index 816035bd3..ce86f95f6 100644 --- a/packages/agent-connector/src/adapters/codex.js +++ b/packages/agent-connector/src/adapters/codex.js @@ -239,20 +239,38 @@ class CodexAdapter extends BaseAdapter { } } + _buildCodexEnv(channelName) { + const env = { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) }; + if (this._directModel) env.CODEX_MODEL = this._directModel; + if (this._directApiKey) env.OPENAI_API_KEY = this._directApiKey; + if (this._directBaseUrl) env.OPENAI_BASE_URL = this._directBaseUrl; + return env; + } + + async _bootstrapChannel(channelName) { + const context = this._buildSystemContext(channelName); + if (this._useCliMode) { + const cmd = [this._codexBin, 'exec', '--json', '--dangerously-bypass-approvals-and-sandbox', '--skip-git-repo-check']; + if (this._directModel) cmd.push('-m', this._directModel); + if (this.workingDir) cmd.push('-C', this.workingDir); + const result = await this._spawnCodex(cmd, this._buildCodexEnv(channelName), channelName, context, { discardResponse: true }); + if (result.exitCode !== 0) { + throw new Error(`Codex bootstrap failed with code ${result.exitCode}`); + } + return; + } + + if (this._directMode && !this._conversationHistory.some((m) => m.role === 'system')) { + this._conversationHistory.push({ role: 'system', content: context }); + } + } + // ------------------------------------------------------------------ // CLI subprocess mode (primary) // ------------------------------------------------------------------ async _handleViaSubprocess(content, msgChannel) { - const env = { ...(this.agentEnv || process.env) }; - - // Set model via env if configured - if (this._directModel) env.CODEX_MODEL = this._directModel; - if (this._directApiKey) env.OPENAI_API_KEY = this._directApiKey; - if (this._directBaseUrl) env.OPENAI_BASE_URL = this._directBaseUrl; - - const context = this._buildSystemContext(msgChannel); - const fullPrompt = `${context}\n\n---\n\nUser message:\n${content}`; + const env = this._buildCodexEnv(msgChannel); // Run up to 2 attempts: first with resume, then fresh if stale for (let attempt = 0; attempt < 2; attempt++) { @@ -279,7 +297,7 @@ class CodexAdapter extends BaseAdapter { this._log(`Spawning: codex exec ${threadId && attempt === 0 ? `resume ${threadId} ` : ''}--json --full-auto -m ${this._directModel || 'default'}`); try { - const result = await this._spawnCodex(cmd, env, msgChannel, fullPrompt); + const result = await this._spawnCodex(cmd, env, msgChannel, content); if (result.responseText) { await this.sendResponse(msgChannel, result.responseText); @@ -302,7 +320,7 @@ class CodexAdapter extends BaseAdapter { } } - async _spawnCodex(cmd, env, msgChannel, prompt) { + async _spawnCodex(cmd, env, msgChannel, prompt, { discardResponse = false } = {}) { return new Promise((resolve, reject) => { const proc = spawn(cmd[0], cmd.slice(1), { stdio: ['pipe', 'pipe', 'pipe'], @@ -406,7 +424,7 @@ class CodexAdapter extends BaseAdapter { } resolve({ - responseText: responseTexts.join('\n').trim(), + responseText: discardResponse ? '' : responseTexts.join('\n').trim(), exitCode: code, stderr: stderrBuf, }); @@ -443,9 +461,7 @@ class CodexAdapter extends BaseAdapter { } async _callCompletionApi(userMessage, channel) { - const systemPrompt = this._buildSystemContext(channel); - const messages = [{ role: 'system', content: systemPrompt }]; - messages.push(...this._conversationHistory); + const messages = [...this._conversationHistory]; messages.push({ role: 'user', content: userMessage }); const url = `${this._directBaseUrl}/chat/completions`; diff --git a/packages/agent-connector/src/adapters/cursor.js b/packages/agent-connector/src/adapters/cursor.js index 47f961baa..db04e7b3c 100644 --- a/packages/agent-connector/src/adapters/cursor.js +++ b/packages/agent-connector/src/adapters/cursor.js @@ -16,7 +16,7 @@ const { execSync, spawn } = require('child_process'); const BaseAdapter = require('./base'); const { formatAttachmentsForPrompt, SESSION_DEFAULT_RE, generateSessionTitle } = require('./utils'); -const { buildCursorSkillMd } = require('./workspace-prompt'); +const { buildAgentBootstrapPrompt, buildCursorSkillMd } = require('./workspace-prompt'); const IS_WINDOWS = process.platform === 'win32'; @@ -272,9 +272,9 @@ class CursorAdapter extends BaseAdapter { _writeSkillFile(channelName) { const workDir = this.workingDir || process.cwd(); - const skillDir = path.join(workDir, '.cursor', 'skills'); + const skillDir = path.join(workDir, '.cursor', 'skills', 'openagents-workspace'); fs.mkdirSync(skillDir, { recursive: true }); - const skillFile = path.join(skillDir, 'openagents-workspace.md'); + const skillFile = path.join(skillDir, 'SKILL.md'); const skillContent = buildCursorSkillMd({ endpoint: this.endpoint, @@ -288,6 +288,16 @@ class CursorAdapter extends BaseAdapter { this._log(`Wrote workspace skill to ${skillFile}`); } + _buildBootstrapPrompt(channelName) { + return buildAgentBootstrapPrompt({ + agentName: this.agentName, + agentType: this.agentType || 'cursor', + workspaceId: this.workspaceId, + channelName, + mode: this._mode, + }); + } + // ── Command building ── _buildCursorCmd(prompt, channelName, { skipResume = false } = {}) { @@ -318,6 +328,103 @@ class CursorAdapter extends BaseAdapter { return cmd; } + _buildCursorBootstrapCmd(channelName) { + const agentBin = this._findCursorBinary(); + if (!agentBin) { + throw new Error('Cursor CLI not found. Install with: curl https://cursor.com/install -fsSL | bash'); + } + + const cmd = [agentBin, '-p', this._buildBootstrapPrompt(channelName), '--output-format', 'stream-json', '--trust', '--force']; + + const model = (this.agentEnv || process.env).CURSOR_MODEL; + if (model) cmd.push('--model', model); + if (this.workingDir) cmd.push('--workspace', this.workingDir); + + const sessionId = this._channelSessions[channelName]; + if (sessionId) cmd.push('--resume', sessionId); + + return cmd; + } + + _buildCursorEnv(channelName) { + return { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) }; + } + + _spawnCursorProcess(cmd, channelName, env) { + const resolved = this._resolveToNodeCmd(cmd[0]); + if (resolved) { + cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; + } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { + cmd = ['cmd.exe', '/c', ...cmd]; + } + + const proc = spawn(cmd[0], cmd.slice(1), { + stdio: ['ignore', 'pipe', 'pipe'], + env, + cwd: this.workingDir, + detached: !IS_WINDOWS, + windowsHide: true, + }); + this._channelProcesses[channelName] = proc; + return proc; + } + + async _bootstrapChannel(channelName) { + if (this._channelSessions[channelName]) return; + this._log(`Bootstrapping Cursor session for ${channelName}`); + + try { + this._writeSkillFile(channelName); + } catch (e) { + this._log(`Warning: could not write skill file: ${e.message}`); + } + + const cmd = this._buildCursorBootstrapCmd(channelName); + const proc = this._spawnCursorProcess(cmd, channelName, this._buildCursorEnv(channelName)); + + let stderrBuf = ''; + let lineBuffer = ''; + if (proc.stderr) { + proc.stderr.on('data', (chunk) => { stderrBuf += chunk.toString('utf-8'); }); + } + + await new Promise((resolve, reject) => { + const processLine = (line) => { + line = line.trim(); + if (!line) return; + let event; + try { event = JSON.parse(line); } catch { return; } + if ((event.type === 'result' || event.type === 'system') && event.session_id) { + this._channelSessions[channelName] = event.session_id; + this._saveSessions(); + } + }; + + proc.stdout.on('data', (chunk) => { + lineBuffer += chunk.toString('utf-8'); + const lines = lineBuffer.split('\n'); + lineBuffer = lines.pop(); + for (const line of lines) processLine(line); + }); + + proc.on('exit', (code) => { + delete this._channelProcesses[channelName]; + for (const line of lineBuffer.split('\n')) processLine(line); + if (code !== 0) { + const detail = stderrBuf.trim() ? `: ${stderrBuf.trim().slice(0, 500)}` : ''; + reject(new Error(`Cursor bootstrap failed with code ${code}${detail}`)); + return; + } + resolve(); + }); + + proc.on('error', (err) => { + delete this._channelProcesses[channelName]; + reject(err); + }); + }); + } + // ── Message handling ── async _handleMessage(msg) { @@ -371,41 +478,17 @@ class CursorAdapter extends BaseAdapter { let cmd; let _shouldRetry = false; - let effectiveContent = content; for (let attempt = 0; attempt < 2; attempt++) { - if (attempt > 0) { - try { - const recap = await this._buildChannelRecap(msgChannel, content); - if (recap) effectiveContent = `${recap}\n\n---\n\n${content}`; - } catch {} - } - try { - cmd = this._buildCursorCmd(effectiveContent, msgChannel, { skipResume: attempt > 0 }); + cmd = this._buildCursorCmd(content, msgChannel, { skipResume: attempt > 0 }); } catch (e) { await this.sendError(msgChannel, e.message); return; } try { - const resolved = this._resolveToNodeCmd(cmd[0]); - if (resolved) { - cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; - } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { - cmd = ['cmd.exe', '/c', ...cmd]; - } - - const cleanEnv = { ...(this.agentEnv || process.env) }; - - const proc = spawn(cmd[0], cmd.slice(1), { - stdio: ['ignore', 'pipe', 'pipe'], - env: cleanEnv, - cwd: this.workingDir, - detached: !IS_WINDOWS, - windowsHide: true, - }); - this._channelProcesses[msgChannel] = proc; + const proc = this._spawnCursorProcess(cmd, msgChannel, this._buildCursorEnv(msgChannel)); const lastResponseText = []; let hasToolUseSinceLastText = false; diff --git a/packages/agent-connector/src/adapters/gemini.js b/packages/agent-connector/src/adapters/gemini.js index d55e328f4..f3736db78 100644 --- a/packages/agent-connector/src/adapters/gemini.js +++ b/packages/agent-connector/src/adapters/gemini.js @@ -136,6 +136,29 @@ class GeminiAdapter extends BaseAdapter { return null; } + _buildGeminiEnv(channelName) { + return { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) }; + } + + _spawnGeminiProcess(cmd, channelName, env) { + const resolved = this._resolveToNodeCmd(cmd[0]); + if (resolved) { + cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; + } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { + cmd = ['cmd.exe', '/c', ...cmd]; + } + + const proc = spawn(cmd[0], cmd.slice(1), { + stdio: ['ignore', 'pipe', 'pipe'], + env, + cwd: this.workingDir, + detached: !IS_WINDOWS, + windowsHide: true, + }); + this._channelProcesses[channelName] = proc; + return proc; + } + _findGeminiBinary() { const home = os.homedir(); const ext = IS_WINDOWS ? '.cmd' : ''; @@ -183,17 +206,7 @@ class GeminiAdapter extends BaseAdapter { throw new Error('gemini CLI not found. Install with: npm install -g @google/gemini-cli'); } - const systemPrompt = '\n' + buildClaudeSystemPrompt({ - agentName: this.agentName, - workspaceId: this.workspaceId, - channelName, - mode: this._mode, - }); - - // For gemini, we combine system prompt with the user message since it doesn't have an append-system-prompt flag - const fullPrompt = `${systemPrompt}\n\n---\n\nUser message:\n${prompt}`; - - const cmd = [geminiBin, '-p', fullPrompt, '-y', '-o', 'stream-json']; + const cmd = [geminiBin, '-p', prompt, '-y', '-o', 'stream-json']; const sessionId = this._channelSessions[channelName]; if (sessionId && !skipResume) { @@ -203,6 +216,59 @@ class GeminiAdapter extends BaseAdapter { return { cmd }; } + _buildGeminiBootstrapCmd(channelName) { + const prompt = buildClaudeSystemPrompt({ + agentName: this.agentName, + workspaceId: this.workspaceId, + channelName, + mode: this._mode, + }); + return this._buildGeminiCmd(prompt, channelName); + } + + async _bootstrapChannel(channelName) { + if (this._channelSessions[channelName]) return; + this._log(`Bootstrapping Gemini session for ${channelName}`); + const { cmd } = this._buildGeminiBootstrapCmd(channelName); + const env = this._buildGeminiEnv(channelName); + await new Promise((resolve, reject) => { + const proc = this._spawnGeminiProcess(cmd, channelName, env); + let stderrBuf = ''; + let lineBuffer = ''; + if (proc.stderr) proc.stderr.on('data', (chunk) => { stderrBuf += chunk.toString('utf-8'); }); + const processLine = (line) => { + line = line.trim(); + if (!line) return; + let event; + try { event = JSON.parse(line); } catch { return; } + if ((event.type === 'init' || event.type === 'result') && event.session_id) { + this._channelSessions[channelName] = event.session_id; + this._saveSessions(); + } + }; + proc.stdout.on('data', (chunk) => { + lineBuffer += chunk.toString('utf-8'); + const lines = lineBuffer.split('\n'); + lineBuffer = lines.pop(); + for (const line of lines) processLine(line); + }); + proc.on('exit', (code) => { + delete this._channelProcesses[channelName]; + for (const line of lineBuffer.split('\n')) processLine(line); + if (code !== 0) { + const detail = stderrBuf.trim() ? `: ${stderrBuf.trim().slice(0, 500)}` : ''; + reject(new Error(`Gemini bootstrap failed with code ${code}${detail}`)); + return; + } + resolve(); + }); + proc.on('error', (err) => { + delete this._channelProcesses[channelName]; + reject(err); + }); + }); + } + async _handleMessage(msg) { let content = (msg.content || '').trim(); const attachments = msg.attachments || []; @@ -244,7 +310,7 @@ class GeminiAdapter extends BaseAdapter { await this.sendStatus(msgChannel, 'thinking...'); let cmd; - const cleanEnv = { ...(this.agentEnv || process.env) }; + const cleanEnv = this._buildGeminiEnv(msgChannel); let _shouldRetry = false; for (let attempt = 0; attempt < 2; attempt++) { @@ -257,21 +323,7 @@ class GeminiAdapter extends BaseAdapter { } try { - const resolved = this._resolveToNodeCmd(cmd[0]); - if (resolved) { - cmd = [resolved[0], resolved[1], ...cmd.slice(1)]; - } else if (IS_WINDOWS && cmd[0].toLowerCase().endsWith('.cmd')) { - cmd = ['cmd.exe', '/c', ...cmd]; - } - - const proc = spawn(cmd[0], cmd.slice(1), { - stdio: ['ignore', 'pipe', 'pipe'], - env: cleanEnv, - cwd: this.workingDir, - detached: !IS_WINDOWS, - windowsHide: true, - }); - this._channelProcesses[msgChannel] = proc; + const proc = this._spawnGeminiProcess(cmd, msgChannel, cleanEnv); const lastResponseText = []; let hasToolUseSinceLastText = false; diff --git a/packages/agent-connector/src/adapters/hermes.js b/packages/agent-connector/src/adapters/hermes.js index b7a430f04..ecc192e46 100644 --- a/packages/agent-connector/src/adapters/hermes.js +++ b/packages/agent-connector/src/adapters/hermes.js @@ -254,7 +254,7 @@ class HermesAdapter extends BaseAdapter { const args = this._buildHermesCmd(prompt, resumeId); this._log(`Running hermes (profile=${this.hermesProfile}, channel=${channelName}, resume=${!!resumeId})`); - const env = { ...(this.agentEnv || process.env) }; + const env = { ...(this.agentEnv || process.env), ...this._runtimeEnv(channelName) }; const proc = spawn(this._hermesBin, args, { env, stdio: ['ignore', 'pipe', 'pipe'], @@ -325,6 +325,11 @@ class HermesAdapter extends BaseAdapter { } } + async _bootstrapChannel(channelName) { + const context = await this._buildContextPrefix(channelName); + await this._runHermes(context, channelName); + } + // ------------------------------------------------------------------ // Message handler // ------------------------------------------------------------------ @@ -341,9 +346,7 @@ class HermesAdapter extends BaseAdapter { await this.sendStatus(msgChannel, 'thinking...'); try { - const context = await this._buildContextPrefix(msgChannel); - const prompt = context ? `${context}\n\n---\n\nUser message:\n${content}` : content; - const responseText = await this._runHermes(prompt, msgChannel); + const responseText = await this._runHermes(content, msgChannel); if (responseText) { await this.sendResponse(msgChannel, responseText); diff --git a/packages/agent-connector/src/adapters/llm-direct.js b/packages/agent-connector/src/adapters/llm-direct.js index 9618de3a5..05037dca7 100644 --- a/packages/agent-connector/src/adapters/llm-direct.js +++ b/packages/agent-connector/src/adapters/llm-direct.js @@ -81,6 +81,12 @@ class LlmDirectAdapter extends BaseAdapter { }); } + async _bootstrapChannel(channelName) { + if (!this._conversationHistory.some((m) => m.role === 'system')) { + this._conversationHistory.push({ role: 'system', content: this._buildSystemPrompt(channelName) }); + } + } + async _handleMessage(msg) { let content = (msg.content || '').trim(); const attachments = msg.attachments || []; @@ -127,10 +133,7 @@ class LlmDirectAdapter extends BaseAdapter { * Call OpenAI-compatible chat completions API with SSE streaming. */ _callCompletionApi(userMessage, channel) { - const systemPrompt = this._buildSystemPrompt(channel); - - const messages = [{ role: 'system', content: systemPrompt }]; - messages.push(...this._conversationHistory); + const messages = [...this._conversationHistory]; messages.push({ role: 'user', content: userMessage }); const url = `${this._baseUrl}/chat/completions`; diff --git a/packages/agent-connector/src/adapters/openclaw.js b/packages/agent-connector/src/adapters/openclaw.js index bfd0309ac..e252db2ea 100644 --- a/packages/agent-connector/src/adapters/openclaw.js +++ b/packages/agent-connector/src/adapters/openclaw.js @@ -151,7 +151,7 @@ class OpenClawAdapter extends BaseAdapter { return; } - const skillName = `openagents-workspace-${this.agentName}`; + const skillName = 'openagents-workspace'; const skillDir = path.join(wsDir, 'skills', skillName); fs.mkdirSync(skillDir, { recursive: true }); @@ -211,11 +211,24 @@ class OpenClawAdapter extends BaseAdapter { } } + async _bootstrapChannel(channelName) { + const context = buildOpenclawSystemPrompt({ + agentName: this.agentName, + workspaceId: this.workspaceId, + channelName, + endpoint: this.endpoint, + token: this.token, + mode: this._mode, + disabledModules: this.disabledModules, + }); + await this._runCliAgent(context, channelName, { discardResponse: true }); + } + // ------------------------------------------------------------------ // CLI mode (openclaw agent --local) // ------------------------------------------------------------------ - _runCliAgent(userMessage, channel) { + _runCliAgent(userMessage, channel, { discardResponse = false } = {}) { return new Promise((resolve, reject) => { // Re-check binary if not found at construction time (installed after daemon started) if (!this._openclawBinary) { @@ -244,7 +257,7 @@ class OpenClawAdapter extends BaseAdapter { this._log(`CLI: ${binary} ${args.slice(0, 5).join(' ')} ...`); - const spawnEnv = { ...(this.agentEnv || process.env) }; + const spawnEnv = { ...(this.agentEnv || process.env), ...this._runtimeEnv(channel) }; if (IS_WINDOWS) { const nodeBinDir = path.dirname(process.execPath); const npmBin = path.join(process.env.APPDATA || '', 'npm'); @@ -394,7 +407,11 @@ class OpenClawAdapter extends BaseAdapter { reject(new Error(`CLI exited ${code}: ${allOutput.slice(-300)}`)); return; } - this._parseCliOutput(allOutput, resolve); + if (discardResponse) { + resolve(''); + } else { + this._parseCliOutput(allOutput, resolve); + } }); }); } diff --git a/packages/agent-connector/src/adapters/opencode.js b/packages/agent-connector/src/adapters/opencode.js index dd46983dd..052f19e53 100644 --- a/packages/agent-connector/src/adapters/opencode.js +++ b/packages/agent-connector/src/adapters/opencode.js @@ -90,8 +90,8 @@ class OpenCodeAdapter extends BaseAdapter { * Write workspace skill to OpenCode's skill directory for auto-discovery. */ _ensureWorkspaceSkill(channelName) { - const skillDir = path.join(this.agentHome, '.opencode', 'skills'); - const skillFile = path.join(skillDir, 'openagents-workspace.md'); + const skillDir = path.join(this.agentHome, '.opencode', 'skills', 'openagents-workspace'); + const skillFile = path.join(skillDir, 'SKILL.md'); try { const content = buildOpenCodeSkillMd({ endpoint: this.endpoint, @@ -192,6 +192,12 @@ class OpenCodeAdapter extends BaseAdapter { } } + async _bootstrapChannel(channelName) { + const context = this._buildSystemContext(channelName); + const responseText = await this._runOpencode(context, channelName, { discardResponse: true }); + if (responseText) this._log(`OpenCode bootstrap produced discarded output for ${channelName}`); + } + // ------------------------------------------------------------------ // JSON output parsing // ------------------------------------------------------------------ @@ -300,7 +306,7 @@ class OpenCodeAdapter extends BaseAdapter { // Subprocess execution // ------------------------------------------------------------------ - _runOpencode(content, msgChannel) { + _runOpencode(content, msgChannel, { discardResponse = false } = {}) { const binary = this._opencodeBinary || this._findOpencodeBinary(); if (binary) this._opencodeBinary = binary; if (!binary) { @@ -312,19 +318,16 @@ class OpenCodeAdapter extends BaseAdapter { const cmd = [binary, 'run', '--format', 'json', '--dir', this.agentHome]; const sessionId = this._channelSessions[msgChannel]; - let fullPrompt; + let fullPrompt = content; if (sessionId) { - fullPrompt = content; cmd.push('--session', sessionId); } else { this._ensureWorkspaceSkill(msgChannel); - const context = this._buildSystemContext(msgChannel); - fullPrompt = `${context}\n\n---\n\n${content}`; } this._log(`CLI: ${binary} ${cmd.slice(1, 5).join(' ')} ...`); - const spawnEnv = { ...(this.agentEnv || process.env) }; + const spawnEnv = { ...(this.agentEnv || process.env), ...this._runtimeEnv(msgChannel) }; let spawnBinary = cmd[0]; let spawnArgs = cmd.slice(1); @@ -364,7 +367,7 @@ class OpenCodeAdapter extends BaseAdapter { if (stdout) { this._persistSessionId(msgChannel, stdout); - resolve(OpenCodeAdapter._extractTextFromJson(stdout)); + resolve(discardResponse ? '' : OpenCodeAdapter._extractTextFromJson(stdout)); } else { if (stderr) { this._log(`opencode stderr: ${stderr.slice(0, 300)}`); diff --git a/packages/agent-connector/src/adapters/workspace-prompt.js b/packages/agent-connector/src/adapters/workspace-prompt.js index 4360c9328..bb4f12550 100644 --- a/packages/agent-connector/src/adapters/workspace-prompt.js +++ b/packages/agent-connector/src/adapters/workspace-prompt.js @@ -75,6 +75,25 @@ function buildWorkspaceIdentity(agentName, workspaceId, channelName, mode = 'exe ); } +function buildAgentBootstrapPrompt({ agentName, agentType = 'agent', workspaceId, channelName, mode = 'execute' }) { + return ( + '## OpenAgents Workspace Runtime Context\n\n' + + 'This bootstrap initializes a new agent session in an OpenAgents workspace thread. ' + + 'Treat it as authoritative runtime context for this session.\n\n' + + `- Agent name: ${agentName}\n` + + `- Agent type: ${agentType || 'agent'}\n` + + `- Workspace ID: ${workspaceId}\n` + + `- Thread/channel: ${channelName || 'general'}\n` + + `- Mode: ${mode}\n\n` + + 'Your text responses are automatically posted to the OpenAgents workspace chat. ' + + 'Use the OpenAgents workspace tools or skill for shared history, files, browser, todos, timers, routines, and agent discovery. ' + + 'If context compaction makes you uncertain, recover current state with workspace tools such as `workspace_get_history` and `workspace_get_agents`.\n' + + buildCollaborationPrompt() + + buildModePrompt(mode) + + buildGuardrails() + ); +} + /** * Build the multi-agent collaboration instructions. */ @@ -525,12 +544,16 @@ function buildOpenclawSystemPrompt({ agentName, workspaceId, channelName, endpoi */ function buildOpenclawSkillMd({ endpoint, workspaceId, token, agentName, channelName, disabledModules, browserEnabled = false }) { const body = buildApiSkillsPrompt({ - endpoint, workspaceId, token, agentName, channelName, disabledModules, mode: 'execute', + endpoint, + workspaceId: '$OPENAGENTS_WORKSPACE_ID', + token: '$OA_WORKSPACE_TOKEN', + agentName: '$OPENAGENTS_AGENT_NAME', + channelName: '$OPENAGENTS_CHANNEL_NAME', + disabledModules, + mode: 'execute', }); - const identity = buildWorkspaceIdentity(agentName, workspaceId, channelName, 'execute'); const directive = buildBrowserDirective(browserEnabled); - const collab = buildCollaborationPrompt(); const frontmatter = ( '---\n' + @@ -546,7 +569,7 @@ function buildOpenclawSkillMd({ endpoint, workspaceId, token, agentName, channel '---\n\n' ); - return frontmatter + identity + directive + '\n' + collab + '\n' + body + '\n' + buildGuardrails(); + return frontmatter + directive + '\n' + body + '\n' + buildGuardrails(); } /** @@ -566,8 +589,11 @@ function buildOpenCodeSystemPrompt({ agentName, workspaceId, channelName, endpoi */ function buildOpenCodeSkillMd({ endpoint, workspaceId, token, agentName, channelName, disabledModules }) { const api = buildApiSkillsPrompt({ - endpoint, workspaceId, token, agentName, - channelName: channelName || 'general', + endpoint, + workspaceId: '$OPENAGENTS_WORKSPACE_ID', + token: '$OA_WORKSPACE_TOKEN', + agentName: '$OPENAGENTS_AGENT_NAME', + channelName: '$OPENAGENTS_CHANNEL_NAME', disabledModules, mode: 'execute', }); @@ -578,11 +604,7 @@ function buildOpenCodeSkillMd({ endpoint, workspaceId, token, agentName, channel 'description: OpenAgents Workspace API — shared files, browser, and agent collaboration\n' + '---\n\n'; - const identity = - `You are agent '${agentName}' connected to OpenAgents workspace ${workspaceId}.\n` + - 'Use these APIs via bash + curl to interact with the workspace.\n\n'; - - return frontmatter + identity + api + '\n' + buildGuardrails(); + return frontmatter + api + '\n' + buildGuardrails(); } /** @@ -594,15 +616,16 @@ function buildOpenCodeSkillMd({ endpoint, workspaceId, token, agentName, channel */ function buildClaudeSkillMd({ endpoint, workspaceId, token, agentName, channelName, disabledModules, browserEnabled = false }) { const api = buildApiSkillsPrompt({ - endpoint, workspaceId, token, agentName, - channelName: channelName || 'general', + endpoint, + workspaceId: '$OPENAGENTS_WORKSPACE_ID', + token: '$OA_WORKSPACE_TOKEN', + agentName: '$OPENAGENTS_AGENT_NAME', + channelName: '$OPENAGENTS_CHANNEL_NAME', disabledModules, mode: 'execute', }); - const identity = buildWorkspaceIdentity(agentName, workspaceId, channelName, 'execute'); const directive = buildBrowserDirective(browserEnabled); - const collab = buildCollaborationPrompt(); const frontmatter = '---\n' + @@ -614,26 +637,27 @@ function buildClaudeSkillMd({ endpoint, workspaceId, token, agentName, channelNa ' or collaborating with other agents via @mentions.\n' + '---\n\n'; - return frontmatter + identity + directive + '\n' + collab + '\n' + api + '\n' + buildGuardrails(); + return frontmatter + directive + '\n' + api + '\n' + buildGuardrails(); } /** * Build a SKILL.md file for Cursor CLI's skill auto-discovery. * - * Written to .cursor/skills/openagents-workspace.md before each CLI spawn. + * Written to .cursor/skills/openagents-workspace/SKILL.md before each CLI spawn. * Cursor discovers skills from the .cursor/skills/ directory automatically. */ function buildCursorSkillMd({ endpoint, workspaceId, token, agentName, channelName, disabledModules, browserEnabled = false }) { const api = buildApiSkillsPrompt({ - endpoint, workspaceId, token, agentName, - channelName: channelName || 'general', + endpoint, + workspaceId: '$OPENAGENTS_WORKSPACE_ID', + token: '$OA_WORKSPACE_TOKEN', + agentName: '$OPENAGENTS_AGENT_NAME', + channelName: '$OPENAGENTS_CHANNEL_NAME', disabledModules, mode: 'execute', }); - const identity = buildWorkspaceIdentity(agentName, workspaceId, channelName, 'execute'); const directive = buildBrowserDirective(browserEnabled); - const collab = buildCollaborationPrompt(); const frontmatter = '---\n' + @@ -645,10 +669,11 @@ function buildCursorSkillMd({ endpoint, workspaceId, token, agentName, channelNa ' or collaborating with other agents via @mentions.\n' + '---\n\n'; - return frontmatter + identity + directive + '\n' + collab + '\n' + api + '\n' + buildGuardrails(); + return frontmatter + directive + '\n' + api + '\n' + buildGuardrails(); } module.exports = { + buildAgentBootstrapPrompt, buildWorkspaceIdentity, buildBrowserDirective, buildCollaborationPrompt, diff --git a/packages/agent-connector/src/workspace-client.js b/packages/agent-connector/src/workspace-client.js index 6a0e908c6..d261d71a8 100644 --- a/packages/agent-connector/src/workspace-client.js +++ b/packages/agent-connector/src/workspace-client.js @@ -274,7 +274,7 @@ class WorkspaceClient { async pollPending(workspaceId, agentName, token, { after, limit = 500 } = {}) { const params = new URLSearchParams({ network: workspaceId, - type: 'workspace.message.posted', + type: 'workspace.', limit: String(limit), }); if (after) params.set('after', after); @@ -303,11 +303,25 @@ class WorkspaceClient { // non-empty and matches no real agent. const messages = []; for (const e of events) { + const eventType = e.type || ''; const source = e.source || ''; + const target = e.target || ''; const meta = e.metadata || {}; const targetAgents = meta.target_agents; const hasTargetList = Array.isArray(targetAgents); + if (eventType === 'workspace.agent.bootstrap') { + if ( + target === `openagents:${agentName}` || + (hasTargetList && targetAgents.includes(agentName)) + ) { + messages.push(this._eventToMessage(e)); + } + continue; + } + + if (eventType !== 'workspace.message.posted') continue; + // Skip own messages if (source === `openagents:${agentName}`) continue; @@ -667,15 +681,20 @@ class WorkspaceClient { _eventToMessage(event) { const source = event.source || ''; const isHuman = source.startsWith('human:'); + const isSystem = source.startsWith('system:'); const senderName = source.replace('openagents:', '').replace('human:', ''); const payload = event.payload || {}; const target = event.target || ''; const ts = event.timestamp; + const eventType = event.type || ''; const msg = { messageId: event.id || '', - sessionId: target.startsWith('channel/') ? target.replace('channel/', '') : target, - senderType: isHuman ? 'human' : 'agent', + eventType, + sessionId: eventType === 'workspace.agent.bootstrap' + ? (payload.channel || payload.channel_name || '') + : (target.startsWith('channel/') ? target.replace('channel/', '') : target), + senderType: isSystem ? 'system' : (isHuman ? 'human' : 'agent'), senderName, content: (payload.content || event.content || ''), mentions: payload.mentions || [], diff --git a/packages/agent-connector/test/bootstrap-event.test.js b/packages/agent-connector/test/bootstrap-event.test.js new file mode 100644 index 000000000..cc1faae2e --- /dev/null +++ b/packages/agent-connector/test/bootstrap-event.test.js @@ -0,0 +1,77 @@ +'use strict'; + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); + +const BaseAdapter = require('../src/adapters/base'); + +function makeAdapter() { + const adapter = new BaseAdapter({ + workspaceId: 'workspace-123', + channelName: 'general', + token: 'token-123', + agentName: 'agent-1', + agentType: 'cursor', + }); + adapter._log = () => {}; + return adapter; +} + +describe('workspace.agent.bootstrap events', () => { + it('requires concrete adapters to implement bootstrap', async () => { + const adapter = makeAdapter(); + + await assert.rejects( + () => adapter._processChannelItem('thread-abc', { _bootstrap: true }), + /must implement _bootstrapChannel/ + ); + }); + + it('dispatches bootstrap before a same-batch message in the same channel', async () => { + const adapter = makeAdapter(); + const calls = []; + + adapter.client.pollPending = async () => ({ + cursor: 'event-2', + messages: [ + { + messageId: 'event-2', + eventType: 'workspace.message.posted', + sessionId: 'thread-abc', + senderType: 'human', + senderName: 'user', + content: 'do the task', + messageType: 'chat', + metadata: { target_agents: ['agent-1'] }, + }, + { + messageId: 'event-1', + eventType: 'workspace.agent.bootstrap', + sessionId: 'thread-abc', + senderType: 'system', + content: '', + metadata: { target_agents: ['agent-1'] }, + }, + ], + }); + adapter.client.pollToolResults = async () => ({ events: [], cursor: null }); + adapter._bootstrapChannel = async (channel) => { + calls.push(`bootstrap:${channel}`); + }; + adapter._handleMessage = async (msg) => { + calls.push(`message:${msg.sessionId}:${msg.content}`); + adapter.stop(); + }; + adapter._sleep = async () => { + await new Promise((resolve) => setImmediate(resolve)); + }; + adapter._running = true; + + await adapter._pollLoop(); + + assert.deepEqual(calls, [ + 'bootstrap:thread-abc', + 'message:thread-abc:do the task', + ]); + }); +}); diff --git a/packages/agent-connector/test/cursor.test.js b/packages/agent-connector/test/cursor.test.js new file mode 100644 index 000000000..463f89915 --- /dev/null +++ b/packages/agent-connector/test/cursor.test.js @@ -0,0 +1,81 @@ +'use strict'; + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const os = require('os'); +const path = require('path'); + +const CursorAdapter = require('../src/adapters/cursor'); + +function makeAdapter(tmpDir) { + const adapter = new CursorAdapter({ + workspaceId: 'workspace-123', + channelName: 'thread-abc', + token: 'token-123', + agentName: 'cursor-agent', + agentType: 'cursor', + workingDir: tmpDir, + agentEnv: {}, + }); + adapter._findCursorBinary = () => '/usr/bin/agent'; + return adapter; +} + +describe('CursorAdapter', () => { + it('writes the workspace skill as .cursor/skills/openagents-workspace/SKILL.md', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-cursor-')); + try { + const adapter = makeAdapter(tmpDir); + adapter._writeSkillFile('thread-abc'); + + const newPath = path.join(tmpDir, '.cursor', 'skills', 'openagents-workspace', 'SKILL.md'); + const oldPath = path.join(tmpDir, '.cursor', 'skills', 'openagents-workspace.md'); + assert.equal(fs.existsSync(newPath), true); + assert.equal(fs.existsSync(oldPath), false); + + const skill = fs.readFileSync(newPath, 'utf-8'); + assert.ok(!skill.includes('workspace-123')); + assert.ok(!skill.includes('token-123')); + assert.ok(!skill.includes('cursor-agent')); + assert.ok(skill.includes('$OA_WORKSPACE_TOKEN')); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('keeps normal Cursor prompts free of bootstrap context', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-cursor-')); + try { + const adapter = makeAdapter(tmpDir); + + const freshCmd = adapter._buildCursorCmd('do the task', 'thread-abc'); + const freshPrompt = freshCmd[freshCmd.indexOf('-p') + 1]; + assert.equal(freshPrompt, 'do the task'); + assert.ok(!freshPrompt.includes('OpenAgents Workspace Runtime Context')); + + adapter._channelSessions['thread-abc'] = 'cursor-session-id'; + const resumedCmd = adapter._buildCursorCmd('continue task', 'thread-abc'); + const resumedPrompt = resumedCmd[resumedCmd.indexOf('-p') + 1]; + assert.equal(resumedPrompt, 'continue task'); + assert.ok(resumedCmd.includes('--resume')); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('builds a separate Cursor bootstrap prompt for session seeding', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-cursor-')); + try { + const adapter = makeAdapter(tmpDir); + const cmd = adapter._buildCursorBootstrapCmd('thread-abc'); + const prompt = cmd[cmd.indexOf('-p') + 1]; + + assert.ok(prompt.includes('thread-abc')); + assert.ok(prompt.includes('cursor-agent')); + assert.ok(!prompt.includes('do the task')); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/agent-connector/test/gemini.test.js b/packages/agent-connector/test/gemini.test.js new file mode 100644 index 000000000..78cc24d2a --- /dev/null +++ b/packages/agent-connector/test/gemini.test.js @@ -0,0 +1,28 @@ +'use strict'; + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); + +const GeminiAdapter = require('../src/adapters/gemini'); + +describe('GeminiAdapter', () => { + it('builds runtime env for Gemini subprocesses', () => { + const adapter = new GeminiAdapter({ + workspaceId: 'workspace-123', + channelName: 'general', + token: 'token-123', + agentName: 'gemini-agent', + agentType: 'gemini', + agentEnv: { GEMINI_API_KEY: 'key-123' }, + }); + + const env = adapter._buildGeminiEnv('thread-abc'); + + assert.equal(env.GEMINI_API_KEY, 'key-123'); + assert.equal(env.OPENAGENTS_WORKSPACE_ID, 'workspace-123'); + assert.equal(env.OPENAGENTS_CHANNEL_NAME, 'thread-abc'); + assert.equal(env.OPENAGENTS_AGENT_NAME, 'gemini-agent'); + assert.equal(env.OPENAGENTS_AGENT_TYPE, 'gemini'); + assert.equal(env.OA_WORKSPACE_TOKEN, 'token-123'); + }); +}); diff --git a/packages/agent-connector/test/skill-paths.test.js b/packages/agent-connector/test/skill-paths.test.js new file mode 100644 index 000000000..e0a9a2cb9 --- /dev/null +++ b/packages/agent-connector/test/skill-paths.test.js @@ -0,0 +1,88 @@ +'use strict'; + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const os = require('os'); +const path = require('path'); + +const ClaudeAdapter = require('../src/adapters/claude'); +const OpenCodeAdapter = require('../src/adapters/opencode'); +const OpenClawAdapter = require('../src/adapters/openclaw'); + +describe('adapter skill paths', () => { + it('writes Claude skill as .claude/skills/openagents-workspace/SKILL.md', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-claude-skill-')); + try { + const adapter = new ClaudeAdapter({ + workspaceId: 'workspace-123', + channelName: 'thread-abc', + token: 'token-123', + agentName: 'claude-agent', + agentType: 'claude', + workingDir: tmpDir, + agentEnv: {}, + toolMode: 'skills', + }); + adapter._findClaudeBinary = () => '/usr/bin/claude'; + + adapter._buildClaudeCmd('prompt', 'thread-abc'); + + const newPath = path.join(tmpDir, '.claude', 'skills', 'openagents-workspace', 'SKILL.md'); + const oldPath = path.join(tmpDir, '.claude', 'skills', 'openagents-workspace.md'); + assert.equal(fs.existsSync(newPath), true); + assert.equal(fs.existsSync(oldPath), false); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('writes OpenCode skill as .opencode/skills/openagents-workspace/SKILL.md', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-opencode-skill-')); + try { + const agentName = `opencode-agent-${path.basename(tmpDir)}`; + const adapter = new OpenCodeAdapter({ + workspaceId: 'workspace-123', + channelName: 'thread-abc', + token: 'token-123', + agentName, + agentType: 'opencode', + agentEnv: {}, + }); + adapter.agentHome = tmpDir; + + adapter._ensureWorkspaceSkill('thread-abc'); + + const newPath = path.join(tmpDir, '.opencode', 'skills', 'openagents-workspace', 'SKILL.md'); + const oldPath = path.join(tmpDir, '.opencode', 'skills', 'openagents-workspace.md'); + assert.equal(fs.existsSync(newPath), true); + assert.equal(fs.existsSync(oldPath), false); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('writes OpenClaw skill to a fixed openagents-workspace directory', () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'oa-openclaw-skill-')); + try { + const adapter = new OpenClawAdapter({ + workspaceId: 'workspace-123', + channelName: 'thread-abc', + token: 'token-123', + agentName: 'openclaw-agent', + agentType: 'openclaw', + agentEnv: {}, + }); + adapter._resolveOpenclawWorkspace = () => tmpDir; + + adapter._installWorkspaceSkill(); + + const skillPath = path.join(tmpDir, 'skills', 'openagents-workspace', 'SKILL.md'); + const agentSpecificPath = path.join(tmpDir, 'skills', 'openagents-workspace-openclaw-agent', 'SKILL.md'); + assert.equal(fs.existsSync(skillPath), true); + assert.equal(fs.existsSync(agentSpecificPath), false); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/agent-connector/test/workspace-client.test.js b/packages/agent-connector/test/workspace-client.test.js index 662606951..7930dabf4 100644 --- a/packages/agent-connector/test/workspace-client.test.js +++ b/packages/agent-connector/test/workspace-client.test.js @@ -103,4 +103,22 @@ describe('WorkspaceClient', () => { assert.equal(capturedBody.session_id, 'sess-abc'); }); }); + + it('maps bootstrap events to their payload channel', () => { + const client = new WorkspaceClient(); + const msg = client._eventToMessage({ + id: 'evt-bootstrap', + type: 'workspace.agent.bootstrap', + source: 'system:workspace', + target: 'openagents:agent-alpha', + payload: { channel: 'thread-abc', reason: 'channel_join' }, + metadata: { target_agents: ['agent-alpha'] }, + timestamp: 1700000000000, + }); + + assert.equal(msg.eventType, 'workspace.agent.bootstrap'); + assert.equal(msg.sessionId, 'thread-abc'); + assert.equal(msg.senderType, 'system'); + assert.deepEqual(msg.metadata.target_agents, ['agent-alpha']); + }); }); diff --git a/workspace/backend/alembic/versions/016_migrate_legacy_routine_channels.py b/workspace/backend/alembic/versions/016_migrate_legacy_routine_channels.py new file mode 100644 index 000000000..6f48500c4 --- /dev/null +++ b/workspace/backend/alembic/versions/016_migrate_legacy_routine_channels.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +"""Migrate legacy per-agent routine channels to per-routine channels. + +Existing deployments of this branch may still point routines at the old shared +channel format ``routines:``. Current code creates one dedicated channel +per routine (``routine:``), so active legacy routines need their own +channel before they fire again. + +Revision ID: 016 +Revises: 015 +Create Date: 2026-05-25 +""" + +import uuid + +from alembic import op +import sqlalchemy as sa + + +revision = "016" +down_revision = "015" +branch_labels = None +depends_on = None + + +def _bare_agent(created_by: str) -> str: + if created_by and created_by.startswith("openagents:"): + return created_by[len("openagents:"):] + return created_by or "" + + +def migrate_existing_routine_channels(conn) -> None: + """Move active routines from ``routines:`` to ``routine:``.""" + # Only active routines can still be picked up by the scheduler. Leave + # cancelled/history rows untouched so this migration does not rewrite + # historical channel references that will never fire again. + routines = conn.execute( + sa.text( + "SELECT id, workspace_id, created_by, name " + "FROM routines " + "WHERE status = 'active' AND channel_name LIKE 'routines:%'" + ) + ).fetchall() + + for routine_id, workspace_id, created_by, routine_name in routines: + agent = _bare_agent(created_by) + if not agent: + continue + + channel_name = f"routine:{routine_id}" + channel_row = conn.execute( + sa.text( + "SELECT id FROM channels " + "WHERE workspace_id = :ws AND name = :n" + ), + {"ws": workspace_id, "n": channel_name}, + ).first() + + if channel_row is None: + channel_id = str(uuid.uuid4()) + conn.execute( + sa.text( + "INSERT INTO channels " + "(id, workspace_id, name, title, master_agent, created_by, status) " + "VALUES (:id, :ws, :n, :t, :ma, 'system:routine', 'active')" + ), + { + "id": channel_id, + "ws": workspace_id, + "n": channel_name, + "t": f"Routine: {routine_name}" if routine_name else "Routine", + "ma": agent, + }, + ) + else: + channel_id = channel_row[0] + + existing_member = conn.execute( + sa.text( + "SELECT 1 FROM channel_members " + "WHERE channel_id = :cid AND agent_name = :a" + ), + {"cid": channel_id, "a": agent}, + ).first() + if existing_member is None: + conn.execute( + sa.text( + "INSERT INTO channel_members (channel_id, agent_name) " + "VALUES (:cid, :a)" + ), + {"cid": channel_id, "a": agent}, + ) + + conn.execute( + sa.text( + "UPDATE routines " + "SET channel_name = :n, created_by = :a " + "WHERE id = :id" + ), + {"n": channel_name, "a": agent, "id": routine_id}, + ) + + +def upgrade() -> None: + migrate_existing_routine_channels(op.get_bind()) + + +def downgrade() -> None: + # No-op: we don't know each routine's original shared per-agent channel. + pass diff --git a/workspace/backend/app/mods/workspace_mod.py b/workspace/backend/app/mods/workspace_mod.py index c24a37dfa..61fd28a64 100644 --- a/workspace/backend/app/mods/workspace_mod.py +++ b/workspace/backend/app/mods/workspace_mod.py @@ -15,15 +15,18 @@ import logging import re +import uuid from datetime import datetime, timezone from typing import List, Optional from sqlalchemy import select +from app.models import EventRecord from openagents.core.onm_events import Event, WorkspaceEventTypes from openagents.core.onm_mods import EventRejected, PipelineContext, TransformMod logger = logging.getLogger(__name__) +WORKSPACE_AGENT_BOOTSTRAP = "workspace.agent.bootstrap" # Lazy-initialized LLM client for the router _llm_client = None @@ -48,6 +51,20 @@ async def process(self, event: Event, context: PipelineContext) -> Optional[Even # Per-type handlers # --------------------------------------------------------------------------- +def emit_agent_bootstrap_event(db, workspace_id: str, channel_name: str, agent_name: str, *, timestamp: int, reason: str) -> None: + """Persist a one-time bootstrap event for an agent before user work is routed.""" + db.add(EventRecord( + id=str(uuid.uuid4()), + network_id=str(workspace_id), + type=WORKSPACE_AGENT_BOOTSTRAP, + source="system:workspace", + target=f"openagents:{agent_name}", + payload={"channel": channel_name, "reason": reason}, + metadata_={"target_agents": [agent_name], "channel": channel_name}, + timestamp=max(0, int(timestamp) - 1), + visibility="system", + )) + async def _handle_agent_join(event: Event, ctx: PipelineContext) -> Optional[Event]: """network.agent.join → upsert WorkspaceMember, set online, rotate session.""" import uuid as _uuid @@ -301,6 +318,14 @@ async def _handle_channel_create(event: Event, ctx: PipelineContext) -> Optional participants = payload.get("participants", []) for agent_name in participants: db.add(ChannelMember(channel_id=channel.id, agent_name=agent_name)) + emit_agent_bootstrap_event( + db, + workspace.id, + channel.name, + agent_name, + timestamp=event.timestamp, + reason="channel_create", + ) db.flush() @@ -383,6 +408,14 @@ async def _handle_channel_join(event: Event, ctx: PipelineContext) -> Optional[E if not existing: db.add(ChannelMember(channel_id=channel.id, agent_name=agent_name)) + emit_agent_bootstrap_event( + db, + workspace.id, + channel.name, + agent_name, + timestamp=event.timestamp, + reason="channel_join", + ) # Auto-promote first agent to channel master if none set if not channel.master_agent: channel.master_agent = agent_name @@ -595,7 +628,6 @@ async def _route_with_llm(channel, new_event: Event, db, workspace) -> List[str] Falls back to empty list on any error. """ from app.config import config - from app.models import EventRecord if not _get_router_api_key(): logger.warning("LLM router: no API key set (ROUTER_LLM_API_KEY or ANTHROPIC_API_KEY), defaulting to stop") @@ -898,6 +930,14 @@ async def _handle_message_posted(event: Event, ctx: PipelineContext) -> Optional continue if agent_name not in existing: db.add(ChannelMember(channel_id=channel.id, agent_name=agent_name)) + emit_agent_bootstrap_event( + db, + workspace.id, + channel.name, + agent_name, + timestamp=event.timestamp, + reason="message_routing", + ) existing.add(agent_name) db.flush() diff --git a/workspace/backend/app/routers/routines.py b/workspace/backend/app/routers/routines.py index ced5081b7..16c190e9f 100644 --- a/workspace/backend/app/routers/routines.py +++ b/workspace/backend/app/routers/routines.py @@ -17,6 +17,7 @@ from sqlalchemy.orm import Session from app.database import get_db +from app.mods.workspace_mod import emit_agent_bootstrap_event from app.models import Channel, ChannelMember, RoutineRecord, Workspace, WorkspaceMember from app.response import ResponseCode, json_response, success_response from app.routers.network import _resolve_workspace, _verify_workspace_access @@ -99,6 +100,14 @@ def _get_or_create_routine_channel( db.add(channel) db.flush() db.add(ChannelMember(channel_id=channel.id, agent_name=agent)) + emit_agent_bootstrap_event( + db, + workspace.id, + channel.name, + agent, + timestamp=int(datetime.now(timezone.utc).timestamp() * 1000), + reason="routine_create", + ) return channel diff --git a/workspace/backend/app/routers/workspaces.py b/workspace/backend/app/routers/workspaces.py index dbe8f2912..0f22df276 100644 --- a/workspace/backend/app/routers/workspaces.py +++ b/workspace/backend/app/routers/workspaces.py @@ -26,6 +26,7 @@ from app.config import config from app.database import get_db +from app.mods.workspace_mod import emit_agent_bootstrap_event from app.models import ( Channel, ChannelMember, @@ -232,6 +233,15 @@ async def create_workspace( ) db.add(participant) + emit_agent_bootstrap_event( + db, + workspace.id, + channel.name, + body.agent_name, + timestamp=int(now.timestamp() * 1000), + reason="workspace_create", + ) + db.commit() db.refresh(workspace) diff --git a/workspace/backend/tests/test_channel_membership.py b/workspace/backend/tests/test_channel_membership.py index 5325db96e..b714ff726 100644 --- a/workspace/backend/tests/test_channel_membership.py +++ b/workspace/backend/tests/test_channel_membership.py @@ -23,6 +23,37 @@ def _post_event(client, workspace, *, etype, source, channel, agent_name): class TestChannelJoinAuth: + def test_workspace_create_emits_initial_agent_bootstrap_event(self, client): + resp = client.post("/v1/workspaces", json={ + "name": "Bootstrap Workspace", + "agent_name": "agent-initial", + "agent_type": "cursor", + }) + assert resp.status_code == 200, resp.text + workspace = resp.json()["data"] + channel = workspace["channel"]["name"] + + events_resp = client.get( + "/v1/events", + params={ + "network": workspace["workspaceId"], + "type": "workspace.agent.bootstrap", + "limit": 20, + }, + headers=_headers({"token": workspace["token"]}), + ) + assert events_resp.status_code == 200, events_resp.text + events = events_resp.json()["data"]["events"] + bootstraps = [ + e for e in events + if e["type"] == "workspace.agent.bootstrap" + and e["target"] == "openagents:agent-initial" + ] + assert bootstraps + bootstrap = bootstraps[0] + assert bootstrap["payload"]["channel"] == channel + assert bootstrap["metadata"]["target_agents"] == ["agent-initial"] + def test_human_can_invite(self, client, workspace): channel = workspace["channel"]["name"] resp = _post_event( @@ -34,6 +65,38 @@ def test_human_can_invite(self, client, workspace): ) assert resp.status_code == 200, resp.text + def test_join_emits_agent_bootstrap_event(self, client, workspace): + channel = workspace["channel"]["name"] + resp = _post_event( + client, workspace, + etype="network.channel.join", + source="human:user", + channel=channel, + agent_name="agent-bootstrap", + ) + assert resp.status_code == 200, resp.text + + events_resp = client.get( + "/v1/events", + params={ + "network": workspace["id"], + "type": "workspace.agent.bootstrap", + "limit": 20, + }, + headers=_headers(workspace), + ) + assert events_resp.status_code == 200, events_resp.text + events = events_resp.json()["data"]["events"] + bootstraps = [ + e for e in events + if e["type"] == "workspace.agent.bootstrap" + and e["target"] == "openagents:agent-bootstrap" + ] + assert bootstraps + bootstrap = bootstraps[0] + assert bootstrap["payload"]["channel"] == channel + assert bootstrap["metadata"]["target_agents"] == ["agent-bootstrap"] + def test_unrelated_agent_cannot_invite(self, client, workspace): """Random openagents source can't add an agent to a channel they don't own.""" channel = workspace["channel"]["name"] diff --git a/workspace/backend/tests/test_events.py b/workspace/backend/tests/test_events.py index e8bc2fca4..809bacf9b 100644 --- a/workspace/backend/tests/test_events.py +++ b/workspace/backend/tests/test_events.py @@ -320,6 +320,15 @@ def test_poll_cursor_pagination(self, client, workspace): assert len(data2["events"]) == 1 assert data2["has_more"] is False + def test_poll_accepts_connector_default_limit(self, client, workspace): + """Agent connector polls with limit=500 by default.""" + resp = client.get("/v1/events", params={ + "network": workspace["id"], + "type": "workspace.", + "limit": 500, + }, headers={"X-Workspace-Token": workspace["token"]}) + assert resp.status_code == 200 + def test_poll_invalid_network(self, client): """Polling nonexistent network returns 404.""" resp = client.get("/v1/events", params={"network": "nonexistent"}) diff --git a/workspace/backend/tests/test_routine_channel_migration.py b/workspace/backend/tests/test_routine_channel_migration.py new file mode 100644 index 000000000..5597ac02b --- /dev/null +++ b/workspace/backend/tests/test_routine_channel_migration.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +""" +Tests for migrating legacy per-agent routine channels to per-routine channels. +""" + +import importlib.util +from pathlib import Path + +import sqlalchemy as sa + + +def _load_migration(): + migration_path = ( + Path(__file__).resolve().parents[1] + / "alembic" + / "versions" + / "016_migrate_legacy_routine_channels.py" + ) + spec = importlib.util.spec_from_file_location("routine_channel_migration_016", migration_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_migrates_active_legacy_routines_to_per_routine_channels(): + migration = _load_migration() + engine = sa.create_engine("sqlite://") + + metadata = sa.MetaData() + sa.Table( + "routines", + metadata, + sa.Column("id", sa.Text(), primary_key=True), + sa.Column("workspace_id", sa.Text(), nullable=False), + sa.Column("channel_name", sa.Text(), nullable=False), + sa.Column("created_by", sa.Text(), nullable=False), + sa.Column("name", sa.Text(), nullable=True), + sa.Column("status", sa.Text(), nullable=False), + ) + sa.Table( + "channels", + metadata, + sa.Column("id", sa.Text(), primary_key=True), + sa.Column("workspace_id", sa.Text(), nullable=False), + sa.Column("name", sa.Text(), nullable=False), + sa.Column("title", sa.Text(), nullable=True), + sa.Column("master_agent", sa.Text(), nullable=True), + sa.Column("created_by", sa.Text(), nullable=True), + sa.Column("status", sa.Text(), nullable=True), + ) + sa.Table( + "channel_members", + metadata, + sa.Column("channel_id", sa.Text(), nullable=False), + sa.Column("agent_name", sa.Text(), nullable=False), + ) + metadata.create_all(engine) + + with engine.begin() as conn: + conn.execute(sa.text( + "INSERT INTO channels " + "(id, workspace_id, name, title, master_agent, created_by, status) " + "VALUES ('legacy-channel', 'ws-1', 'routines:agent-alpha', 'agent-alpha', " + "'agent-alpha', 'system:routine', 'active')" + )) + conn.execute(sa.text( + "INSERT INTO routines " + "(id, workspace_id, channel_name, created_by, name, status) " + "VALUES " + "('routine-1', 'ws-1', 'routines:agent-alpha', " + "'openagents:agent-alpha', 'Daily A', 'active'), " + "('routine-2', 'ws-1', 'routines:agent-alpha', " + "'agent-alpha', 'Daily B', 'active'), " + "('routine-3', 'ws-1', 'routine:routine-3', " + "'agent-alpha', 'Already migrated', 'active'), " + "('routine-4', 'ws-1', 'routines:agent-alpha', " + "'agent-alpha', 'Cancelled', 'cancelled')" + )) + + migration.migrate_existing_routine_channels(conn) + + routines = conn.execute(sa.text( + "SELECT id, channel_name, created_by FROM routines ORDER BY id" + )).fetchall() + channels = conn.execute(sa.text( + "SELECT name, title, master_agent, created_by, status FROM channels ORDER BY name" + )).fetchall() + members = conn.execute(sa.text( + "SELECT c.name, cm.agent_name " + "FROM channel_members cm JOIN channels c ON c.id = cm.channel_id " + "ORDER BY c.name" + )).fetchall() + + assert routines == [ + ("routine-1", "routine:routine-1", "agent-alpha"), + ("routine-2", "routine:routine-2", "agent-alpha"), + ("routine-3", "routine:routine-3", "agent-alpha"), + ("routine-4", "routines:agent-alpha", "agent-alpha"), + ] + assert ( + "routine:routine-1", "Routine: Daily A", "agent-alpha", "system:routine", "active", + ) in channels + assert ( + "routine:routine-2", "Routine: Daily B", "agent-alpha", "system:routine", "active", + ) in channels + assert ("routine:routine-1", "agent-alpha") in members + assert ("routine:routine-2", "agent-alpha") in members diff --git a/workspace/backend/tests/test_routines.py b/workspace/backend/tests/test_routines.py index fdc62ca27..dc1a16904 100644 --- a/workspace/backend/tests/test_routines.py +++ b/workspace/backend/tests/test_routines.py @@ -16,6 +16,7 @@ def _create_payload(workspace, **overrides): base = { "name": "Test routine", "message": "ping", + "context": "routine background", "network": workspace["id"], "channel": workspace["channel"]["name"], "source": "openagents:agent-alpha", @@ -43,6 +44,35 @@ def test_daily_mode_unchanged(self): class TestCreateRoutine: + def test_create_routine_emits_bootstrap_for_routine_channel(self, client, workspace): + resp = client.post( + "/v1/routines", + json=_create_payload(workspace, interval_minutes=15), + headers=_headers(workspace), + ) + assert resp.status_code == 200, resp.text + data = resp.json()["data"] + + events_resp = client.get( + "/v1/events", + params={ + "network": workspace["id"], + "type": "workspace.agent.bootstrap", + "limit": 20, + }, + headers=_headers(workspace), + ) + assert events_resp.status_code == 200, events_resp.text + events = events_resp.json()["data"]["events"] + bootstraps = [ + e for e in events + if e["type"] == "workspace.agent.bootstrap" + and e["target"] == "openagents:agent-alpha" + and e["payload"]["channel"] == data["channel_name"] + ] + assert bootstraps + assert bootstraps[0]["payload"]["reason"] == "routine_create" + def test_create_daily_mode(self, client, workspace): resp = client.post( "/v1/routines", @@ -68,7 +98,7 @@ def test_create_interval_mode(self, client, workspace): assert data["schedule_minute"] is None def test_routes_into_per_agent_channel(self, client, workspace): - """Routine should land in routines:, regardless of caller's channel.""" + """Routine should land in a dedicated routine: channel, regardless of caller's channel.""" resp = client.post( "/v1/routines", json=_create_payload( @@ -81,7 +111,8 @@ def test_routes_into_per_agent_channel(self, client, workspace): ) assert resp.status_code == 200, resp.text data = resp.json()["data"] - assert data["channel_name"] == "routines:agent-alpha" + assert data["channel_name"].startswith("routine:") + assert data["channel_name"].endswith(data["id"]) assert data["created_by"] == "agent-alpha" # bare, no prefix def test_bare_source_also_normalized(self, client, workspace): @@ -97,11 +128,12 @@ def test_bare_source_also_normalized(self, client, workspace): ) assert resp.status_code == 200, resp.text data = resp.json()["data"] - assert data["channel_name"] == "routines:agent-alpha" + assert data["channel_name"].startswith("routine:") + assert data["channel_name"].endswith(data["id"]) assert data["created_by"] == "agent-alpha" - def test_routine_channel_reused_across_routines(self, client, workspace): - """Two routines for the same agent share one channel.""" + def test_routine_channel_is_dedicated_per_routine(self, client, workspace): + """Two routines for the same agent get separate routine channels.""" for i in range(2): client.post( "/v1/routines", @@ -114,7 +146,10 @@ def test_routine_channel_reused_across_routines(self, client, workspace): ) routines = resp.json()["data"]["routines"] assert len(routines) == 2 - assert {r["channel_name"] for r in routines} == {"routines:agent-alpha"} + channels = {r["channel_name"] for r in routines} + assert len(channels) == 2 + assert all(ch.startswith("routine:") for ch in channels) + assert channels == {f"routine:{r['id']}" for r in routines} def test_reject_both_modes(self, client, workspace): resp = client.post(