diff --git a/web/src/claude.ts b/web/src/claude.ts index d2c8bb2..6803bec 100755 --- a/web/src/claude.ts +++ b/web/src/claude.ts @@ -1,160 +1,20 @@ -// Edge-native Claude executor — Anthropic Messages API with tool loop -// Replaces the Agent SDK query() for edge deployment +// Edge-native Claude executor — provider-backed tool loop +// Replaces direct Anthropic transport with @stackbilt/llm-providers. +import { createLLMProviderFactory, type LLMMessage, type Tool, type ToolCall, type ToolResult as LLMToolResult } from '@stackbilt/llm-providers'; import { McpClient } from './mcp-client.js'; import { budgetConversationHistory } from './kernel/memory/index.js'; +import { buildLLMProviderFactory } from './kernel/provider-factory.js'; import { buildContext, handleInProcessTool, resolveMcpTool, - getModelCostRates, type ClaudeConfig, - type ContentBlock, - type Message, - type ApiResponse, } from './claude-tools/index.js'; // Re-export for external consumers export { buildContext, handleInProcessTool, resolveMcpTool, type ClaudeConfig } from './claude-tools/index.js'; -// ─── Anthropic SSE streaming ───────────────────────────────── - -type StreamBlockState = { - type: 'text' | 'tool_use'; - text: string; - id: string; - name: string; - inputJson: string; -}; - -async function* parseAnthropicSSE(body: ReadableStream): AsyncGenerator> { - const reader = body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); - - let newlineIdx: number; - while ((newlineIdx = buffer.indexOf('\n')) >= 0) { - const line = buffer.slice(0, newlineIdx).trim(); - buffer = buffer.slice(newlineIdx + 1); - - if (!line.startsWith('data: ')) continue; - const payload = line.slice(6).trim(); - if (payload === '[DONE]') return; - - try { - yield JSON.parse(payload); - } catch { - // Malformed chunk — skip - } - } - } - } finally { - reader.releaseLock(); - } -} - -async function callAnthropicStream( - apiKey: string, - model: string, - system: string, - messages: Message[], - tools: unknown[], - onDelta: (text: string) => void, - baseUrl?: string, -): Promise<{ content: ContentBlock[]; stopReason: string; inputTokens: number; outputTokens: number }> { - const url = `${baseUrl || 'https://api.anthropic.com'}/v1/messages`; - const response = await fetch(url, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': apiKey, - 'anthropic-version': '2023-06-01', - }, - body: JSON.stringify({ - model, - max_tokens: 4096, - system, - tools, - messages, - stream: true, - }), - }); - - if (!response.ok) { - const errText = await response.text(); - throw new Error(`Anthropic streaming error ${response.status}: ${errText}`); - } - - if (!response.body) throw new Error('No response body for streaming'); - - const content: ContentBlock[] = []; - let currentBlock: StreamBlockState | null = null; - let stopReason = 'end_turn'; - let inputTokens = 0; - let outputTokens = 0; - - for await (const event of parseAnthropicSSE(response.body)) { - const type = event.type as string; - - if (type === 'content_block_start') { - const cb = event.content_block as Record; - currentBlock = { - type: ((cb.type as string) ?? 'text') as 'text' | 'tool_use', - text: (cb.text as string) ?? '', - id: (cb.id as string) ?? '', - name: (cb.name as string) ?? '', - inputJson: '', - }; - } else if (type === 'content_block_delta') { - const delta = event.delta as Record; - if (delta.type === 'text_delta' && currentBlock) { - currentBlock.text += (delta.text as string) ?? ''; - onDelta((delta.text as string) ?? ''); - } else if (delta.type === 'input_json_delta' && currentBlock) { - currentBlock.inputJson += (delta.partial_json as string) ?? ''; - } - } else if (type === 'content_block_stop' && currentBlock) { - if (currentBlock.type === 'text') { - content.push({ type: 'text', text: currentBlock.text }); - } else { - let input: Record = {}; - try { input = JSON.parse(currentBlock.inputJson || '{}'); } catch { /* empty input */ } - content.push({ type: 'tool_use', id: currentBlock.id, name: currentBlock.name, input }); - } - currentBlock = null; - } else if (type === 'message_delta') { - const md = event.delta as Record | undefined; - if (md?.stop_reason) stopReason = md.stop_reason as string; - } else if (type === 'message_start') { - const msg = event.message as Record | undefined; - const usage = msg?.usage as Record | undefined; - if (usage) inputTokens = usage.input_tokens ?? 0; - } else if (type === 'message_delta') { - const usage = event.usage as Record | undefined; - if (usage) outputTokens = usage.output_tokens ?? 0; - } - } - - // Flush any remaining block - if (currentBlock) { - if (currentBlock.type === 'text') { - content.push({ type: 'text', text: currentBlock.text }); - } else { - let input: Record = {}; - try { input = JSON.parse(currentBlock.inputJson || '{}'); } catch { /* empty input */ } - content.push({ type: 'tool_use', id: currentBlock.id, name: currentBlock.name, input }); - } - } - - return { content, stopReason, inputTokens, outputTokens }; -} - // ─── MCP Tool Health Tracker ───────────────────────────────── // Tracks per-tool call outcomes so the heartbeat can surface degradation. @@ -237,101 +97,138 @@ export async function callMcpWithRetry( return `Tool unavailable: ${name}`; } +type AnthropicToolDef = { + name?: unknown; + description?: unknown; + input_schema?: unknown; +}; + +function toolParameters(inputSchema: unknown): Tool['function']['parameters'] { + if (inputSchema && typeof inputSchema === 'object' && !Array.isArray(inputSchema)) { + const schema = inputSchema as Record; + if (schema.type === 'object' && schema.properties && typeof schema.properties === 'object' && !Array.isArray(schema.properties)) { + return { + type: 'object', + properties: schema.properties as Record, + required: Array.isArray(schema.required) ? schema.required.filter((v): v is string => typeof v === 'string') : undefined, + }; + } + } + + return { type: 'object', properties: {} }; +} + +function toProviderTools(tools: unknown[]): Tool[] { + return (tools as AnthropicToolDef[]) + .filter(tool => typeof tool.name === 'string') + .map(tool => ({ + type: 'function' as const, + function: { + name: tool.name as string, + description: typeof tool.description === 'string' ? tool.description : '', + parameters: toolParameters(tool.input_schema), + }, + })); +} + +function buildClaudeProviderFactory(config: ClaudeConfig) { + if (config.edgeEnv) return buildLLMProviderFactory(config.edgeEnv); + return createLLMProviderFactory({ + anthropic: { + apiKey: config.apiKey, + baseUrl: config.baseUrl, + }, + fallbackRules: [], + enableCircuitBreaker: true, + enableRetries: true, + }); +} + +function initialMessages(conversationHistory: Array<{ role: 'user' | 'assistant'; content: string }>, userText: string): LLMMessage[] { + return [ + ...budgetConversationHistory(conversationHistory).map(message => ({ + role: message.role, + content: message.content, + })), + { role: 'user', content: userText }, + ]; +} + +function parseToolArgs(call: ToolCall): Record { + try { + const parsed = JSON.parse(call.function.arguments || '{}'); + return parsed && typeof parsed === 'object' && !Array.isArray(parsed) ? parsed as Record : {}; + } catch { + return {}; + } +} + +async function executeToolCall(config: ClaudeConfig, anthropicConfig: { apiKey: string; model: string; baseUrl: string }, call: ToolCall): Promise { + const args = parseToolArgs(call); + const inProcess = await handleInProcessTool( + config.db, + call.function.name, + args, + config.githubToken, + config.githubRepo, + config.braveApiKey, + config.roundtableDb, + anthropicConfig, + config.memoryBinding, + config.resendApiKeys, + config.edgeEnv, + ); + + if (inProcess !== null) return { id: call.id, output: inProcess }; + + const resolved = resolveMcpTool(call.function.name, config.mcpClient, config.mcpRegistry); + if (resolved) { + return { id: call.id, output: await callMcpWithRetry(resolved.client, resolved.mcpName, args) }; + } + + return { id: call.id, output: `Unknown tool: ${call.function.name}` }; +} + export async function executeClaudeChat( config: ClaudeConfig, userText: string, ): Promise<{ text: string; cost: number }> { config.userQuery = userText; const { systemPrompt, tools, conversationHistory } = await buildContext(config, config.roundtableDb); - const anthropicBase = config.baseUrl || 'https://api.anthropic.com'; - const anthropicConfig = { apiKey: config.apiKey, model: config.model, baseUrl: anthropicBase }; - - const messages: Message[] = [ - ...budgetConversationHistory(conversationHistory), - { role: 'user', content: userText }, - ]; + const anthropicConfig = { apiKey: config.apiKey, model: config.model, baseUrl: config.baseUrl || 'https://api.anthropic.com' }; + const factory = buildClaudeProviderFactory(config); + const providerTools = toProviderTools(tools); + const messages = initialMessages(conversationHistory, userText); let totalCost = 0; const MAX_TOOL_ROUNDS = 10; for (let round = 0; round < MAX_TOOL_ROUNDS; round++) { - const response = await fetch(`${anthropicBase}/v1/messages`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': config.apiKey, - 'anthropic-version': '2023-06-01', - }, - body: JSON.stringify({ - model: config.model, - max_tokens: 4096, - system: systemPrompt, - tools, - messages, - }), + const result = await factory.generateResponse({ + messages: [...messages], + model: config.model, + systemPrompt, + tools: providerTools, + maxTokens: 4096, }); - if (!response.ok) { - const errText = await response.text(); - throw new Error(`Anthropic API error ${response.status}: ${errText}`); - } - - const data = await response.json(); - - const rates = getModelCostRates(config.model); - totalCost += (data.usage.input_tokens * rates.input + data.usage.output_tokens * rates.output) / 1_000_000; - - // Check if we're done (no tool use) - if (data.stop_reason === 'end_turn' || data.stop_reason === 'max_tokens') { - const textBlocks = data.content.filter(b => b.type === 'text'); - return { - text: textBlocks.map(b => b.text ?? '').join('') || '(no response)', - cost: totalCost, - }; - } - - // Handle tool use - if (data.stop_reason === 'tool_use') { - // Add assistant message with all content blocks - messages.push({ role: 'assistant', content: data.content }); - - // Process each tool use - const toolResults: ContentBlock[] = []; - - for (const block of data.content) { - if (block.type !== 'tool_use' || !block.id || !block.name) continue; - - let result: string; - const inProcess = await handleInProcessTool(config.db, block.name, block.input ?? {}, config.githubToken, config.githubRepo, config.braveApiKey, config.roundtableDb, anthropicConfig, config.memoryBinding, config.resendApiKeys); - if (inProcess !== null) { - result = inProcess; - } else { - const resolved = resolveMcpTool(block.name, config.mcpClient, config.mcpRegistry); - if (resolved) { - result = await callMcpWithRetry(resolved.client, resolved.mcpName, block.input ?? {}); - } else { - result = `Unknown tool: ${block.name}`; - } - } - - toolResults.push({ type: 'tool_result', tool_use_id: block.id, content: result } as unknown as ContentBlock); - } + totalCost += result.usage.cost; - messages.push({ role: 'user', content: toolResults }); - continue; + if (!result.toolCalls || result.toolCalls.length === 0) { + return { text: result.message || '(no response)', cost: totalCost }; } - const textBlocks = data.content.filter(b => b.type === 'text'); - return { text: textBlocks.map(b => b.text ?? '').join('') || '(no response)', cost: totalCost }; + const toolResults = await Promise.all(result.toolCalls.map(call => executeToolCall(config, anthropicConfig, call))); + messages.push({ role: 'assistant', content: result.message, toolCalls: result.toolCalls }); + messages.push({ role: 'user', content: '', toolResults }); } return { text: '(reached maximum tool rounds)', cost: totalCost }; } // ─── Streaming variant ─────────────────────────────────────── -// Tool-use rounds are non-streaming (fast); only the final end_turn text streams. -// onDelta is buffered per round and flushed only on end_turn, so intermediate -// "I'll check the dashboard..." text never leaks to the UI. +// Tool-use rounds use the provider factory. The final answer is emitted as one +// delta so intermediate tool-planning text stays invisible to the UI. export async function executeClaudeChatStream( config: ClaudeConfig, @@ -340,66 +237,34 @@ export async function executeClaudeChatStream( ): Promise<{ text: string; cost: number }> { config.userQuery = userText; const { systemPrompt, tools, conversationHistory } = await buildContext(config, config.roundtableDb); - const anthropicBaseStream = config.baseUrl || 'https://api.anthropic.com'; - const anthropicConfigStream = { apiKey: config.apiKey, model: config.model, baseUrl: anthropicBaseStream }; - - const messages: Message[] = [...budgetConversationHistory(conversationHistory), { role: 'user', content: userText }]; + const anthropicConfig = { apiKey: config.apiKey, model: config.model, baseUrl: config.baseUrl || 'https://api.anthropic.com' }; + const factory = buildClaudeProviderFactory(config); + const providerTools = toProviderTools(tools); + const messages = initialMessages(conversationHistory, userText); let totalCost = 0; const MAX_TOOL_ROUNDS = 10; for (let round = 0; round < MAX_TOOL_ROUNDS; round++) { - // Buffer deltas — only flush to onDelta if this round ends with end_turn - const roundBuffer: string[] = []; - - const { content, stopReason, inputTokens, outputTokens } = await callAnthropicStream( - config.apiKey, config.model, systemPrompt, messages, tools, - (delta) => roundBuffer.push(delta), - config.baseUrl, - ); - - const rates = getModelCostRates(config.model); - totalCost += (inputTokens * rates.input + outputTokens * rates.output) / 1_000_000; - - if (stopReason === 'end_turn' || stopReason === 'max_tokens') { - // Flush buffered deltas to caller now that we know it's the final round - for (const delta of roundBuffer) onDelta(delta); - const text = content.filter(b => b.type === 'text').map(b => b.text ?? '').join('') || '(no response)'; - return { text, cost: totalCost }; - } + const result = await factory.generateResponse({ + messages: [...messages], + model: config.model, + systemPrompt, + tools: providerTools, + maxTokens: 4096, + }); - if (stopReason === 'tool_use') { - // Discard roundBuffer — tool-use intermediate text stays invisible - messages.push({ role: 'assistant', content }); - const toolResults: ContentBlock[] = []; - - for (const block of content) { - if (block.type !== 'tool_use' || !block.id || !block.name) continue; - let result: string; - - const inProcess = await handleInProcessTool(config.db, block.name, block.input ?? {}, config.githubToken, config.githubRepo, config.braveApiKey, config.roundtableDb, anthropicConfigStream, config.memoryBinding); - if (inProcess !== null) { - result = inProcess; - } else { - const resolved = resolveMcpTool(block.name, config.mcpClient, config.mcpRegistry); - if (resolved) { - result = await callMcpWithRetry(resolved.client, resolved.mcpName, block.input ?? {}); - } else { - result = `Unknown tool: ${block.name}`; - } - } - - toolResults.push({ type: 'tool_result', tool_use_id: block.id, content: result } as unknown as ContentBlock); - } + totalCost += result.usage.cost; - messages.push({ role: 'user', content: toolResults }); - continue; + if (!result.toolCalls || result.toolCalls.length === 0) { + const text = result.message || '(no response)'; + onDelta(text); + return { text, cost: totalCost }; } - // Unexpected stop reason — flush whatever we have - for (const delta of roundBuffer) onDelta(delta); - const text = content.filter(b => b.type === 'text').map(b => b.text ?? '').join('') || '(no response)'; - return { text, cost: totalCost }; + const toolResults = await Promise.all(result.toolCalls.map(call => executeToolCall(config, anthropicConfig, call))); + messages.push({ role: 'assistant', content: result.message, toolCalls: result.toolCalls }); + messages.push({ role: 'user', content: '', toolResults }); } return { text: '(reached maximum tool rounds)', cost: totalCost }; diff --git a/web/src/kernel/executors/claude.ts b/web/src/kernel/executors/claude.ts index a994a17..109927a 100755 --- a/web/src/kernel/executors/claude.ts +++ b/web/src/kernel/executors/claude.ts @@ -28,6 +28,7 @@ function buildClaudeConfig(env: EdgeEnv, intent: KernelIntent, model: string, re braveApiKey: env.braveApiKey, roundtableDb: env.roundtableDb, memoryBinding: env.memoryBinding, + edgeEnv: env, }; } diff --git a/web/src/kernel/executors/workers-ai.ts b/web/src/kernel/executors/workers-ai.ts index f696033..59db97c 100755 --- a/web/src/kernel/executors/workers-ai.ts +++ b/web/src/kernel/executors/workers-ai.ts @@ -53,7 +53,11 @@ export async function executeGptOss( githubToken: env.githubToken, githubRepo: env.githubRepo, braveApiKey: env.braveApiKey, + roundtableDb: env.roundtableDb, + memoryBinding: env.memoryBinding, + resendApiKeys: { resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal }, userQuery: intent.raw, + edgeEnv: env, }; const { systemPrompt, tools: anthropicTools } = await buildContext(pseudoConfig); // toOpenAiTools output matches factory Tool shape exactly @@ -105,6 +109,7 @@ export async function executeGptOss( env.githubToken, env.githubRepo, env.braveApiKey, undefined, undefined, env.memoryBinding, { resendApiKey: env.resendApiKey, resendApiKeyPersonal: env.resendApiKeyPersonal }, + env, ); if (inProcess !== null) { diff --git a/web/tests/claude-provider.test.ts b/web/tests/claude-provider.test.ts new file mode 100644 index 0000000..cc6f0dc --- /dev/null +++ b/web/tests/claude-provider.test.ts @@ -0,0 +1,155 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import type { ClaudeConfig } from '../src/claude-tools/index.js'; + +const mocks = vi.hoisted(() => ({ + buildContext: vi.fn(), + handleInProcessTool: vi.fn(), + resolveMcpTool: vi.fn(), + buildLLMProviderFactory: vi.fn(), + generateResponse: vi.fn(), +})); + +vi.mock('../src/kernel/memory/index.js', () => ({ + budgetConversationHistory: vi.fn((history: Array<{ role: 'user' | 'assistant'; content: string }>) => history), +})); + +vi.mock('../src/kernel/provider-factory.js', () => ({ + buildLLMProviderFactory: mocks.buildLLMProviderFactory, +})); + +vi.mock('../src/claude-tools/index.js', () => ({ + buildContext: mocks.buildContext, + handleInProcessTool: mocks.handleInProcessTool, + resolveMcpTool: mocks.resolveMcpTool, +})); + +const { executeClaudeChat, executeClaudeChatStream } = await import('../src/claude.js'); + +function makeConfig(): ClaudeConfig { + return { + apiKey: 'anthropic-test-key', + model: 'claude-sonnet-4-20250514', + baseUrl: 'https://anthropic.test', + mcpClient: {} as ClaudeConfig['mcpClient'], + db: {} as D1Database, + channel: 'web', + conversationId: 'thread-1', + githubToken: 'gh-test', + githubRepo: 'Stackbilt-dev/aegis-oss', + braveApiKey: 'brave-test', + memoryBinding: {} as ClaudeConfig['memoryBinding'], + resendApiKeys: { resendApiKey: 'resend-main', resendApiKeyPersonal: 'resend-personal' }, + edgeEnv: { marker: 'edge-env' } as unknown as ClaudeConfig['edgeEnv'], + }; +} + +function providerResponse(message: string, cost: number, toolCalls?: Array<{ id: string; function: { name: string; arguments: string } }>) { + return { + message, + usage: { cost, inputTokens: 1, outputTokens: 1, totalTokens: 2 }, + model: 'claude-sonnet-4-20250514', + provider: 'anthropic', + responseTime: 10, + toolCalls: toolCalls?.map(call => ({ ...call, type: 'function' as const })), + }; +} + +describe('Claude provider executor', () => { + beforeEach(() => { + vi.clearAllMocks(); + mocks.buildContext.mockResolvedValue({ + systemPrompt: 'system prompt', + tools: [ + { + name: 'lookup_cc_session', + description: 'Look up session digests', + input_schema: { + type: 'object', + properties: { days: { type: 'number' } }, + required: ['days'], + }, + }, + ], + conversationHistory: [{ role: 'assistant', content: 'prior answer' }], + }); + mocks.buildLLMProviderFactory.mockReturnValue({ generateResponse: mocks.generateResponse }); + mocks.handleInProcessTool.mockResolvedValue('tool output'); + mocks.resolveMcpTool.mockReturnValue(null); + }); + + it('runs Claude tool rounds through the LLM provider factory', async () => { + const config = makeConfig(); + mocks.generateResponse + .mockResolvedValueOnce(providerResponse('', 0.01, [ + { id: 'tool-1', function: { name: 'lookup_cc_session', arguments: '{"days":1}' } }, + ])) + .mockResolvedValueOnce(providerResponse('final answer', 0.02)); + + const result = await executeClaudeChat(config, 'what happened?'); + + expect(result).toEqual({ text: 'final answer', cost: 0.03 }); + expect(mocks.buildLLMProviderFactory).toHaveBeenCalledWith(config.edgeEnv); + expect(mocks.generateResponse).toHaveBeenCalledTimes(2); + expect(mocks.generateResponse).toHaveBeenNthCalledWith(1, expect.objectContaining({ + model: 'claude-sonnet-4-20250514', + systemPrompt: 'system prompt', + maxTokens: 4096, + tools: [ + { + type: 'function', + function: { + name: 'lookup_cc_session', + description: 'Look up session digests', + parameters: { + type: 'object', + properties: { days: { type: 'number' } }, + required: ['days'], + }, + }, + }, + ], + messages: [ + { role: 'assistant', content: 'prior answer' }, + { role: 'user', content: 'what happened?' }, + ], + })); + expect(mocks.handleInProcessTool).toHaveBeenCalledWith( + config.db, + 'lookup_cc_session', + { days: 1 }, + config.githubToken, + config.githubRepo, + config.braveApiKey, + config.roundtableDb, + { apiKey: config.apiKey, model: config.model, baseUrl: config.baseUrl }, + config.memoryBinding, + config.resendApiKeys, + config.edgeEnv, + ); + expect(mocks.generateResponse).toHaveBeenNthCalledWith(2, expect.objectContaining({ + messages: expect.arrayContaining([ + expect.objectContaining({ + role: 'assistant', + toolCalls: [ + { + id: 'tool-1', + type: 'function', + function: { name: 'lookup_cc_session', arguments: '{"days":1}' }, + }, + ], + }), + { role: 'user', content: '', toolResults: [{ id: 'tool-1', output: 'tool output' }] }, + ]), + })); + }); + + it('emits the provider final response through the streaming callback', async () => { + const deltas: string[] = []; + mocks.generateResponse.mockResolvedValueOnce(providerResponse('streamed final', 0.04)); + + const result = await executeClaudeChatStream(makeConfig(), 'stream this', delta => deltas.push(delta)); + + expect(result).toEqual({ text: 'streamed final', cost: 0.04 }); + expect(deltas).toEqual(['streamed final']); + }); +});