diff --git a/agents/e2e/base2-free-summary-format.e2e.test.ts b/agents/e2e/base2-free-summary-format.e2e.test.ts index 51df280b89..00f0a7b0db 100644 --- a/agents/e2e/base2-free-summary-format.e2e.test.ts +++ b/agents/e2e/base2-free-summary-format.e2e.test.ts @@ -324,6 +324,7 @@ describe('Base2-Free Summary Format Compliance', () => { }) const runStateWithMessages = withMessageHistory({ runState: { + traceSessionId: 'test-trace-session', sessionState, output: { type: 'error', message: '' }, }, @@ -461,6 +462,7 @@ describe('Base2-Free Summary Format Compliance', () => { }) const runStateWithMessages = withMessageHistory({ runState: { + traceSessionId: 'test-trace-session', sessionState, output: { type: 'error', message: '' }, }, diff --git a/agents/e2e/context-pruner.e2e.test.ts b/agents/e2e/context-pruner.e2e.test.ts index 26a1f4ad07..0ac2c5ec2f 100644 --- a/agents/e2e/context-pruner.e2e.test.ts +++ b/agents/e2e/context-pruner.e2e.test.ts @@ -151,7 +151,11 @@ Do not do anything else. Just spawn context-pruner and then report the result.`, // Create initial session state with the large message history const sessionState = await initialSessionState({}) const runStateWithMessages = withMessageHistory({ - runState: { sessionState, output: { type: 'error', message: '' } }, + runState: { + traceSessionId: 'test-trace-session', + sessionState, + output: { type: 'error', message: '' }, + }, messages: initialMessages, }) @@ -277,7 +281,11 @@ Do not do anything else. Just spawn context-pruner and then report the result.`, const sessionState = await initialSessionState({}) const runStateWithMessages = withMessageHistory({ - runState: { sessionState, output: { type: 'error', message: '' } }, + runState: { + traceSessionId: 'test-trace-session', + sessionState, + output: { type: 'error', message: '' }, + }, messages: initialMessages, }) diff --git a/agents/e2e/context-pruning-threshold.e2e.test.ts b/agents/e2e/context-pruning-threshold.e2e.test.ts index e62d213461..fbb2988ce3 100644 --- a/agents/e2e/context-pruning-threshold.e2e.test.ts +++ b/agents/e2e/context-pruning-threshold.e2e.test.ts @@ -324,7 +324,11 @@ describe('Context Pruning Threshold E2E', () => { const sessionState = await initialSessionState({}) const runStateWithMessages = withMessageHistory({ - runState: { sessionState, output: { type: 'error', message: '' } }, + runState: { + traceSessionId: 'test-trace-session', + sessionState, + output: { type: 'error', message: '' }, + }, messages, }) @@ -398,7 +402,11 @@ describe('Context Pruning Threshold E2E', () => { const sessionState = await initialSessionState({}) const runStateWithMessages = withMessageHistory({ - runState: { sessionState, output: { type: 'error', message: '' } }, + runState: { + traceSessionId: 'test-trace-session', + sessionState, + output: { type: 'error', message: '' }, + }, messages, }) @@ -494,6 +502,7 @@ describe('Context Pruning Threshold E2E', () => { const sessionStateCal = await initialSessionState({}) const runStateCal = withMessageHistory({ runState: { + traceSessionId: 'test-trace-session', sessionState: sessionStateCal, output: { type: 'error', message: '' }, }, @@ -544,7 +553,11 @@ describe('Context Pruning Threshold E2E', () => { // ========================================================================= const sessionState = await initialSessionState({}) const runStateWithMessages = withMessageHistory({ - runState: { sessionState, output: { type: 'error', message: '' } }, + runState: { + traceSessionId: 'test-trace-session', + sessionState, + output: { type: 'error', message: '' }, + }, messages, }) diff --git a/cli/src/hooks/helpers/__tests__/send-message.test.ts b/cli/src/hooks/helpers/__tests__/send-message.test.ts index 87430e7765..e40659d825 100644 --- a/cli/src/hooks/helpers/__tests__/send-message.test.ts +++ b/cli/src/hooks/helpers/__tests__/send-message.test.ts @@ -28,12 +28,15 @@ ensureEnv() const { useChatStore } = await import('../../../state/chat-store') const { createStreamController } = await import('../../stream-state') -const { setupStreamingContext, handleRunCompletion, handleRunError, finalizeQueueState, resetEarlyReturnState } = await import( - '../send-message' -) -const { createBatchedMessageUpdater } = await import( - '../../../utils/message-updater' -) +const { + setupStreamingContext, + handleRunCompletion, + handleRunError, + finalizeQueueState, + resetEarlyReturnState, +} = await import('../send-message') +const { createBatchedMessageUpdater } = + await import('../../../utils/message-updater') import { createPaymentRequiredError } from '@codebuff/sdk' import type { RunState } from '@codebuff/sdk' @@ -351,6 +354,7 @@ describe('handleRunCompletion', () => { let updateChainInProgressCalled = false const runState = { + traceSessionId: 'trace-test', sessionState: undefined, output: { type: 'lastMessage' as const, value: [] }, } @@ -363,10 +367,21 @@ describe('handleRunCompletion', () => { updater, aiMessageId: 'ai-1', wasAbortedByUser: true, - setStreamStatus: (status: StreamStatus) => { setStreamStatusCalled = true; streamStatus = status }, - setCanProcessQueue: (can: boolean) => { setCanProcessQueueCalled = true; canProcessQueue = can }, - updateChainInProgress: (value: boolean) => { updateChainInProgressCalled = true; chainInProgress = value }, - setHasReceivedPlanResponse: (value: boolean) => { hasReceivedPlanResponse = value }, + setStreamStatus: (status: StreamStatus) => { + setStreamStatusCalled = true + streamStatus = status + }, + setCanProcessQueue: (can: boolean) => { + setCanProcessQueueCalled = true + canProcessQueue = can + }, + updateChainInProgress: (value: boolean) => { + updateChainInProgressCalled = true + chainInProgress = value + }, + setHasReceivedPlanResponse: (value: boolean) => { + hasReceivedPlanResponse = value + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -388,10 +403,16 @@ describe('handleRunCompletion', () => { let hasReceivedPlanResponse = false const runState = { + traceSessionId: 'trace-test', sessionState: undefined, output: { type: 'lastMessage' as const, - value: [{ type: 'text' as const, text: 'Server response that should be ignored' }], + value: [ + { + type: 'text' as const, + text: 'Server response that should be ignored', + }, + ], }, } @@ -406,7 +427,9 @@ describe('handleRunCompletion', () => { setStreamStatus: () => {}, setCanProcessQueue: () => {}, updateChainInProgress: () => {}, - setHasReceivedPlanResponse: (value: boolean) => { hasReceivedPlanResponse = value }, + setHasReceivedPlanResponse: (value: boolean) => { + hasReceivedPlanResponse = value + }, }) // Should NOT set plan response (abort path returns early before processing output) @@ -428,6 +451,7 @@ describe('handleRunCompletion', () => { let canProcessQueueCalled = false const runState = { + traceSessionId: 'trace-test', sessionState: undefined, output: { type: 'lastMessage' as const, value: [] }, } @@ -441,10 +465,14 @@ describe('handleRunCompletion', () => { aiMessageId: 'ai-1', wasAbortedByUser: true, setStreamStatus: () => {}, - setCanProcessQueue: () => { canProcessQueueCalled = true }, + setCanProcessQueue: () => { + canProcessQueueCalled = true + }, updateChainInProgress: () => {}, setHasReceivedPlanResponse: () => {}, - resumeQueue: () => { resumeQueueCalled = true }, + resumeQueue: () => { + resumeQueueCalled = true + }, }) // Neither should be called - abort handler already handled cleanup @@ -462,9 +490,15 @@ describe('finalizeQueueState', () => { const isProcessingQueueRef = { current: true } finalizeQueueState({ - setStreamStatus: (status) => { streamStatus = status }, - setCanProcessQueue: (can) => { canProcessQueue = can }, - updateChainInProgress: (value) => { chainInProgress = value }, + setStreamStatus: (status) => { + streamStatus = status + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, + updateChainInProgress: (value) => { + chainInProgress = value + }, isProcessingQueueRef, }) @@ -481,10 +515,18 @@ describe('finalizeQueueState', () => { let chainInProgress = true finalizeQueueState({ - setStreamStatus: (status) => { streamStatus = status }, - setCanProcessQueue: () => { canProcessQueueCalled = true }, - updateChainInProgress: (value) => { chainInProgress = value }, - resumeQueue: () => { resumeQueueCalled = true }, + setStreamStatus: (status) => { + streamStatus = status + }, + setCanProcessQueue: () => { + canProcessQueueCalled = true + }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + resumeQueue: () => { + resumeQueueCalled = true + }, }) expect(streamStatus).toBe('idle') @@ -499,7 +541,9 @@ describe('finalizeQueueState', () => { finalizeQueueState({ setStreamStatus: () => {}, - setCanProcessQueue: (can) => { canProcessQueue = can }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, updateChainInProgress: () => {}, isQueuePausedRef, }) @@ -736,12 +780,12 @@ describe('handleRunError', () => { // Create an error that matches the real AI_APICallError structure const contextLengthError = Object.assign( new Error( - "This endpoint's maximum context length is 200000 tokens. However, you requested about 201209 tokens (158536 of text input, 10673 of tool input, 32000 in the output). Please reduce the length of either one, or use the \"middle-out\" transform to compress your prompt automatically." + 'This endpoint\'s maximum context length is 200000 tokens. However, you requested about 201209 tokens (158536 of text input, 10673 of tool input, 32000 in the output). Please reduce the length of either one, or use the "middle-out" transform to compress your prompt automatically.', ), { name: 'AI_APICallError', statusCode: 400, - } + }, ) let streamStatus = 'streaming' as StreamStatus @@ -774,10 +818,14 @@ describe('handleRunError', () => { expect(aiMessage!.content).toBe('Partial streamed content before error') // Blocks should be preserved - expect(aiMessage!.blocks).toEqual([{ type: 'text', content: 'some block content' }]) + expect(aiMessage!.blocks).toEqual([ + { type: 'text', content: 'some block content' }, + ]) // Error should be stored in userError (displayed in UserErrorBanner) - expect(aiMessage!.userError).toContain('maximum context length is 200000 tokens') + expect(aiMessage!.userError).toContain( + 'maximum context length is 200000 tokens', + ) expect(aiMessage!.userError).toContain('201209 tokens') // Message should be marked complete @@ -884,13 +932,19 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves // --- Shared mutable state (simulates React refs and state in the CLI) --- let streamStatus: StreamStatus = 'idle' let canProcessQueue = false - let chainInProgress = true // Set true at start of sendMessage + let chainInProgress = true // Set true at start of sendMessage const isProcessingQueueRef = { current: false } const isQueuePausedRef = { current: false } - const setStreamStatus = (status: StreamStatus) => { streamStatus = status } - const setCanProcessQueue = (can: boolean) => { canProcessQueue = can } - const updateChainInProgress = (value: boolean) => { chainInProgress = value } + const setStreamStatus = (status: StreamStatus) => { + streamStatus = status + } + const setCanProcessQueue = (can: boolean) => { + canProcessQueue = can + } + const updateChainInProgress = (value: boolean) => { + chainInProgress = value + } // --- PHASE 1: Start run A (setupStreamingContext) --- let messagesA = createBaseMessages() @@ -898,20 +952,23 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves const timerControllerA = createMockTimerController() const abortControllerRefA = { current: null as AbortController | null } - const { updater: updaterA, abortController: abortControllerA } = setupStreamingContext({ - aiMessageId: 'ai-1', - timerController: timerControllerA, - setMessages: (fn: any) => { messagesA = fn(messagesA) }, - streamRefs: streamRefsA, - abortControllerRef: abortControllerRefA, - setStreamStatus, - setCanProcessQueue, - isQueuePausedRef, - isProcessingQueueRef, - updateChainInProgress, - setIsRetrying: () => {}, - setStreamingAgents: () => {}, - }) + const { updater: updaterA, abortController: abortControllerA } = + setupStreamingContext({ + aiMessageId: 'ai-1', + timerController: timerControllerA, + setMessages: (fn: any) => { + messagesA = fn(messagesA) + }, + streamRefs: streamRefsA, + abortControllerRef: abortControllerRefA, + setStreamStatus, + setCanProcessQueue, + isQueuePausedRef, + isProcessingQueueRef, + updateChainInProgress, + setIsRetrying: () => {}, + setStreamingAgents: () => {}, + }) // Simulate streaming has started streamStatus = 'streaming' @@ -964,6 +1021,7 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves let updateChainInProgressCallCount = 0 const runState: RunState = { + traceSessionId: 'trace-test', sessionState: {} as any, output: { type: 'lastMessage' as const, value: [] }, } @@ -976,9 +1034,15 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves updater, aiMessageId: 'ai-1', wasAbortedByUser: true, - setStreamStatus: () => { setStreamStatusCallCount++ }, - setCanProcessQueue: (can: boolean) => { canProcessQueue = can }, - updateChainInProgress: () => { updateChainInProgressCallCount++ }, + setStreamStatus: () => { + setStreamStatusCallCount++ + }, + setCanProcessQueue: (can: boolean) => { + canProcessQueue = can + }, + updateChainInProgress: () => { + updateChainInProgressCallCount++ + }, setHasReceivedPlanResponse: () => {}, isProcessingQueueRef, isQueuePausedRef, @@ -1015,14 +1079,22 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves const { abortController: abortA } = setupStreamingContext({ aiMessageId: 'ai-run-a', timerController: timerA, - setMessages: (fn: any) => { messagesA = fn(messagesA) }, + setMessages: (fn: any) => { + messagesA = fn(messagesA) + }, streamRefs: sharedStreamRefs, abortControllerRef: abortRefA, - setStreamStatus: (status: StreamStatus) => { streamStatus = status }, - setCanProcessQueue: (can: boolean) => { canProcessQueue = can }, + setStreamStatus: (status: StreamStatus) => { + streamStatus = status + }, + setCanProcessQueue: (can: boolean) => { + canProcessQueue = can + }, isQueuePausedRef, isProcessingQueueRef, - updateChainInProgress: (value: boolean) => { chainInProgress = value }, + updateChainInProgress: (value: boolean) => { + chainInProgress = value + }, setIsRetrying: () => {}, setStreamingAgents: () => {}, }) @@ -1090,9 +1162,15 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves timerController: createMockTimerController(), updater: createBatchedMessageUpdater('ai-1', () => {}), setIsRetrying: () => {}, - setStreamStatus: (status: StreamStatus) => { streamStatus = status }, - setCanProcessQueue: (can: boolean) => { canProcessQueue = can }, - updateChainInProgress: (value: boolean) => { chainInProgress = value }, + setStreamStatus: (status: StreamStatus) => { + streamStatus = status + }, + setCanProcessQueue: (can: boolean) => { + canProcessQueue = can + }, + updateChainInProgress: (value: boolean) => { + chainInProgress = value + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1121,9 +1199,15 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves timerController: createMockTimerController(), updater: createBatchedMessageUpdater('ai-1', (fn: any) => {}), setIsRetrying: () => {}, - setStreamStatus: (status: StreamStatus) => { streamStatus = status }, - setCanProcessQueue: (can: boolean) => { canProcessQueue = can }, - updateChainInProgress: (value: boolean) => { chainInProgress = value }, + setStreamStatus: (status: StreamStatus) => { + streamStatus = status + }, + setCanProcessQueue: (can: boolean) => { + canProcessQueue = can + }, + updateChainInProgress: (value: boolean) => { + chainInProgress = value + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1148,9 +1232,15 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves const isQueuePausedRef = { current: false } let previousRunState: RunState | null = null - const setStreamStatus = (status: StreamStatus) => { streamStatus = status } - const setCanProcessQueue = (can: boolean) => { canProcessQueue = can } - const updateChainInProgress = (value: boolean) => { chainInProgress = value } + const setStreamStatus = (status: StreamStatus) => { + streamStatus = status + } + const setCanProcessQueue = (can: boolean) => { + canProcessQueue = can + } + const updateChainInProgress = (value: boolean) => { + chainInProgress = value + } // CRITICAL: Use a single shared streamRefs instance, just like production. // In production, streamRefsRef is created once via useRef and reused. @@ -1161,20 +1251,23 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves const timerA = createMockTimerController() const abortRefA = { current: null as AbortController | null } - const { updater: updaterA, abortController: abortA } = setupStreamingContext({ - aiMessageId: 'ai-run-a', - timerController: timerA, - setMessages: (fn: any) => { messagesA = fn(messagesA) }, - streamRefs: sharedStreamRefs, - abortControllerRef: abortRefA, - setStreamStatus, - setCanProcessQueue, - isQueuePausedRef, - isProcessingQueueRef, - updateChainInProgress, - setIsRetrying: () => {}, - setStreamingAgents: () => {}, - }) + const { updater: updaterA, abortController: abortA } = + setupStreamingContext({ + aiMessageId: 'ai-run-a', + timerController: timerA, + setMessages: (fn: any) => { + messagesA = fn(messagesA) + }, + streamRefs: sharedStreamRefs, + abortControllerRef: abortRefA, + setStreamStatus, + setCanProcessQueue, + isQueuePausedRef, + isProcessingQueueRef, + updateChainInProgress, + setIsRetrying: () => {}, + setStreamingAgents: () => {}, + }) streamStatus = 'streaming' @@ -1189,27 +1282,36 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves canProcessQueue = false let messagesB: ChatMessage[] = [ - { id: 'ai-run-b', variant: 'ai', content: '', blocks: [], timestamp: 'now' }, + { + id: 'ai-run-b', + variant: 'ai', + content: '', + blocks: [], + timestamp: 'now', + }, ] const timerB = createMockTimerController() const abortRefB = { current: null as AbortController | null } // Run B's setupStreamingContext calls sharedStreamRefs.reset(), // which clears wasAbortedByUser. This is the key race condition. - const { updater: updaterB, abortController: abortB } = setupStreamingContext({ - aiMessageId: 'ai-run-b', - timerController: timerB, - setMessages: (fn: any) => { messagesB = fn(messagesB) }, - streamRefs: sharedStreamRefs, - abortControllerRef: abortRefB, - setStreamStatus, - setCanProcessQueue, - isQueuePausedRef, - isProcessingQueueRef, - updateChainInProgress, - setIsRetrying: () => {}, - setStreamingAgents: () => {}, - }) + const { updater: updaterB, abortController: abortB } = + setupStreamingContext({ + aiMessageId: 'ai-run-b', + timerController: timerB, + setMessages: (fn: any) => { + messagesB = fn(messagesB) + }, + streamRefs: sharedStreamRefs, + abortControllerRef: abortRefB, + setStreamStatus, + setCanProcessQueue, + isQueuePausedRef, + isProcessingQueueRef, + updateChainInProgress, + setIsRetrying: () => {}, + setStreamingAgents: () => {}, + }) // After B starts, shared streamRefs.wasAbortedByUser is reset to false. // This is why we use per-run abortController.signal.aborted instead. @@ -1219,6 +1321,7 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves // handleRunCompletion uses the per-run wasAbortedByUser boolean (from abortA.signal.aborted), // NOT the shared streamRefs, so it correctly knows A was aborted. const runStateA: RunState = { + traceSessionId: 'trace-test-a', sessionState: { id: 'session-abc', messages: [ @@ -1252,6 +1355,7 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves // Simulate run B completing normally const runStateB: RunState = { + traceSessionId: 'trace-test-b', sessionState: { id: 'session-abc', messages: [ @@ -1261,7 +1365,10 @@ describe('CLI-level race condition: abort run A, attempt run B before A resolves { role: 'assistant', content: 'full response to second message' }, ], } as any, - output: { type: 'lastMessage' as const, value: [{ type: 'text' as const, text: 'full response' }] }, + output: { + type: 'lastMessage' as const, + value: [{ type: 'text' as const, text: 'full response' }], + }, } previousRunState = runStateB @@ -1309,7 +1416,9 @@ describe('resetEarlyReturnState', () => { let chainInProgress = true resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, + updateChainInProgress: (value) => { + chainInProgress = value + }, setCanProcessQueue: () => {}, }) @@ -1322,7 +1431,9 @@ describe('resetEarlyReturnState', () => { resetEarlyReturnState({ updateChainInProgress: () => {}, - setCanProcessQueue: (can) => { canProcessQueue = can }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isQueuePausedRef, }) @@ -1335,7 +1446,9 @@ describe('resetEarlyReturnState', () => { resetEarlyReturnState({ updateChainInProgress: () => {}, - setCanProcessQueue: (can) => { canProcessQueue = can }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isQueuePausedRef, }) @@ -1369,7 +1482,9 @@ describe('resetEarlyReturnState', () => { resetEarlyReturnState({ updateChainInProgress: () => {}, - setCanProcessQueue: (can) => { canProcessQueue = can }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, // No isQueuePausedRef - should default to !undefined = true }) @@ -1385,8 +1500,12 @@ describe('resetEarlyReturnState', () => { const isQueuePausedRef = { current: false } resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1403,8 +1522,12 @@ describe('resetEarlyReturnState', () => { const isQueuePausedRef = { current: true } resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1424,8 +1547,12 @@ describe('resetEarlyReturnState', () => { // Simulating what happens after catching validation exception resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1442,7 +1569,9 @@ describe('resetEarlyReturnState', () => { resetEarlyReturnState({ updateChainInProgress: () => {}, - setCanProcessQueue: (can) => { canProcessQueue = can }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1464,8 +1593,12 @@ describe('resetEarlyReturnState', () => { // After exception, reset is called resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1484,8 +1617,12 @@ describe('resetEarlyReturnState', () => { const isQueuePausedRef = { current: false } resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1504,8 +1641,12 @@ describe('resetEarlyReturnState', () => { const isQueuePausedRef = { current: false } resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1525,8 +1666,12 @@ describe('resetEarlyReturnState', () => { const isQueuePausedRef = { current: true } // User explicitly paused resetEarlyReturnState({ - updateChainInProgress: (value) => { chainInProgress = value }, - setCanProcessQueue: (can) => { canProcessQueue = can }, + updateChainInProgress: (value) => { + chainInProgress = value + }, + setCanProcessQueue: (can) => { + canProcessQueue = can + }, isProcessingQueueRef, isQueuePausedRef, }) @@ -1551,13 +1696,15 @@ describe('freebuff gate errors', () => { return updater } - const baseMessage = (): ChatMessage[] => [{ - id: 'ai-1', - variant: 'ai', - content: '', - blocks: [], - timestamp: 'now', - }] + const baseMessage = (): ChatMessage[] => [ + { + id: 'ai-1', + variant: 'ai', + content: '', + blocks: [], + timestamp: 'now', + }, + ] const gateError = (kind: string, statusCode: number) => ({ error: kind, @@ -1660,6 +1807,7 @@ describe('freebuff gate errors', () => { const messages = baseMessage() const updater = makeUpdater(messages) const runState: RunState = { + traceSessionId: 'trace-test', sessionState: undefined as any, output: { type: 'error', diff --git a/cli/src/utils/run-state-storage.ts b/cli/src/utils/run-state-storage.ts index 3591538089..8ca9168127 100644 --- a/cli/src/utils/run-state-storage.ts +++ b/cli/src/utils/run-state-storage.ts @@ -1,7 +1,12 @@ import * as fs from 'fs' import path from 'path' +import { randomUUID } from 'node:crypto' -import { getCurrentChatDir, getMostRecentChatDir, getProjectDataDir } from '../project-files' +import { + getCurrentChatDir, + getMostRecentChatDir, + getProjectDataDir, +} from '../project-files' import { logger } from './logger' import type { ChatMessage, ContentBlock } from '../types/chat' @@ -21,9 +26,9 @@ type SavedChatState = { */ function extractToggleIds(blocks: ContentBlock[] | undefined): string[] { if (!blocks) return [] - + const ids: string[] = [] - + for (const block of blocks) { if (block.type === 'agent') { ids.push(block.agentId) @@ -33,7 +38,7 @@ function extractToggleIds(blocks: ContentBlock[] | undefined): string[] { ids.push(block.toolCallId) } } - + return ids } @@ -42,11 +47,11 @@ function extractToggleIds(blocks: ContentBlock[] | undefined): string[] { */ export function getAllToggleIdsFromMessages(messages: ChatMessage[]): string[] { const ids: string[] = [] - + for (const message of messages) { ids.push(...extractToggleIds(message.blocks)) } - + return ids } @@ -69,11 +74,14 @@ export function getChatMessagesPath(): string { /** * Save both the RunState and ChatMessage[] to disk */ -export function saveChatState(runState: RunState, messages: ChatMessage[]): void { +export function saveChatState( + runState: RunState, + messages: ChatMessage[], +): void { try { const runStatePath = getRunStatePath() const messagesPath = getChatMessagesPath() - + fs.writeFileSync(runStatePath, JSON.stringify(runState, null, 2)) fs.writeFileSync(messagesPath, JSON.stringify(messages, null, 2)) } catch (error) { @@ -92,14 +100,19 @@ export function saveChatState(runState: RunState, messages: ChatMessage[]): void * recently modified chat directory is used. * Returns null if no previous chat exists or files can't be parsed. */ -export function loadMostRecentChatState(chatId?: string): SavedChatState | null { +export function loadMostRecentChatState( + chatId?: string, +): SavedChatState | null { try { let chatDir: string | null = null if (chatId && chatId.trim().length > 0) { const baseDir = path.join(getProjectDataDir(), 'chats') const candidateDir = path.join(baseDir, chatId.trim()) - if (fs.existsSync(candidateDir) && fs.statSync(candidateDir).isDirectory()) { + if ( + fs.existsSync(candidateDir) && + fs.statSync(candidateDir).isDirectory() + ) { chatDir = candidateDir } else { logger.debug( @@ -133,12 +146,18 @@ export function loadMostRecentChatState(chatId?: string): SavedChatState | null const messagesContent = fs.readFileSync(messagesPath, 'utf8') const runState = JSON.parse(runStateContent) as RunState + runState.traceSessionId ??= randomUUID() const messages = JSON.parse(messagesContent) as ChatMessage[] const resolvedChatId = path.basename(chatDir) logger.info( - { runStatePath, messagesPath, messageCount: messages.length, chatId: resolvedChatId }, + { + runStatePath, + messagesPath, + messageCount: messages.length, + chatId: resolvedChatId, + }, 'Loaded chat state from chat directory', ) @@ -161,18 +180,15 @@ export function clearChatState(): void { try { const runStatePath = getRunStatePath() const messagesPath = getChatMessagesPath() - + if (fs.existsSync(runStatePath)) { fs.unlinkSync(runStatePath) } if (fs.existsSync(messagesPath)) { fs.unlinkSync(messagesPath) } - - logger.debug( - { runStatePath, messagesPath }, - 'Cleared chat state files' - ) + + logger.debug({ runStatePath, messagesPath }, 'Cleared chat state files') } catch (error) { logger.error( { diff --git a/common/src/types/contracts/bigquery.ts b/common/src/types/contracts/bigquery.ts index c996995bdb..36f6c896dc 100644 --- a/common/src/types/contracts/bigquery.ts +++ b/common/src/types/contracts/bigquery.ts @@ -21,3 +21,35 @@ export type InsertMessageBigqueryFn = (params: { dataset?: string logger: Logger }) => Promise + +export type ChatCompletionTraceRow = { + id: string + user_id: string + client_id?: string | null + trace_session_id: string + trace_lineage_id: string + run_id: string + agent_id: string + created_at: Date + model: string + cost_mode?: string | null + request: unknown + message_count: number + message_start_index: number + message_delta_count: number + previous_message_count?: number | null + common_prefix_length: number + cache_hit: boolean + full_snapshot: boolean + messages: unknown[] + delta_message_hashes: string[] + tool_count: number + tools?: unknown[] | null + tools_omitted: boolean +} + +export type InsertChatCompletionTraceBigqueryFn = (params: { + row: ChatCompletionTraceRow + dataset?: string + logger: Logger +}) => Promise diff --git a/packages/bigquery/src/client.ts b/packages/bigquery/src/client.ts index 006e0a5adc..358269366c 100644 --- a/packages/bigquery/src/client.ts +++ b/packages/bigquery/src/client.ts @@ -2,10 +2,18 @@ import { IS_PROD } from '@codebuff/common/env' import { getErrorObject } from '@codebuff/common/util/error' import { BigQuery } from '@google-cloud/bigquery' -import { MESSAGE_SCHEMA, RELABELS_SCHEMA, TRACES_SCHEMA } from './schema' +import { + CHAT_COMPLETION_TRACES_SCHEMA, + MESSAGE_SCHEMA, + RELABELS_SCHEMA, + TRACES_SCHEMA, +} from './schema' import type { BaseTrace, GetRelevantFilesTrace, Relabel, Trace } from './schema' -import type { MessageRow } from '@codebuff/common/types/contracts/bigquery' +import type { + ChatCompletionTraceRow, + MessageRow, +} from '@codebuff/common/types/contracts/bigquery' import type { Logger } from '@codebuff/common/types/contracts/logger' const DATASET = IS_PROD ? 'codebuff_data' : 'codebuff_data_dev' @@ -13,6 +21,7 @@ const DATASET = IS_PROD ? 'codebuff_data' : 'codebuff_data_dev' const TRACES_TABLE = 'traces' const RELABELS_TABLE = 'relabels' const MESSAGE_TABLE = 'message' +const CHAT_COMPLETION_TRACES_TABLE = 'chat_completion_traces' // Create a single BigQuery client instance to be used by all functions let client: BigQuery | null = null @@ -77,6 +86,17 @@ export async function setupBigQuery({ fields: ['user_id'], }, }) + await ds.table(CHAT_COMPLETION_TRACES_TABLE).get({ + autoCreate: true, + schema: CHAT_COMPLETION_TRACES_SCHEMA, + timePartitioning: { + type: 'MONTH', + field: 'created_at', + }, + clustering: { + fields: ['user_id', 'trace_session_id', 'trace_lineage_id'], + }, + }) } catch (error) { const err = error as Error & { code?: string; details?: unknown } logger.error( @@ -94,6 +114,53 @@ export async function setupBigQuery({ } } +export async function insertChatCompletionTraceBigquery({ + row, + dataset, + logger, +}: { + row: ChatCompletionTraceRow + dataset?: string + logger: Logger +}) { + const resolvedDataset = dataset ?? DATASET + try { + await getClient() + .dataset(resolvedDataset) + .table(CHAT_COMPLETION_TRACES_TABLE) + .insert({ + ...row, + request: JSON.stringify(row.request), + messages: JSON.stringify(row.messages), + delta_message_hashes: JSON.stringify(row.delta_message_hashes), + tools: row.tools ? JSON.stringify(row.tools) : null, + }) + + logger.debug( + { + traceId: row.id, + userId: row.user_id, + clientId: row.client_id, + traceSessionId: row.trace_session_id, + traceLineageId: row.trace_lineage_id, + runId: row.run_id, + messageStartIndex: row.message_start_index, + messageDeltaCount: row.message_delta_count, + fullSnapshot: row.full_snapshot, + }, + 'Inserted chat completion trace into BigQuery', + ) + return true + } catch (error) { + logger.error( + { error: getErrorObject(error), traceId: row.id }, + 'Failed to insert chat completion trace into BigQuery', + ) + + return false + } +} + export async function insertMessageBigquery({ row, dataset, diff --git a/packages/bigquery/src/schema.ts b/packages/bigquery/src/schema.ts index 697a7c19ac..873c1d2bae 100644 --- a/packages/bigquery/src/schema.ts +++ b/packages/bigquery/src/schema.ts @@ -143,3 +143,31 @@ export const MESSAGE_SCHEMA: TableSchema = { { name: 'cache_read_input_tokens', type: 'INTEGER', mode: 'NULLABLE' }, ], } + +export const CHAT_COMPLETION_TRACES_SCHEMA: TableSchema = { + fields: [ + { name: 'id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'user_id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'client_id', type: 'STRING', mode: 'NULLABLE' }, + { name: 'trace_session_id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'trace_lineage_id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'run_id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'agent_id', type: 'STRING', mode: 'REQUIRED' }, + { name: 'created_at', type: 'TIMESTAMP', mode: 'REQUIRED' }, + { name: 'model', type: 'STRING', mode: 'REQUIRED' }, + { name: 'cost_mode', type: 'STRING', mode: 'NULLABLE' }, + { name: 'request', type: 'JSON', mode: 'REQUIRED' }, + { name: 'message_count', type: 'INTEGER', mode: 'REQUIRED' }, + { name: 'message_start_index', type: 'INTEGER', mode: 'REQUIRED' }, + { name: 'message_delta_count', type: 'INTEGER', mode: 'REQUIRED' }, + { name: 'previous_message_count', type: 'INTEGER', mode: 'NULLABLE' }, + { name: 'common_prefix_length', type: 'INTEGER', mode: 'REQUIRED' }, + { name: 'cache_hit', type: 'BOOLEAN', mode: 'REQUIRED' }, + { name: 'full_snapshot', type: 'BOOLEAN', mode: 'REQUIRED' }, + { name: 'messages', type: 'JSON', mode: 'REQUIRED' }, + { name: 'delta_message_hashes', type: 'JSON', mode: 'REQUIRED' }, + { name: 'tool_count', type: 'INTEGER', mode: 'REQUIRED' }, + { name: 'tools', type: 'JSON', mode: 'NULLABLE' }, + { name: 'tools_omitted', type: 'BOOLEAN', mode: 'REQUIRED' }, + ], +} diff --git a/sdk/src/run-state.ts b/sdk/src/run-state.ts index 86f19b8383..7fcc35a42b 100644 --- a/sdk/src/run-state.ts +++ b/sdk/src/run-state.ts @@ -62,6 +62,7 @@ export function selectHighestPriorityKnowledgeFile( export type RunState = { sessionState?: SessionState output: AgentOutput + traceSessionId: string } export type InitialSessionStateOptions = { @@ -630,6 +631,7 @@ export async function generateInitialRunState({ fs: CodebuffFileSystem }): Promise { return { + traceSessionId: crypto.randomUUID(), sessionState: await initialSessionState({ cwd, skillsDir, diff --git a/sdk/src/run.ts b/sdk/src/run.ts index f5794a7def..b492443c39 100644 --- a/sdk/src/run.ts +++ b/sdk/src/run.ts @@ -177,6 +177,8 @@ export async function run(options: RunExecutionOptions): Promise { const abortError = createAbortError(signal) return { sessionState: options.previousRun?.sessionState, + traceSessionId: + options.previousRun?.traceSessionId ?? crypto.randomUUID(), output: { type: 'error', message: abortError.message, @@ -269,6 +271,7 @@ async function runOnce({ logger, }) } + const traceSessionId = previousRun?.traceSessionId ?? crypto.randomUUID() let resolve: (value: RunReturnType) => any = () => {} let _reject: (error: any) => any = () => {} @@ -322,6 +325,7 @@ async function runOnce({ message = message ?? 'Run cancelled by user.' return { sessionState: getCancelledSessionState(message), + traceSessionId, output: { type: 'error', message, @@ -460,6 +464,7 @@ async function runOnce({ resolve, onError, initialSessionState: sessionState, + traceSessionId, }) return } @@ -469,6 +474,7 @@ async function runOnce({ resolve, onError, initialSessionState: sessionState, + traceSessionId, }) return } @@ -530,7 +536,10 @@ async function runOnce({ repoId: undefined, clientSessionId: promptId, userId, - extraCodebuffMetadata, + extraCodebuffMetadata: { + ...(extraCodebuffMetadata ?? {}), + trace_session_id: traceSessionId, + }, signal: signal ?? new AbortController().signal, }).catch((error) => { let errorMessage = @@ -550,6 +559,7 @@ async function runOnce({ resolve({ sessionState: getCancelledSessionState(errorMessage), + traceSessionId, output: { type: 'error', message: errorMessage, @@ -825,11 +835,13 @@ async function handlePromptResponse({ resolve, onError, initialSessionState, + traceSessionId, }: { action: ServerAction<'prompt-response'> | ServerAction<'prompt-error'> resolve: (value: RunReturnType) => any onError: (error: { message: string }) => void initialSessionState: SessionState + traceSessionId: string }) { if (action.type === 'prompt-error') { onError({ message: action.message }) @@ -837,6 +849,7 @@ async function handlePromptResponse({ const statusCode = extractStatusCodeFromMessage(action.message) resolve({ sessionState: initialSessionState, + traceSessionId, output: { type: 'error', message: action.message, @@ -856,6 +869,7 @@ async function handlePromptResponse({ onError({ message }) resolve({ sessionState: initialSessionState, + traceSessionId, output: { type: 'error', message, @@ -867,6 +881,7 @@ async function handlePromptResponse({ const state: RunState = { sessionState, + traceSessionId, output: output ?? { type: 'error', message: 'No output from agent', @@ -880,6 +895,7 @@ async function handlePromptResponse({ }) resolve({ sessionState: initialSessionState, + traceSessionId, output: { type: 'error', message: 'Internal error: prompt response type not handled', diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index 0a48fce0bc..0d18570d28 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -18,7 +18,10 @@ import { env } from '@codebuff/internal/env' import { NextResponse } from 'next/server' import type { TrackEventFn } from '@codebuff/common/types/contracts/analytics' -import type { InsertMessageBigqueryFn } from '@codebuff/common/types/contracts/bigquery' +import type { + InsertChatCompletionTraceBigqueryFn, + InsertMessageBigqueryFn, +} from '@codebuff/common/types/contracts/bigquery' import type { GetUserUsageDataFn } from '@codebuff/common/types/contracts/billing' import type { GetAgentRunFromIdFn, @@ -43,6 +46,7 @@ import type { NextRequest } from 'next/server' import type { ChatCompletionRequestBody } from '@/llm-api/types' +import { recordChatCompletionTrace } from '@/llm-api/chat-completion-trace' import { createRequestAuditRecord } from '@/llm-api/helpers' import { CanopyWaveError, @@ -180,6 +184,7 @@ export async function postChatCompletions(params: { getAgentRunFromId: GetAgentRunFromIdFn fetch: typeof globalThis.fetch insertMessageBigquery: InsertMessageBigqueryFn + insertChatCompletionTraceBigquery?: InsertChatCompletionTraceBigqueryFn ensureSubscriberBlockGrant?: (params: { userId: string logger: Logger @@ -203,6 +208,7 @@ export async function postChatCompletions(params: { getAgentRunFromId, fetch, insertMessageBigquery, + insertChatCompletionTraceBigquery, ensureSubscriberBlockGrant, getUserPreferences, checkSessionAdmissible: checkSession = checkSessionAdmissible, @@ -701,6 +707,15 @@ export async function postChatCompletions(params: { const openrouterApiKey = req.headers.get(BYOK_OPENROUTER_HEADER) const providerLogger = sampleSuccessLogger(logger, sampleFreebuffSuccess) + recordChatCompletionTrace({ + body: typedBody, + userId, + agentId, + ancestorRunIds, + logger: providerLogger, + insertChatCompletionTraceBigquery, + }) + // Handle streaming vs non-streaming try { if (bodyStream) { diff --git a/web/src/app/api/v1/chat/completions/route.ts b/web/src/app/api/v1/chat/completions/route.ts index a6a4ace378..2fbdfd5fa8 100644 --- a/web/src/app/api/v1/chat/completions/route.ts +++ b/web/src/app/api/v1/chat/completions/route.ts @@ -1,4 +1,7 @@ -import { insertMessageBigquery } from '@codebuff/bigquery' +import { + insertChatCompletionTraceBigquery, + insertMessageBigquery, +} from '@codebuff/bigquery' import { ensureSubscriberBlockGrant } from '@codebuff/billing/subscription' import { getUserUsageData } from '@codebuff/billing/usage-service' import { trackEvent } from '@codebuff/common/analytics' @@ -36,6 +39,7 @@ export async function POST(req: NextRequest) { getAgentRunFromId, fetch, insertMessageBigquery, + insertChatCompletionTraceBigquery, ensureSubscriberBlockGrant, getUserPreferences, }) diff --git a/web/src/llm-api/__tests__/chat-completion-trace.test.ts b/web/src/llm-api/__tests__/chat-completion-trace.test.ts new file mode 100644 index 0000000000..42648ad0ba --- /dev/null +++ b/web/src/llm-api/__tests__/chat-completion-trace.test.ts @@ -0,0 +1,277 @@ +import { + beforeAll, + beforeEach, + describe, + expect, + it, + jest, +} from '@jest/globals' + +jest.mock('@codebuff/bigquery', () => ({ + setupBigQuery: jest.fn(), +})) + +import type { ChatCompletionTraceRow } from '@codebuff/common/types/contracts/bigquery' +import type { ChatCompletionRequestBody } from '../types' +import type { + recordChatCompletionTrace as recordChatCompletionTraceType, + resetChatCompletionTraceCacheForTests as resetChatCompletionTraceCacheForTestsType, +} from '../chat-completion-trace' + +const testLogger = { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, +} + +const baseBody = ( + messages: ChatCompletionRequestBody['messages'], +): ChatCompletionRequestBody => ({ + model: 'deepseek/deepseek-v4-pro', + stream: true, + messages, + tools: [ + { + type: 'function', + function: { name: 'read_files', parameters: {} }, + }, + ], + codebuff_metadata: { + client_id: 'client-1', + run_id: 'run-1', + trace_session_id: 'session-1', + trace_request_id: 'trace-1', + cost_mode: 'free', + }, +}) + +describe('buildChatCompletionTraceRow', () => { + let recordChatCompletionTrace: typeof recordChatCompletionTraceType + let resetChatCompletionTraceCacheForTests: typeof resetChatCompletionTraceCacheForTestsType + let rows: ChatCompletionTraceRow[] + let traceWriteTasks: Promise[] + + beforeAll(async () => { + const traceModule = await import('../chat-completion-trace') + recordChatCompletionTrace = traceModule.recordChatCompletionTrace + resetChatCompletionTraceCacheForTests = + traceModule.resetChatCompletionTraceCacheForTests + }) + + beforeEach(() => { + resetChatCompletionTraceCacheForTests() + rows = [] + traceWriteTasks = [] + }) + + const scheduleTraceWrite = (task: () => Promise) => { + traceWriteTasks.push(task()) + } + + const flushTraceWrites = async () => { + const tasks = traceWriteTasks + traceWriteTasks = [] + await Promise.all(tasks) + } + + const record = async (params: { + body: ChatCompletionRequestBody + userId?: string + agentId?: string + ancestorRunIds?: string[] + }) => { + recordChatCompletionTrace({ + body: params.body, + userId: params.userId ?? 'user-1', + agentId: params.agentId ?? 'base2-free-deepseek', + ancestorRunIds: params.ancestorRunIds ?? [], + logger: testLogger, + insertChatCompletionTraceBigquery: async ({ row }) => { + rows.push(row) + return true + }, + scheduleTraceWrite, + }) + await flushTraceWrites() + return rows.at(-1)! + } + + it('stores a full snapshot when the trace cache is cold', async () => { + const row = await record({ + body: baseBody([{ role: 'user', content: 'hello' }]), + }) + + expect(row.trace_session_id).toBe('session-1') + expect(row.trace_lineage_id).toBe('session-1') + expect(row.message_start_index).toBe(0) + expect(row.message_delta_count).toBe(1) + expect(row.messages).toEqual([{ role: 'user', content: 'hello' }]) + expect(row.cache_hit).toBe(false) + expect(row.full_snapshot).toBe(true) + expect(row.tools_omitted).toBe(false) + expect(row.tools).toHaveLength(1) + }) + + it('stores only the appended suffix for the same conversation', async () => { + await record({ + body: baseBody([{ role: 'user', content: 'hello' }]), + }) + + const row = await record({ + body: baseBody([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi' }, + { role: 'user', content: 'again' }, + ]), + }) + + expect(row.message_start_index).toBe(1) + expect(row.common_prefix_length).toBe(1) + expect(row.message_delta_count).toBe(2) + expect(row.messages).toEqual([ + { role: 'assistant', content: 'hi' }, + { role: 'user', content: 'again' }, + ]) + expect(row.cache_hit).toBe(true) + expect(row.full_snapshot).toBe(false) + expect(row.tools_omitted).toBe(true) + expect(row.tools).toBeNull() + }) + + it('uses trace_session_id to keep root-agent history incremental across user prompts', async () => { + await record({ + body: baseBody([{ role: 'user', content: 'hello' }]), + }) + + const otherRunBody = baseBody([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi from next prompt' }, + ]) + otherRunBody.codebuff_metadata = { + ...otherRunBody.codebuff_metadata, + client_id: 'client-2', + run_id: 'run-2', + trace_request_id: 'trace-2', + } + + const row = await record({ + body: otherRunBody, + }) + + expect(row.trace_lineage_id).toBe('session-1') + expect(row.cache_hit).toBe(true) + expect(row.message_start_index).toBe(1) + expect(row.messages).toEqual([ + { role: 'assistant', content: 'hi from next prompt' }, + ]) + }) + + it('keeps child runs isolated even when trace_session_id matches', async () => { + await record({ + body: baseBody([{ role: 'user', content: 'hello' }]), + agentId: 'reviewer', + ancestorRunIds: ['root-run-1'], + }) + + const otherRunBody = baseBody([{ role: 'user', content: 'hello' }]) + otherRunBody.codebuff_metadata = { + ...otherRunBody.codebuff_metadata, + run_id: 'run-2', + trace_request_id: 'trace-2', + } + + const row = await record({ + body: otherRunBody, + agentId: 'reviewer', + ancestorRunIds: ['root-run-1'], + }) + + expect(row.trace_lineage_id).toBe('run-2') + expect(row.cache_hit).toBe(false) + expect(row.message_start_index).toBe(0) + expect(row.messages).toEqual([{ role: 'user', content: 'hello' }]) + }) + + it('does not advance the prefix cache when BigQuery insert fails', async () => { + recordChatCompletionTrace({ + body: baseBody([{ role: 'user', content: 'hello' }]), + userId: 'user-1', + agentId: 'base2-free-deepseek', + ancestorRunIds: [], + logger: testLogger, + insertChatCompletionTraceBigquery: async () => false, + scheduleTraceWrite, + }) + await flushTraceWrites() + + const row = await record({ + body: baseBody([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi' }, + ]), + }) + + expect(row.cache_hit).toBe(false) + expect(row.message_start_index).toBe(0) + expect(row.messages).toEqual([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi' }, + ]) + }) + + it('skips the new table for old clients without trace_session_id', async () => { + const body = baseBody([{ role: 'user', content: 'hello' }]) + body.codebuff_metadata = { + client_id: 'client-1', + run_id: 'run-1', + cost_mode: 'free', + } + + const traceRequestId = recordChatCompletionTrace({ + body, + userId: 'user-1', + agentId: 'base2-free-deepseek', + ancestorRunIds: [], + logger: testLogger, + insertChatCompletionTraceBigquery: async ({ row }) => { + rows.push(row) + return true + }, + scheduleTraceWrite, + }) + + expect(traceRequestId).toBeNull() + expect(rows).toHaveLength(0) + expect(body.codebuff_metadata?.trace_request_id).toBeUndefined() + }) + + it('schedules BigQuery work off the caller stack', async () => { + let scheduledTask: (() => Promise) | undefined + const body = baseBody([{ role: 'user', content: 'hello' }]) + + const traceRequestId = recordChatCompletionTrace({ + body, + userId: 'user-1', + agentId: 'base2-free-deepseek', + ancestorRunIds: [], + logger: testLogger, + insertChatCompletionTraceBigquery: async ({ row }) => { + rows.push(row) + return true + }, + scheduleTraceWrite: (task) => { + scheduledTask = task + }, + }) + + expect(typeof traceRequestId).toBe('string') + expect(body.codebuff_metadata?.trace_request_id).toBe(traceRequestId) + expect(rows).toHaveLength(0) + + await scheduledTask?.() + + expect(rows).toHaveLength(1) + expect(rows[0]?.id).toBe(traceRequestId) + }) +}) diff --git a/web/src/llm-api/chat-completion-trace.ts b/web/src/llm-api/chat-completion-trace.ts new file mode 100644 index 0000000000..1cad72d76b --- /dev/null +++ b/web/src/llm-api/chat-completion-trace.ts @@ -0,0 +1,262 @@ +import { createHash, randomUUID } from 'node:crypto' + +import { setupBigQuery } from '@codebuff/bigquery' + +import { createRequestAuditRecord } from './request-audit' + +import type { + ChatCompletionTraceRow, + InsertChatCompletionTraceBigqueryFn, +} from '@codebuff/common/types/contracts/bigquery' +import type { Logger } from '@codebuff/common/types/contracts/logger' +import type { ChatCompletionRequestBody } from './types' + +type TraceCacheEntry = { + messageHashes: string[] + toolsHash: string | null +} + +const MAX_TRACE_CACHE_ENTRIES = 10_000 +const MAX_TRACE_CACHE_MESSAGE_HASHES = 250_000 +const traceCache = new Map() +let traceCacheMessageHashCount = 0 + +type ScheduleTraceWrite = (task: () => Promise) => void + +function stableJsonHash(value: unknown): string { + const json = JSON.stringify(value) + return createHash('sha256') + .update(json ?? 'undefined') + .digest('hex') +} + +function getTraceCacheKey(params: { + userId: string + traceLineageId: string + agentId: string +}) { + const { userId, traceLineageId, agentId } = params + return [userId, traceLineageId, agentId].join(':') +} + +function countCommonPrefix(left: string[], right: string[]) { + const max = Math.min(left.length, right.length) + for (let i = 0; i < max; i++) { + if (left[i] !== right[i]) return i + } + return max +} + +function rememberTraceCacheEntry(key: string, entry: TraceCacheEntry) { + if (traceCache.has(key)) { + forgetTraceCacheEntry(key) + } + traceCache.set(key, entry) + traceCacheMessageHashCount += entry.messageHashes.length + + while ( + traceCache.size > MAX_TRACE_CACHE_ENTRIES || + traceCacheMessageHashCount > MAX_TRACE_CACHE_MESSAGE_HASHES + ) { + const oldestKey = traceCache.keys().next().value + if (!oldestKey) break + forgetTraceCacheEntry(oldestKey) + } +} + +function forgetTraceCacheEntry(key: string) { + const entry = traceCache.get(key) + if (!entry) return + traceCache.delete(key) + traceCacheMessageHashCount -= entry.messageHashes.length +} + +function buildChatCompletionTraceRecord(params: { + body: ChatCompletionRequestBody + userId: string + agentId: string + ancestorRunIds: string[] + traceRequestId: string + createdAt: Date +}): { + row: ChatCompletionTraceRow + cacheKey: string + cacheEntry: TraceCacheEntry +} { + const { body, userId, agentId, ancestorRunIds, traceRequestId, createdAt } = + params + const messages = Array.isArray(body.messages) ? body.messages : [] + const tools = Array.isArray(body.tools) ? body.tools : undefined + const metadata = body.codebuff_metadata + const clientId = + typeof metadata?.client_id === 'string' ? metadata.client_id : null + const runId = typeof metadata?.run_id === 'string' ? metadata.run_id : '' + const traceSessionId = + typeof metadata?.trace_session_id === 'string' + ? metadata.trace_session_id + : undefined + if (!traceSessionId) { + throw new Error('trace_session_id is required for chat completion traces') + } + const traceLineageId = ancestorRunIds.length === 0 ? traceSessionId : runId + const costMode = + typeof metadata?.cost_mode === 'string' ? metadata.cost_mode : null + const cacheKey = getTraceCacheKey({ userId, traceLineageId, agentId }) + const cached = traceCache.get(cacheKey) + const messageHashes = messages.map(stableJsonHash) + const commonPrefixLength = cached + ? countCommonPrefix(cached.messageHashes, messageHashes) + : 0 + const deltaMessages = messages.slice(commonPrefixLength) + const deltaMessageHashes = messageHashes.slice(commonPrefixLength) + const toolsHash = tools ? stableJsonHash(tools) : null + const shouldIncludeTools = !!tools && cached?.toolsHash !== toolsHash + + const cacheEntry = { + messageHashes, + toolsHash, + } + + return { + cacheKey, + cacheEntry, + row: { + id: traceRequestId, + user_id: userId, + client_id: clientId, + trace_session_id: traceSessionId, + trace_lineage_id: traceLineageId, + run_id: runId, + agent_id: agentId, + created_at: createdAt, + model: body.model, + cost_mode: costMode, + request: createRequestAuditRecord(body), + message_count: messages.length, + message_start_index: commonPrefixLength, + message_delta_count: deltaMessages.length, + previous_message_count: cached?.messageHashes.length ?? null, + common_prefix_length: commonPrefixLength, + cache_hit: !!cached, + full_snapshot: commonPrefixLength === 0, + messages: deltaMessages, + delta_message_hashes: deltaMessageHashes, + tool_count: tools?.length ?? 0, + tools: shouldIncludeTools ? tools : null, + tools_omitted: !!tools && !shouldIncludeTools, + }, + } +} + +export function buildChatCompletionTraceRow( + params: Parameters[0], +): ChatCompletionTraceRow { + return buildChatCompletionTraceRecord(params).row +} + +export async function insertChatCompletionTraceToBigQuery(params: { + row: ChatCompletionTraceRow + logger: Logger + insertChatCompletionTraceBigquery: InsertChatCompletionTraceBigqueryFn +}) { + const { row, logger, insertChatCompletionTraceBigquery } = params + + await setupBigQuery({ logger }) + const success = await insertChatCompletionTraceBigquery({ + row, + logger, + }) + if (!success) { + logger.error( + { + traceId: row.id, + userId: row.user_id, + clientId: row.client_id, + runId: row.run_id, + messageDeltaCount: row.message_delta_count, + }, + 'Failed to insert chat completion trace into BigQuery', + ) + } + return success +} + +export function recordChatCompletionTrace(params: { + body: ChatCompletionRequestBody + userId: string + agentId: string + ancestorRunIds: string[] + logger: Logger + insertChatCompletionTraceBigquery?: InsertChatCompletionTraceBigqueryFn + scheduleTraceWrite?: ScheduleTraceWrite +}) { + const { + body, + userId, + agentId, + ancestorRunIds, + logger, + insertChatCompletionTraceBigquery, + scheduleTraceWrite = (task) => { + setTimeout(() => { + void task() + }, 0) + }, + } = params + if (typeof body.codebuff_metadata?.trace_session_id !== 'string') { + return null + } + if (!insertChatCompletionTraceBigquery) { + return null + } + + const traceRequestId = randomUUID() + body.codebuff_metadata = { + ...(body.codebuff_metadata ?? {}), + trace_request_id: traceRequestId, + } + + scheduleTraceWrite(() => { + let traceRecord: ReturnType + try { + traceRecord = buildChatCompletionTraceRecord({ + body, + userId, + agentId, + ancestorRunIds, + traceRequestId, + createdAt: new Date(), + }) + } catch (error) { + logger.error( + { error, traceId: traceRequestId }, + 'Failed to build chat completion trace row', + ) + return Promise.resolve() + } + + return insertChatCompletionTraceToBigQuery({ + row: traceRecord.row, + logger, + insertChatCompletionTraceBigquery, + }) + .then((success) => { + if (success) { + rememberTraceCacheEntry(traceRecord.cacheKey, traceRecord.cacheEntry) + } + }) + .catch((error) => { + logger.error( + { error, traceId: traceRecord.row.id }, + 'Failed to insert chat completion trace into BigQuery', + ) + }) + }) + + return traceRequestId +} + +export function resetChatCompletionTraceCacheForTests() { + traceCache.clear() + traceCacheMessageHashCount = 0 +} diff --git a/web/src/llm-api/helpers.ts b/web/src/llm-api/helpers.ts index 80920c77e9..ee804bf9d4 100644 --- a/web/src/llm-api/helpers.ts +++ b/web/src/llm-api/helpers.ts @@ -10,11 +10,15 @@ import { } from '@codebuff/common/constants/free-agents' import { PROFIT_MARGIN } from '@codebuff/common/old-constants' +import { createRequestAuditRecord } from './request-audit' + import type { InsertMessageBigqueryFn } from '@codebuff/common/types/contracts/bigquery' import type { Logger } from '@codebuff/common/types/contracts/logger' import type { ChatCompletionRequestBody } from './types' +export { createRequestAuditRecord } from './request-audit' + export type UsageData = { inputTokens: number outputTokens: number @@ -23,85 +27,6 @@ export type UsageData = { cost: number } -export function createRequestAuditRecord(body: unknown) { - // TODO: Add a separate append-only message_request BigQuery table for full - // raw request bodies, inserted before streaming starts. Keeping only this - // summary here avoids retaining huge chat requests until provider streams end. - if (typeof body !== 'object' || body === null || Array.isArray(body)) { - return { invalid_request_shape: true } - } - - const typedBody = body as Partial - const messages = Array.isArray(typedBody.messages) - ? typedBody.messages - : undefined - const tools = Array.isArray(typedBody.tools) ? typedBody.tools : undefined - - const messageRoleCounts = messages?.reduce>( - (counts, message) => { - const role = - typeof message === 'object' && message !== null && 'role' in message - ? String(message.role) - : 'unknown' - counts[role] = (counts[role] ?? 0) + 1 - return counts - }, - {}, - ) - - return { - model: typeof typedBody.model === 'string' ? typedBody.model : undefined, - stream: - typeof typedBody.stream === 'boolean' ? typedBody.stream : undefined, - temperature: - typeof typedBody.temperature === 'number' - ? typedBody.temperature - : undefined, - max_tokens: - typeof typedBody.max_tokens === 'number' - ? typedBody.max_tokens - : undefined, - max_completion_tokens: - typeof typedBody.max_completion_tokens === 'number' - ? typedBody.max_completion_tokens - : undefined, - top_p: typeof typedBody.top_p === 'number' ? typedBody.top_p : undefined, - reasoning_effort: - typeof typedBody.reasoning_effort === 'string' - ? typedBody.reasoning_effort - : undefined, - reasoning_enabled: - typeof typedBody.reasoning?.enabled === 'boolean' - ? typedBody.reasoning.enabled - : undefined, - reasoning_effort_nested: - typeof typedBody.reasoning?.effort === 'string' - ? typedBody.reasoning.effort - : undefined, - usage_include: - typeof typedBody.usage?.include === 'boolean' - ? typedBody.usage.include - : undefined, - codebuff_metadata: - typeof typedBody.codebuff_metadata === 'object' && - typedBody.codebuff_metadata !== null - ? { ...typedBody.codebuff_metadata } - : undefined, - message_count: messages?.length ?? 0, - message_role_counts: messageRoleCounts, - messages_omitted: !!messages, - tool_count: tools?.length ?? 0, - tool_names: tools - ?.map((tool) => - typeof tool === 'object' && tool !== null - ? tool.function?.name - : undefined, - ) - .filter((name): name is string => typeof name === 'string'), - tools_omitted: !!tools, - } -} - export function extractRequestMetadata(params: { body: unknown logger: Logger diff --git a/web/src/llm-api/request-audit.ts b/web/src/llm-api/request-audit.ts new file mode 100644 index 0000000000..88c77fb48d --- /dev/null +++ b/web/src/llm-api/request-audit.ts @@ -0,0 +1,77 @@ +import type { ChatCompletionRequestBody } from './types' + +export function createRequestAuditRecord(body: unknown) { + if (typeof body !== 'object' || body === null || Array.isArray(body)) { + return { invalid_request_shape: true } + } + + const typedBody = body as Partial + const messages = Array.isArray(typedBody.messages) + ? typedBody.messages + : undefined + const tools = Array.isArray(typedBody.tools) ? typedBody.tools : undefined + + const messageRoleCounts = messages?.reduce>( + (counts, message) => { + const role = + typeof message === 'object' && message !== null && 'role' in message + ? String(message.role) + : 'unknown' + counts[role] = (counts[role] ?? 0) + 1 + return counts + }, + {}, + ) + + return { + model: typeof typedBody.model === 'string' ? typedBody.model : undefined, + stream: + typeof typedBody.stream === 'boolean' ? typedBody.stream : undefined, + temperature: + typeof typedBody.temperature === 'number' + ? typedBody.temperature + : undefined, + max_tokens: + typeof typedBody.max_tokens === 'number' + ? typedBody.max_tokens + : undefined, + max_completion_tokens: + typeof typedBody.max_completion_tokens === 'number' + ? typedBody.max_completion_tokens + : undefined, + top_p: typeof typedBody.top_p === 'number' ? typedBody.top_p : undefined, + reasoning_effort: + typeof typedBody.reasoning_effort === 'string' + ? typedBody.reasoning_effort + : undefined, + reasoning_enabled: + typeof typedBody.reasoning?.enabled === 'boolean' + ? typedBody.reasoning.enabled + : undefined, + reasoning_effort_nested: + typeof typedBody.reasoning?.effort === 'string' + ? typedBody.reasoning.effort + : undefined, + usage_include: + typeof typedBody.usage?.include === 'boolean' + ? typedBody.usage.include + : undefined, + codebuff_metadata: + typeof typedBody.codebuff_metadata === 'object' && + typedBody.codebuff_metadata !== null + ? { ...typedBody.codebuff_metadata } + : undefined, + message_count: messages?.length ?? 0, + message_role_counts: messageRoleCounts, + messages_omitted: !!messages, + tool_count: tools?.length ?? 0, + tool_names: tools + ?.map((tool) => + typeof tool === 'object' && tool !== null + ? tool.function?.name + : undefined, + ) + .filter((name): name is string => typeof name === 'string'), + tools_omitted: !!tools, + } +} diff --git a/web/src/llm-api/types.ts b/web/src/llm-api/types.ts index 3c8500bdbb..38cdc4a0f8 100644 --- a/web/src/llm-api/types.ts +++ b/web/src/llm-api/types.ts @@ -4,6 +4,8 @@ import type { InsertMessageBigqueryFn } from '@codebuff/common/types/contracts/b export interface CodebuffMetadata { client_id?: string run_id?: string + trace_session_id?: string + trace_request_id?: string n?: number cost_mode?: string /** Server-issued session instance id (see /api/v1/freebuff/session). Required @@ -112,6 +114,10 @@ export function isCodebuffMetadata(value: unknown): value is CodebuffMetadata { return ( (v.client_id === undefined || typeof v.client_id === 'string') && (v.run_id === undefined || typeof v.run_id === 'string') && + (v.trace_session_id === undefined || + typeof v.trace_session_id === 'string') && + (v.trace_request_id === undefined || + typeof v.trace_request_id === 'string') && (v.n === undefined || typeof v.n === 'number') && (v.cost_mode === undefined || typeof v.cost_mode === 'string') && (v.freebuff_instance_id === undefined ||