diff --git a/apps/fixtures/README.md b/apps/fixtures/README.md index 5bc2c15dd..1453d512d 100644 --- a/apps/fixtures/README.md +++ b/apps/fixtures/README.md @@ -6,5 +6,6 @@ surface. - `weather-agent` backs root `pnpm dev`, manual weather-agent smokes, and bundle analysis. - `agent-tui-client` backs the non-e2e TUI smoke scripts in `packages/eve/test/tui-client`. +- `agent-idle-stream-repro` backs the stale eve client stream repro. When adding fixture behavior, prefer extending an existing fixture unless the new behavior needs incompatible app-level configuration. diff --git a/apps/fixtures/agent-idle-stream-repro/.gitignore b/apps/fixtures/agent-idle-stream-repro/.gitignore new file mode 100644 index 000000000..c8a733615 --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/.gitignore @@ -0,0 +1,2 @@ +.vercel +.env*.local diff --git a/apps/fixtures/agent-idle-stream-repro/agent/agent.ts b/apps/fixtures/agent-idle-stream-repro/agent/agent.ts new file mode 100644 index 000000000..0d0339898 --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/agent/agent.ts @@ -0,0 +1,5 @@ +import { defineAgent } from "eve"; + +export default defineAgent({ + model: "openai/gpt-5.4-mini", +}); diff --git a/apps/fixtures/agent-idle-stream-repro/agent/instructions.md b/apps/fixtures/agent-idle-stream-repro/agent/instructions.md new file mode 100644 index 000000000..996ba090a --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/agent/instructions.md @@ -0,0 +1 @@ +You are an idle stream repro assistant. Call the `idle_stream_repro` tool when the user asks for it. After the tool returns, reply with the returned label and status. diff --git a/apps/fixtures/agent-idle-stream-repro/agent/tools/idle_stream_repro.ts b/apps/fixtures/agent-idle-stream-repro/agent/tools/idle_stream_repro.ts new file mode 100644 index 000000000..74b744c21 --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/agent/tools/idle_stream_repro.ts @@ -0,0 +1,20 @@ +import { defineTool } from "eve/tools"; +import { z } from "zod"; + +export default defineTool({ + description: + "Repro fixture: waits before returning a deterministic image marker. Only call when the user explicitly asks for idle_stream_repro.", + inputSchema: z.object({ + delayMs: z.coerce.number().int().min(0).max(30_000).default(8_000), + label: z.string(), + }), + async execute(input) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + + return { + image: "repro-image.png", + label: input.label, + status: "completed", + }; + }, +}); diff --git a/apps/fixtures/agent-idle-stream-repro/package.json b/apps/fixtures/agent-idle-stream-repro/package.json new file mode 100644 index 000000000..5fa01877c --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/package.json @@ -0,0 +1,15 @@ +{ + "name": "agent-idle-stream-repro", + "private": true, + "type": "module", + "scripts": { + "dev": "eve dev", + "build": "eve build", + "start": "eve start", + "typecheck": "tsc --noEmit -p tsconfig.json" + }, + "dependencies": { + "eve": "workspace:*", + "zod": "catalog:" + } +} diff --git a/apps/fixtures/agent-idle-stream-repro/tsconfig.json b/apps/fixtures/agent-idle-stream-repro/tsconfig.json new file mode 100644 index 000000000..b68dbf176 --- /dev/null +++ b/apps/fixtures/agent-idle-stream-repro/tsconfig.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "compilerOptions": { + "target": "ES2024", + "lib": ["ES2024"], + "module": "NodeNext", + "moduleResolution": "NodeNext", + "moduleDetection": "force", + "verbatimModuleSyntax": true, + "noEmit": true, + "erasableSyntaxOnly": true, + "strict": true, + "isolatedModules": true, + "forceConsistentCasingInFileNames": true, + "noUncheckedIndexedAccess": true, + "noFallthroughCasesInSwitch": true, + "noImplicitOverride": true, + "useUnknownInCatchVariables": true, + "resolveJsonModule": true, + "skipLibCheck": true, + "types": ["node"], + "allowJs": true, + "rootDir": "." + }, + "include": ["agent/**/*", ".eve/**/*.d.ts"], + "exclude": ["node_modules", "dist", "build", ".turbo", ".vercel"] +} diff --git a/packages/eve/src/internal/testing/scenario-apps/idle-stream-repro.ts b/packages/eve/src/internal/testing/scenario-apps/idle-stream-repro.ts new file mode 100644 index 000000000..aaac84721 --- /dev/null +++ b/packages/eve/src/internal/testing/scenario-apps/idle-stream-repro.ts @@ -0,0 +1,79 @@ +import type { ScenarioAppDescriptor } from "#internal/testing/scenario-app.js"; + +const IDLE_STREAM_REPRO_TSCONFIG_SOURCE = `${JSON.stringify( + { + $schema: "https://json.schemastore.org/tsconfig", + compilerOptions: { + allowJs: true, + erasableSyntaxOnly: true, + forceConsistentCasingInFileNames: true, + isolatedModules: true, + lib: ["ES2024"], + module: "NodeNext", + moduleDetection: "force", + moduleResolution: "NodeNext", + noEmit: true, + noFallthroughCasesInSwitch: true, + noImplicitOverride: true, + noUncheckedIndexedAccess: true, + resolveJsonModule: true, + rootDir: ".", + skipLibCheck: true, + strict: true, + target: "ES2024", + types: ["node"], + useUnknownInCatchVariables: true, + verbatimModuleSyntax: true, + }, + exclude: ["node_modules", "dist", "build", ".turbo", ".vercel"], + include: ["agent/**/*"], + }, + null, + 2, +)}\n`; + +/** + * Scenario-tier eve app for reproducing a stale client stream after an inline + * tool starts. The tool waits before returning, which gives a proxy enough + * time to stop forwarding the live response while durable tail events remain + * replayable from the session stream route. + */ +export const IDLE_STREAM_REPRO_DESCRIPTOR: ScenarioAppDescriptor = { + dependencies: { + zod: "^4.3.6", + }, + files: { + "agent/agent.ts": `import { defineAgent } from "eve"; + +export default defineAgent({ + model: "openai/gpt-5.4-mini", +}); +`, + "agent/instructions.md": `You are an idle stream repro assistant. Call the \`idle_stream_repro\` tool when the user asks for it. After the tool returns, reply with the returned label and status. +`, + "agent/tools/idle_stream_repro.ts": `import { defineTool } from "eve/tools"; +import { z } from "zod"; + +export default defineTool({ + description: + "Repro fixture: waits before returning a deterministic image marker. Only call when the user explicitly asks for idle_stream_repro.", + inputSchema: z.object({ + delayMs: z.coerce.number().int().min(0).max(30_000).default(8_000), + label: z.string(), + }), + async execute(input) { + await new Promise((resolve) => setTimeout(resolve, input.delayMs)); + + return { + image: "repro-image.png", + label: input.label, + status: "completed", + }; + }, +}); +`, + "tsconfig.json": IDLE_STREAM_REPRO_TSCONFIG_SOURCE, + }, + installDependencies: true, + name: "agent-idle-stream-repro", +}; diff --git a/packages/eve/src/internal/testing/scenario-apps/index.ts b/packages/eve/src/internal/testing/scenario-apps/index.ts index 08f5107c0..98110c8fc 100644 --- a/packages/eve/src/internal/testing/scenario-apps/index.ts +++ b/packages/eve/src/internal/testing/scenario-apps/index.ts @@ -2,6 +2,7 @@ export { CUSTOM_CHANNEL_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenari export { DISCORD_ROUTE_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/discord-route-portability.js"; export { EXTENSION_AGENT_DESCRIPTOR } from "#internal/testing/scenario-apps/extension-agent.js"; export { GITHUB_ROUTE_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/github-route-portability.js"; +export { IDLE_STREAM_REPRO_DESCRIPTOR } from "#internal/testing/scenario-apps/idle-stream-repro.js"; export { EVE_ROUTE_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/eve-route-portability.js"; export { SANDBOX_BUNDLING_DESCRIPTOR } from "#internal/testing/scenario-apps/sandbox-bundling.js"; export { SANDBOX_WORKSPACES_DESCRIPTOR } from "#internal/testing/scenario-apps/sandbox-workspaces.js"; diff --git a/packages/eve/test/scenarios/dev-server.scenario.test.ts b/packages/eve/test/scenarios/dev-server.scenario.test.ts index be1d48aa3..38e2fb387 100644 --- a/packages/eve/test/scenarios/dev-server.scenario.test.ts +++ b/packages/eve/test/scenarios/dev-server.scenario.test.ts @@ -1,10 +1,18 @@ import { spawn, type ChildProcessByStdio } from "node:child_process"; +import { createServer, request as httpRequest, type IncomingMessage, type ServerResponse } from "node:http"; +import type { AddressInfo, Socket } from "node:net"; import { join } from "node:path"; import type { Readable } from "node:stream"; import { describe, expect, it } from "vitest"; -import { EVE_HEALTH_ROUTE_PATH } from "../../src/protocol/routes.js"; +import { Client } from "../../src/client/index.js"; +import type { HandleMessageStreamEvent } from "../../src/protocol/message.js"; +import { + EVE_HEALTH_ROUTE_PATH, + createEveMessageStreamRoutePath, +} from "../../src/protocol/routes.js"; +import { IDLE_STREAM_REPRO_DESCRIPTOR } from "../../src/internal/testing/scenario-apps/idle-stream-repro.js"; import { WEATHER_AGENT_DESCRIPTOR } from "../../src/internal/testing/scenario-apps/weather-agent.js"; import { type ScenarioAppDescriptor, @@ -12,6 +20,7 @@ import { } from "../../src/internal/testing/scenario-app.js"; import { sendDevelopmentMessage } from "../dev-client-harness/send-message.js"; import { createDevelopmentSessionState } from "../dev-client-harness/session.js"; +import { readMessageStreamEvents } from "../dev-client-harness/stream.js"; // Keep the dev TUI's glyph set deterministic across CI hosts so the // screen assertions below remain stable. @@ -171,6 +180,309 @@ async function waitForServerUrl(input: { }); } +interface Deferred { + readonly promise: Promise; + reject(error: unknown): void; + resolve(value: T): void; +} + +interface ProxySocketHangUp { + readonly events: readonly HandleMessageStreamEvent[]; + readonly logLine: string; + readonly streamUrl: string; +} + +interface RunningSocketHangUpProxy { + readonly logs: () => readonly string[]; + readonly url: string; + stop(): Promise; +} + +function createDeferred(): Deferred { + let resolve!: (value: T) => void; + let reject!: (error: unknown) => void; + const promise = new Promise((promiseResolve, promiseReject) => { + resolve = promiseResolve; + reject = promiseReject; + }); + + return { + promise, + reject, + resolve, + }; +} + +function isSessionStreamUrl(url: URL): boolean { + return /\/eve\/v1\/session\/[^/]+\/stream$/u.test(url.pathname); +} + +async function startSocketHangUpProxy(input: { + readonly idleAfterActionsRequestedMs: number; + readonly onProxyError: (error: unknown) => void; + readonly onSocketHangUp: (failure: ProxySocketHangUp) => void; + readonly targetUrl: string; +}): Promise { + const logs: string[] = []; + const sockets = new Set(); + const timers = new Set>(); + let interceptedStream = false; + const server = createServer((clientRequest, clientResponse) => { + proxyRequest({ + clientRequest, + clientResponse, + idleAfterActionsRequestedMs: input.idleAfterActionsRequestedMs, + interceptStream: !interceptedStream, + logs, + onIntercepted: () => { + interceptedStream = true; + }, + onProxyError: input.onProxyError, + onSocketHangUp: input.onSocketHangUp, + targetUrl: input.targetUrl, + timers, + }); + }); + + server.on("connection", (socket) => { + sockets.add(socket); + socket.once("close", () => { + sockets.delete(socket); + }); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + server.off("error", reject); + resolve(); + }); + }); + + const address = server.address() as AddressInfo; + + return { + logs: () => logs, + async stop() { + for (const timer of timers) { + clearTimeout(timer); + } + timers.clear(); + + for (const socket of sockets) { + socket.destroy(); + } + + await new Promise((resolve) => { + server.close(() => { + resolve(); + }); + }); + }, + url: `http://127.0.0.1:${address.port}`, + }; +} + +function proxyRequest(input: { + readonly clientRequest: IncomingMessage; + readonly clientResponse: ServerResponse; + readonly idleAfterActionsRequestedMs: number; + readonly interceptStream: boolean; + readonly logs: string[]; + readonly onIntercepted: () => void; + readonly onProxyError: (error: unknown) => void; + readonly onSocketHangUp: (failure: ProxySocketHangUp) => void; + readonly targetUrl: string; + readonly timers: Set>; +}): void { + const targetUrl = new URL(input.clientRequest.url ?? "/", input.targetUrl); + const shouldIntercept = input.interceptStream && isSessionStreamUrl(targetUrl); + const upstreamRequest = httpRequest( + targetUrl, + { + headers: { + ...input.clientRequest.headers, + host: targetUrl.host, + }, + method: input.clientRequest.method, + }, + (upstreamResponse) => { + if (!shouldIntercept) { + writeProxyResponseHead(input.clientResponse, upstreamResponse); + upstreamResponse.pipe(input.clientResponse); + return; + } + + input.onIntercepted(); + proxyStreamUntilSocketHangUp({ + clientResponse: input.clientResponse, + idleAfterActionsRequestedMs: input.idleAfterActionsRequestedMs, + logs: input.logs, + onProxyError: input.onProxyError, + onSocketHangUp: input.onSocketHangUp, + targetUrl, + timers: input.timers, + upstreamRequest, + upstreamResponse, + }); + }, + ); + + upstreamRequest.on("error", (error) => { + if (shouldIntercept && input.clientResponse.headersSent && !input.clientResponse.writableEnded) { + return; + } + + input.onProxyError(error); + if (!input.clientResponse.headersSent) { + input.clientResponse.writeHead(502); + } + input.clientResponse.end(String(error)); + }); + + input.clientRequest.pipe(upstreamRequest); +} + +function writeProxyResponseHead( + clientResponse: ServerResponse, + upstreamResponse: IncomingMessage, +): void { + clientResponse.writeHead( + upstreamResponse.statusCode ?? 502, + upstreamResponse.statusMessage, + upstreamResponse.headers, + ); +} + +function proxyStreamUntilSocketHangUp(input: { + readonly clientResponse: ServerResponse; + readonly idleAfterActionsRequestedMs: number; + readonly logs: string[]; + readonly onProxyError: (error: unknown) => void; + readonly onSocketHangUp: (failure: ProxySocketHangUp) => void; + readonly targetUrl: URL; + readonly timers: Set>; + readonly upstreamRequest: ReturnType; + readonly upstreamResponse: IncomingMessage; +}): void { + writeProxyResponseHead(input.clientResponse, input.upstreamResponse); + + const decoder = new TextDecoder(); + const events: HandleMessageStreamEvent[] = []; + let buffer = ""; + let failedProxy = false; + + const failProxy = () => { + if (failedProxy) { + return; + } + + failedProxy = true; + const logLine = `Failed to proxy <${input.targetUrl.toString()}> Error: socket hang up (ECONNRESET)`; + const error = Object.assign(new Error("socket hang up"), { + code: "ECONNRESET", + }); + + input.logs.push(logLine); + input.onSocketHangUp({ + events: [...events], + logLine, + streamUrl: input.targetUrl.toString(), + }); + input.upstreamResponse.destroy(error); + input.upstreamRequest.destroy(error); + }; + + const scheduleFailure = () => { + const timer = setTimeout(() => { + input.timers.delete(timer); + failProxy(); + }, input.idleAfterActionsRequestedMs); + input.timers.add(timer); + }; + + input.upstreamResponse.on("data", (chunk: Buffer) => { + if (failedProxy) { + return; + } + + try { + buffer += decoder.decode(chunk, { stream: true }); + let newlineIndex = buffer.indexOf("\n"); + + while (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex).trim(); + buffer = buffer.slice(newlineIndex + 1); + + if (line.length > 0) { + const event = JSON.parse(line) as HandleMessageStreamEvent; + events.push(event); + input.clientResponse.write(`${line}\n`); + + if (event.type === "actions.requested") { + input.upstreamResponse.pause(); + scheduleFailure(); + return; + } + } + + newlineIndex = buffer.indexOf("\n"); + } + } catch (error) { + input.onProxyError(error); + input.clientResponse.destroy(error instanceof Error ? error : undefined); + input.upstreamRequest.destroy(); + } + }); + + input.upstreamResponse.once("end", () => { + if (!failedProxy) { + input.onProxyError(new Error("Upstream stream ended before actions.requested.")); + input.clientResponse.end(); + } + }); + + input.upstreamResponse.once("error", (error) => { + if (!failedProxy) { + input.onProxyError(error); + input.clientResponse.destroy(error); + } + }); +} + +async function readDurableTailEvents(input: { + readonly fetch: typeof fetch; + readonly serverUrl: string; + readonly sessionId: string; + readonly startIndex: number; +}): Promise { + const tailUrl = new URL(createEveMessageStreamRoutePath(input.sessionId), input.serverUrl); + tailUrl.searchParams.set("startIndex", String(input.startIndex)); + const response = await input.fetch(tailUrl); + + if (!response.ok) { + const body = await response.text(); + throw new Error(body || `Durable stream returned ${response.status}.`); + } + + return await readMessageStreamEvents({ response }); +} + +async function hasSettledWithin(promise: Promise, ms: number): Promise { + let settled = false; + void promise.then( + () => { + settled = true; + }, + () => { + settled = true; + }, + ); + + await wait(ms); + return settled; +} + async function startEveDev(appRoot: string): Promise { const eveBinPath = join(appRoot, "node_modules", "eve", "bin", "eve.js"); const child = spawn( @@ -294,4 +606,86 @@ describe("eve dev server", () => { }, DEV_SERVER_SCENARIO_TIMEOUT_MS, ); + + it( + "reproduces a proxy socket hang up during a long inline tool while the durable tail completes", + async () => { + const app = await scenarioApp(IDLE_STREAM_REPRO_DESCRIPTOR); + const server = await startEveDev(app.appRoot); + const abortController = new AbortController(); + const proxyFailure = createDeferred(); + let proxy: RunningSocketHangUpProxy | undefined; + let resultPromise: Promise | undefined; + + try { + const originClient = new Client({ + host: server.url, + preserveCompletedSessions: true, + }); + const originSession = originClient.session(); + const firstResponse = await originSession.send("Prime the session before the repro turn."); + await firstResponse.result(); + + const baselineState = originSession.state; + expect(baselineState.sessionId).toBeDefined(); + expect(baselineState.streamIndex).toBeGreaterThan(0); + + proxy = await startSocketHangUpProxy({ + idleAfterActionsRequestedMs: 750, + onProxyError: proxyFailure.reject, + onSocketHangUp: proxyFailure.resolve, + targetUrl: server.url, + }); + + const proxiedClient = new Client({ + host: proxy.url, + preserveCompletedSessions: true, + }); + const proxiedSession = proxiedClient.session(baselineState); + const response = await proxiedSession.send({ + message: "Use idle_stream_repro with label `tail-marker` and delayMs `8000`.", + signal: abortController.signal, + }); + resultPromise = response.result(); + + const failure = await proxyFailure.promise; + const forwardedTypes = failure.events.map((event) => event.type); + expect(forwardedTypes).toContain("actions.requested"); + expect(failure.streamUrl).toContain("/stream?startIndex="); + expect(failure.streamUrl).toContain(`startIndex=${baselineState.streamIndex}`); + expect(failure.logLine).toContain("Failed to proxy event.type); + + expect(tailTypes).toEqual( + expect.arrayContaining([ + "action.result", + "message.completed", + "turn.completed", + "session.waiting", + ]), + ); + expect(tailTypes).not.toContain("actions.requested"); + expect(proxy.logs()).toContain(failure.logLine); + expect(await hasSettledWithin(resultPromise, 250)).toBe(false); + + abortController.abort(); + await resultPromise.catch(() => undefined); + } finally { + abortController.abort(); + await proxy?.stop(); + await resultPromise?.catch(() => undefined); + await server.stop(); + } + }, + DEV_SERVER_SCENARIO_TIMEOUT_MS, + ); + }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ee4bbab74..ca20be124 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -256,6 +256,15 @@ importers: specifier: 6.0.3 version: 6.0.3 + apps/fixtures/agent-idle-stream-repro: + dependencies: + eve: + specifier: workspace:* + version: link:../../../packages/eve + zod: + specifier: 'catalog:' + version: 4.4.3 + apps/fixtures/agent-tui-client: dependencies: '@opentelemetry/core':