From 052e0ff8bef284b39efa2816ecc90155ae137cb6 Mon Sep 17 00:00:00 2001 From: Andrew Barba Date: Sat, 27 Jun 2026 09:37:55 -0400 Subject: [PATCH 1/2] fix(eve): client streams - reconnect idle streams Signed-off-by: Andrew Barba --- .changeset/quick-stream-reconnect.md | 5 + docs/guides/client/overview.mdx | 2 +- docs/guides/client/streaming.mdx | 10 +- docs/guides/frontend/overview.mdx | 7 +- docs/guides/frontend/use-eve-agent-svelte.mdx | 5 +- docs/guides/frontend/use-eve-agent-vue.mdx | 5 +- packages/eve/src/client/client.test.ts | 33 +++++ packages/eve/src/client/client.ts | 18 ++- packages/eve/src/client/eve-agent-store.ts | 7 +- packages/eve/src/client/index.ts | 2 +- packages/eve/src/client/message-response.ts | 27 +++- packages/eve/src/client/ndjson.ts | 49 ++++++- packages/eve/src/client/open-stream.ts | 8 +- packages/eve/src/client/session.test.ts | 130 +++++++++++++++++- packages/eve/src/client/session.ts | 51 ++++++- packages/eve/src/client/types.ts | 28 +++- packages/eve/src/execution/workflow-steps.ts | 13 +- packages/eve/src/react/use-eve-agent.ts | 10 +- packages/eve/src/svelte/use-eve-agent.ts | 12 +- packages/eve/src/vue/use-eve-agent.ts | 5 +- 20 files changed, 390 insertions(+), 37 deletions(-) create mode 100644 .changeset/quick-stream-reconnect.md diff --git a/.changeset/quick-stream-reconnect.md b/.changeset/quick-stream-reconnect.md new file mode 100644 index 000000000..f8e15833b --- /dev/null +++ b/.changeset/quick-stream-reconnect.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +The TypeScript client now reconnects idle message streams after 30 seconds by default, resuming from the last consumed event index instead of waiting for the platform to close the stream. The default reconnect budget now tolerates more than five minutes of silent stream time, and a new `streamIdleTimeoutMs` option tunes or disables idle reconnects for `Client`, `send`, `stream`, and frontend bindings. diff --git a/docs/guides/client/overview.mdx b/docs/guides/client/overview.mdx index fb2246bd9..e33280df5 100644 --- a/docs/guides/client/overview.mdx +++ b/docs/guides/client/overview.mdx @@ -9,7 +9,7 @@ For browser chat UIs, start with [`useEveAgent`](../frontend/overview). For wire ## Create a client -A `Client` binds one host, auth policy, header policy, and stream reconnection budget: +A `Client` binds one host, auth policy, header policy, and stream behavior: ```ts import { Client } from "eve/client"; diff --git a/docs/guides/client/streaming.mdx b/docs/guides/client/streaming.mdx index 4de31ec78..e3e7f682d 100644 --- a/docs/guides/client/streaming.mdx +++ b/docs/guides/client/streaming.mdx @@ -89,11 +89,17 @@ The client reconnects after transient stream disconnects. It resumes from the nu ```ts const client = new Client({ host: "https://agent.example.com", - maxReconnectAttempts: 5, + maxReconnectAttempts: 10, + streamIdleTimeoutMs: 30_000, }); ``` -`maxReconnectAttempts` is per turn. The default is `3`. +`streamIdleTimeoutMs` controls how long the client waits for the next event +before reopening the stream from the last consumed event index. The default is +`30000`; set it to `0` to disable idle reconnects. `maxReconnectAttempts` +limits consecutive reconnects that do not produce another event. The default is +`10`, so a fully silent stream is tolerated for more than five minutes before +the client fails the turn. ## Open a stream manually diff --git a/docs/guides/frontend/overview.mdx b/docs/guides/frontend/overview.mdx index 64b620826..e72e702d2 100644 --- a/docs/guides/frontend/overview.mdx +++ b/docs/guides/frontend/overview.mdx @@ -187,10 +187,11 @@ const agent = useEveAgent({ }); ``` -Two more options tune turn behavior: +Three more options tune turn behavior: - `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only. `events` stays the authoritative eve stream. -- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn. +- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects. +- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event. ## Custom reducer @@ -250,7 +251,7 @@ const agent = useEveAgent({ Store the full `session` object (`sessionId`, `continuationToken`, `streamIndex`), not a single field. The session cursor lets eve continue the durable conversation; the event log lets your UI render historical messages without replaying the whole stream. A database-backed chat app should usually persist stream events as they arrive with `onEvent` and then save a final snapshot in `onFinish`. -For multiple chat threads, keep one saved event log and session cursor per thread. `host`, `reducer`, `session`, `initialEvents`, `initialSession`, `auth`, `headers`, `maxReconnectAttempts`, and `optimistic` are read when the hook creates its store, so remount the chat component when switching threads, for example with `key={chat.id}`. +For multiple chat threads, keep one saved event log and session cursor per thread. `host`, `reducer`, `session`, `initialEvents`, `initialSession`, `auth`, `headers`, `streamIdleTimeoutMs`, `maxReconnectAttempts`, and `optimistic` are read when the hook creates its store, so remount the chat component when switching threads, for example with `key={chat.id}`. If the user can refresh or navigate immediately after pressing send, create your app-level chat row and store the pending user message before calling `send()`. After the request starts, persist the session state as soon as it contains a `sessionId`, then reconnect an interrupted in-flight turn with `session.stream({ startIndex: savedEvents.length })` from the lower-level client. diff --git a/docs/guides/frontend/use-eve-agent-svelte.mdx b/docs/guides/frontend/use-eve-agent-svelte.mdx index 83c4a9955..a76d99b72 100644 --- a/docs/guides/frontend/use-eve-agent-svelte.mdx +++ b/docs/guides/frontend/use-eve-agent-svelte.mdx @@ -167,10 +167,11 @@ const agent = useEveAgent({ }); ``` -Two more options tune turn behavior: +Three more options tune turn behavior: - `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only; `events` stays the authoritative eve stream. -- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn. +- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects. +- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event. ## Custom reducer diff --git a/docs/guides/frontend/use-eve-agent-vue.mdx b/docs/guides/frontend/use-eve-agent-vue.mdx index 105be0d2e..1bcf199de 100644 --- a/docs/guides/frontend/use-eve-agent-vue.mdx +++ b/docs/guides/frontend/use-eve-agent-vue.mdx @@ -180,10 +180,11 @@ const agent = useEveAgent({ }); ``` -Two more options tune turn behavior: +Three more options tune turn behavior: - `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only; `events` stays the authoritative eve stream. -- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn. +- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects. +- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event. ## Custom reducer diff --git a/packages/eve/src/client/client.test.ts b/packages/eve/src/client/client.test.ts index baf602b31..ea5aa6a82 100644 --- a/packages/eve/src/client/client.test.ts +++ b/packages/eve/src/client/client.test.ts @@ -40,11 +40,44 @@ const AGENT_INFO: AgentInfoResult = { }; afterEach(() => { + vi.useRealTimers(); vi.restoreAllMocks(); vi.unstubAllEnvs(); }); describe("Client request policy", () => { + it("tolerates more than five minutes of default idle stream reconnects", async () => { + vi.useFakeTimers(); + let streamCancels = 0; + const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (_request, init) => { + if ((init?.method ?? "GET") === "POST") { + return Response.json({ continuationToken: "eve:test", sessionId: "session_1" }); + } + + return new Response( + new ReadableStream({ + cancel() { + streamCancels += 1; + }, + }), + ); + }); + const client = new Client({ host: "https://eve.test" }); + + const resultPromise = (await client.session().send("hello")).result(); + const expectation = expect(resultPromise).rejects.toThrow( + 'Message stream for session "session_1" closed before the turn boundary after 0 event(s); last event: none.', + ); + + for (let i = 0; i < 11; i += 1) { + await vi.advanceTimersByTimeAsync(30_000); + } + + await expectation; + expect(fetchMock).toHaveBeenCalledTimes(12); + expect(streamCancels).toBe(11); + }); + it("enforces its redirect policy for info, health, raw fetch, and sessions", async () => { const fetchMock = vi .spyOn(globalThis, "fetch") diff --git a/packages/eve/src/client/client.ts b/packages/eve/src/client/client.ts index 87aec69b7..76b5118a0 100644 --- a/packages/eve/src/client/client.ts +++ b/packages/eve/src/client/client.ts @@ -17,6 +17,9 @@ import type { } from "#client/types.js"; import { VERCEL_TRUSTED_OIDC_IDP_TOKEN_HEADER } from "#client/types.js"; +const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10; +const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 30_000; + /** * HTTP client for talking to a deployed eve agent. * @@ -31,14 +34,18 @@ export class Client { readonly #maxReconnectAttempts: number; readonly #preserveCompletedSessions: boolean; readonly #redirect: ClientRedirectPolicy | undefined; + readonly #streamIdleTimeoutMs: number | undefined; constructor(options: ClientOptions) { this.#host = options.host; this.#auth = options.auth; this.#headers = options.headers; - this.#maxReconnectAttempts = options.maxReconnectAttempts ?? 3; + this.#maxReconnectAttempts = options.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS; this.#preserveCompletedSessions = options.preserveCompletedSessions ?? false; this.#redirect = options.redirect; + this.#streamIdleTimeoutMs = normalizeStreamIdleTimeoutMs( + options.streamIdleTimeoutMs ?? DEFAULT_STREAM_IDLE_TIMEOUT_MS, + ); } /** @@ -139,6 +146,7 @@ export class Client { preserveCompletedSessions: this.#preserveCompletedSessions, redirect: this.#redirect, resolveHeaders: (perRequest) => this.#resolveHeaders(perRequest), + streamIdleTimeoutMs: this.#streamIdleTimeoutMs, }, resolved, ); @@ -242,6 +250,14 @@ function withRedirectPolicy( return redirect === undefined ? init : { ...init, redirect }; } +function normalizeStreamIdleTimeoutMs(value: number): number | undefined { + if (!Number.isFinite(value) || value < 0) { + throw new Error("streamIdleTimeoutMs must be a non-negative finite number."); + } + + return value === 0 ? undefined : value; +} + /** * Encodes a username:password pair as a base64 Basic auth credential. * Uses `TextEncoder` for correct UTF-8 handling across all runtimes. diff --git a/packages/eve/src/client/eve-agent-store.ts b/packages/eve/src/client/eve-agent-store.ts index 9fc73a22d..050feba23 100644 --- a/packages/eve/src/client/eve-agent-store.ts +++ b/packages/eve/src/client/eve-agent-store.ts @@ -54,8 +54,9 @@ export interface EveAgentStoreCallbacks { * Configuration for constructing an {@link EveAgentStore}. * * Requires a {@link EveAgentReducer | reducer}, plus either connection options - * (`host`, `auth`, `headers`, `maxReconnectAttempts`, `initialSession`) for a - * store-owned session or an existing {@link ClientSession} via `session`. + * (`host`, `auth`, `headers`, `maxReconnectAttempts`, + * `streamIdleTimeoutMs`, `initialSession`) for a store-owned session or an + * existing {@link ClientSession} via `session`. * * `optimistic` (default `true`) projects submitted user messages before the * server confirms them. `host` defaults to `""`. `initialEvents` and @@ -72,6 +73,7 @@ export interface EveAgentStoreInit { readonly optimistic?: boolean; readonly reducer: EveAgentReducer; readonly session?: ClientSession; + readonly streamIdleTimeoutMs?: number; } interface PendingMessageSubmission { @@ -119,6 +121,7 @@ export class EveAgentStore { headers: init.headers, host: init.host ?? "", maxReconnectAttempts: init.maxReconnectAttempts, + streamIdleTimeoutMs: init.streamIdleTimeoutMs, }).session(init.initialSession); this.#events = [...(init.initialEvents ?? [])]; this.#projectionEvents = [...this.#events]; diff --git a/packages/eve/src/client/index.ts b/packages/eve/src/client/index.ts index d89edf182..d12e43a45 100644 --- a/packages/eve/src/client/index.ts +++ b/packages/eve/src/client/index.ts @@ -8,7 +8,7 @@ export { AgentInfoResponseError } from "#client/agent-info-error.js"; export { ClientError } from "#client/client-error.js"; export { defaultMessageReducer } from "#client/message-reducer.js"; export { createDataUrlFilePart, createTextWithFileContent } from "#client/file-parts.js"; -export { MessageResponse } from "#client/message-response.js"; +export { MessageResponse, MessageStreamBoundaryError } from "#client/message-response.js"; export { ClientSession } from "#client/session.js"; // --------------------------------------------------------------------------- diff --git a/packages/eve/src/client/message-response.ts b/packages/eve/src/client/message-response.ts index f3fe1573e..217497942 100644 --- a/packages/eve/src/client/message-response.ts +++ b/packages/eve/src/client/message-response.ts @@ -1,4 +1,4 @@ -import type { HandleMessageStreamEvent } from "#protocol/message.js"; +import { isCurrentTurnBoundaryEvent, type HandleMessageStreamEvent } from "#protocol/message.js"; import { extractCompletedResult } from "#client/output-schema.js"; import { deriveResultStatus, @@ -55,6 +55,10 @@ export class MessageResponse implements AsyncIterable(events), events, @@ -79,3 +83,24 @@ export class MessageResponse implements AsyncIterable["read"]>>; + /** * Reads newline-delimited JSON events from a `ReadableStream`. * @@ -34,6 +58,7 @@ export function isStreamDisconnectError(error: unknown): boolean { */ export async function* readNdjsonStream( body: ReadableStream, + options: ReadNdjsonStreamOptions = {}, ): AsyncGenerator { const reader = body.getReader(); const decoder = new TextDecoder(); @@ -42,7 +67,7 @@ export async function* readNdjsonStream( try { while (true) { - const result = await reader.read(); + const result = await readWithIdleTimeout(reader, options.idleTimeoutMs); if (result.done) { reachedEof = true; @@ -83,3 +108,25 @@ export async function* readNdjsonStream( reader.releaseLock(); } } + +async function readWithIdleTimeout( + reader: ReadableStreamDefaultReader, + idleTimeoutMs: number | undefined, +): Promise { + if (idleTimeoutMs === undefined) { + return await reader.read(); + } + + let timeout: ReturnType | undefined; + const timeoutPromise = new Promise((_resolve, reject) => { + timeout = setTimeout(() => reject(new StreamIdleTimeoutError(idleTimeoutMs)), idleTimeoutMs); + }); + + try { + return await Promise.race([reader.read(), timeoutPromise]); + } finally { + if (timeout !== undefined) { + clearTimeout(timeout); + } + } +} diff --git a/packages/eve/src/client/open-stream.ts b/packages/eve/src/client/open-stream.ts index bc413254d..523b52697 100644 --- a/packages/eve/src/client/open-stream.ts +++ b/packages/eve/src/client/open-stream.ts @@ -14,6 +14,7 @@ const STREAM_OPEN_RETRYABLE_STATUS = new Set([404, 409, 425, 500, 502, 503, 504] */ interface OpenStreamInput { readonly host: string; + readonly idleTimeoutMs?: number; readonly maxReconnectAttempts: number; readonly resolveHeaders: () => Promise; readonly redirect?: ClientRedirectPolicy; @@ -26,7 +27,7 @@ type OpenStreamBodyInput = Omit; /** * Opens a durable NDJSON event stream with automatic reconnection on socket - * disconnection. Used by {@link ClientSession.stream}. + * disconnection and idle stream reads. Used by {@link ClientSession.stream}. */ export async function* openStreamIterable( input: OpenStreamInput, @@ -40,8 +41,9 @@ export async function* openStreamIterable( let disconnected = false; try { - for await (const event of readNdjsonStream(body)) { + for await (const event of readNdjsonStream(body, { idleTimeoutMs: input.idleTimeoutMs })) { startIndex += 1; + remainingReconnectAttempts = input.maxReconnectAttempts; yield event; } } catch (error) { @@ -51,7 +53,7 @@ export async function* openStreamIterable( disconnected = true; } - // Only reconnect on socket disconnection, not clean EOF or a + // Only reconnect on socket disconnection/idle timeout, not clean EOF or a // caller-initiated abort. if (!disconnected || input.signal?.aborted || remainingReconnectAttempts <= 0) { return; diff --git a/packages/eve/src/client/session.test.ts b/packages/eve/src/client/session.test.ts index aec7be5da..30e83c019 100644 --- a/packages/eve/src/client/session.test.ts +++ b/packages/eve/src/client/session.test.ts @@ -4,20 +4,26 @@ import { ClientSession } from "#client/session.js"; import type { SessionState } from "#client/types.js"; afterEach(() => { + vi.useRealTimers(); vi.restoreAllMocks(); }); function createSession( state: SessionState = { streamIndex: 0 }, - options: { readonly preserveCompletedSessions?: boolean } = {}, + options: { + readonly maxReconnectAttempts?: number; + readonly preserveCompletedSessions?: boolean; + readonly streamIdleTimeoutMs?: number; + } = {}, ) { const context: ConstructorParameters[0] = { host: "https://eve.test", - maxReconnectAttempts: 0, + maxReconnectAttempts: options.maxReconnectAttempts ?? 0, preserveCompletedSessions: options.preserveCompletedSessions ?? false, async resolveHeaders() { return new Headers(); }, + streamIdleTimeoutMs: options.streamIdleTimeoutMs, }; return new ClientSession(context, state); @@ -48,6 +54,22 @@ function createStreamResponse(events: readonly unknown[]) { ); } +function createOpenStreamResponse(events: readonly unknown[], onCancel?: () => void) { + const encoder = new TextEncoder(); + return new Response( + new ReadableStream({ + start(controller) { + for (const event of events) { + controller.enqueue(encoder.encode(`${JSON.stringify(event)}\n`)); + } + }, + cancel() { + onCancel?.(); + }, + }), + ); +} + describe("ClientSession", () => { it("serializes clientContext when sending a create-session message", async () => { const fetchMock = vi.spyOn(globalThis, "fetch").mockResolvedValueOnce(createAcceptedResponse()); @@ -171,6 +193,110 @@ describe("ClientSession", () => { expect(cancelled).toBe(true); }); + it("rejects a consumed turn stream that closes before a boundary", async () => { + vi.spyOn(globalThis, "fetch").mockImplementation(async (_request, init) => { + if ((init?.method ?? "GET") === "POST") { + return createAcceptedResponse(); + } + + return createStreamResponse([]); + }); + const session = createSession(); + + await expect((await session.send("first")).result()).rejects.toThrow( + 'Message stream for session "session_1" closed before the turn boundary after 0 event(s); last event: none.', + ); + }); + + it("rejects async iteration when a turn stream closes before a boundary", async () => { + vi.spyOn(globalThis, "fetch").mockImplementation(async (_request, init) => { + if ((init?.method ?? "GET") === "POST") { + return createAcceptedResponse(); + } + + return createStreamResponse([]); + }); + const session = createSession(); + const response = await session.send("first"); + + const drain = async () => { + for await (const _event of response) { + // Drain the stream so the generator can observe the missing boundary. + } + }; + + await expect(drain()).rejects.toThrow( + 'Message stream for session "session_1" closed before the turn boundary after 0 event(s); last event: none.', + ); + }); + + it("reopens an idle turn stream from the latest event index", async () => { + vi.useFakeTimers(); + const getUrls: string[] = []; + let cancelled = false; + vi.spyOn(globalThis, "fetch").mockImplementation(async (request, init) => { + if ((init?.method ?? "GET") === "POST") { + return createAcceptedResponse(); + } + + const url = + typeof request === "string" ? request : request instanceof URL ? request.href : request.url; + getUrls.push(url); + + if (getUrls.length === 1) { + return createOpenStreamResponse( + [{ type: "message.received", data: { turnId: "turn_1" } }], + () => { + cancelled = true; + }, + ); + } + + return createStreamResponse([{ type: "session.completed", data: {} }]); + }); + const session = createSession( + { streamIndex: 0 }, + { maxReconnectAttempts: 1, streamIdleTimeoutMs: 1 }, + ); + + const resultPromise = (await session.send("first")).result(); + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + + expect(result.status).toBe("completed"); + expect(cancelled).toBe(true); + expect(getUrls.map((url) => new URL(url).searchParams.get("startIndex"))).toEqual([null, "1"]); + }); + + it("rejects an idle turn stream after the reconnect budget is exhausted", async () => { + vi.useFakeTimers(); + let cancels = 0; + const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (_request, init) => { + if ((init?.method ?? "GET") === "POST") { + return createAcceptedResponse(); + } + + return createOpenStreamResponse([], () => { + cancels += 1; + }); + }); + const session = createSession( + { streamIndex: 0 }, + { maxReconnectAttempts: 1, streamIdleTimeoutMs: 1 }, + ); + + const resultPromise = (await session.send("first")).result(); + const expectation = expect(resultPromise).rejects.toThrow( + 'Message stream for session "session_1" closed before the turn boundary after 0 event(s); last event: none.', + ); + await vi.advanceTimersByTimeAsync(1); + await vi.advanceTimersByTimeAsync(1); + + await expectation; + expect(fetchMock).toHaveBeenCalledTimes(3); + expect(cancels).toBe(2); + }); + it("resets the session by default after consuming through session.completed", async () => { const requests: Array<{ body?: unknown; method: string; url: string }> = []; const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (request, init) => { diff --git a/packages/eve/src/client/session.ts b/packages/eve/src/client/session.ts index d839259ce..85dd1f05b 100644 --- a/packages/eve/src/client/session.ts +++ b/packages/eve/src/client/session.ts @@ -5,7 +5,7 @@ import { createEveContinueSessionRoutePath, } from "#protocol/routes.js"; import { ClientError } from "#client/client-error.js"; -import { MessageResponse } from "#client/message-response.js"; +import { MessageResponse, MessageStreamBoundaryError } from "#client/message-response.js"; import { isStreamDisconnectError, readNdjsonStream } from "#client/ndjson.js"; import { openStreamBody, openStreamIterable } from "#client/open-stream.js"; import { normalizeOutputSchemaForRequest } from "#client/output-schema.js"; @@ -32,6 +32,7 @@ interface SessionContext { readonly preserveCompletedSessions: boolean; readonly redirect?: ClientRedirectPolicy; resolveHeaders(perRequest?: Readonly>): Promise; + readonly streamIdleTimeoutMs: number | undefined; } /** @@ -68,13 +69,18 @@ export class ClientSession { */ async send(input: SendTurnInput): Promise> { const payload = normalizeSendTurnInput(input); + const streamIdleTimeoutMs = resolveStreamIdleTimeoutMs( + payload.streamIdleTimeoutMs, + this.#context.streamIdleTimeoutMs, + ); const state = this.#state; const postResult = await this.#postTurn(payload, state); const { continuationToken, sessionId } = postResult; return new MessageResponse({ continuationToken, - createStream: () => this.#createEventStream(sessionId, continuationToken, state, payload), + createStream: () => + this.#createEventStream(sessionId, continuationToken, state, payload, streamIdleTimeoutMs), sessionId, }); } @@ -84,7 +90,8 @@ export class ClientSession { * * Resumes from the session's stored stream cursor unless `options.startIndex` * overrides it. The returned iterable reconnects on transient socket - * disconnects, up to the client's `maxReconnectAttempts`. + * disconnects or sits idle, up to the client's consecutive reconnect + * budget. * * @throws {Error} If the session has no session ID (no message has been sent * yet). @@ -96,7 +103,12 @@ export class ClientSession { throw new Error("Session has no session ID. Send a message first."); } - return this.#streamAndAdvance(sessionId, options); + const streamIdleTimeoutMs = resolveStreamIdleTimeoutMs( + options?.streamIdleTimeoutMs, + this.#context.streamIdleTimeoutMs, + ); + + return this.#streamAndAdvance(sessionId, options, streamIdleTimeoutMs); } // --------------------------------------------------------------------------- @@ -159,8 +171,11 @@ export class ClientSession { continuationToken: string | undefined, initialState: SessionState, input: SendTurnPayload, + streamIdleTimeoutMs: number | undefined, ): AsyncGenerator { const events: HandleMessageStreamEvent[] = []; + let reachedBoundary = false; + let stoppedByAbort = false; try { let currentStreamIndex = initialState.sessionId === sessionId ? initialState.streamIndex : 0; @@ -177,9 +192,12 @@ export class ClientSession { let foundBoundary = false; try { - for await (const event of readNdjsonStream(body)) { + for await (const event of readNdjsonStream(body, { + idleTimeoutMs: streamIdleTimeoutMs, + })) { events.push(event); currentStreamIndex += 1; + remainingReconnectAttempts = this.#context.maxReconnectAttempts; yield event; if (isCurrentTurnBoundaryEvent(event)) { @@ -194,12 +212,14 @@ export class ClientSession { } if (foundBoundary) { + reachedBoundary = true; break; } // A caller-initiated abort is a stop signal, not a transient socket // disconnect — do not reconnect. if (input.signal?.aborted) { + stoppedByAbort = true; break; } @@ -218,6 +238,10 @@ export class ClientSession { session: initialState, }); } + + if (!reachedBoundary && !stoppedByAbort) { + throw new MessageStreamBoundaryError(sessionId, events); + } } async #openStreamBody( @@ -239,6 +263,7 @@ export class ClientSession { async *#streamAndAdvance( sessionId: string, options?: StreamOptions, + streamIdleTimeoutMs?: number, ): AsyncGenerator { const initialState = this.#state; const streamIndex = options?.startIndex ?? initialState.streamIndex; @@ -253,6 +278,7 @@ export class ClientSession { sessionId, signal: options?.signal, startIndex: streamIndex, + idleTimeoutMs: streamIdleTimeoutMs, })) { events.push(event); yield event; @@ -319,6 +345,21 @@ function normalizeSendTurnInput(input: SendTurnInput): SendTur return typeof input === "string" ? { message: input } : input; } +function resolveStreamIdleTimeoutMs( + override: number | undefined, + fallback: number | undefined, +): number | undefined { + if (override === undefined) { + return fallback; + } + + if (!Number.isFinite(override) || override < 0) { + throw new Error("streamIdleTimeoutMs must be a non-negative finite number."); + } + + return override === 0 ? undefined : override; +} + function createHandleMessageBody(input: { readonly input: SendTurnPayload; readonly outputSchema?: Record; diff --git a/packages/eve/src/client/types.ts b/packages/eve/src/client/types.ts index bb15fe479..5678a776f 100644 --- a/packages/eve/src/client/types.ts +++ b/packages/eve/src/client/types.ts @@ -94,12 +94,24 @@ export interface ClientOptions { readonly redirect?: ClientRedirectPolicy; /** - * Maximum number of stream reconnection attempts per message turn. + * Maximum consecutive stream reconnection attempts that may occur without + * receiving another event. * - * @default 3 + * @default 10 */ readonly maxReconnectAttempts?: number; + /** + * Milliseconds to wait for the next stream event before reopening the stream + * from the last consumed event index. This keeps realtime clients from + * sitting on an idle serverless stream until the platform closes it. + * + * Set to `0` to disable idle reconnects. + * + * @default 30000 + */ + readonly streamIdleTimeoutMs?: number; + /** * Keep a session's continuation token after a normal `session.completed` * boundary. @@ -166,6 +178,12 @@ export interface SendTurnPayload { * Additional headers for this request only. */ readonly headers?: Readonly>; + + /** + * Per-turn override for the client's stream idle reconnect timeout. Set to + * `0` to disable idle reconnects for this turn. + */ + readonly streamIdleTimeoutMs?: number; } /** @@ -182,6 +200,12 @@ export interface StreamOptions { * Abort signal for cancelling the stream. */ readonly signal?: AbortSignal; + + /** + * Per-stream override for the client's stream idle reconnect timeout. Set to + * `0` to disable idle reconnects for this stream. + */ + readonly streamIdleTimeoutMs?: number; } /** diff --git a/packages/eve/src/execution/workflow-steps.ts b/packages/eve/src/execution/workflow-steps.ts index a0c56af94..69370a2fa 100644 --- a/packages/eve/src/execution/workflow-steps.ts +++ b/packages/eve/src/execution/workflow-steps.ts @@ -643,6 +643,8 @@ export async function dispatchTurnStep( ): Promise<{ readonly runId: string }> { "use step"; + const requestId = input.delivery.kind === "deliver" ? input.delivery.requestId : undefined; + const rootSessionId = readRootSessionId(input.serializedContext) ?? input.sessionState.sessionId; const run = await startWorkflowPreferLatest( turnWorkflowReference, [createTurnWorkflowInput(input)], @@ -651,12 +653,19 @@ export async function dispatchTurnStep( attributes: normalizeEveAttributes( buildTurnAttributes({ parentSessionId: input.sessionState.sessionId, - requestId: input.delivery.kind === "deliver" ? input.delivery.requestId : undefined, - rootSessionId: readRootSessionId(input.serializedContext) ?? input.sessionState.sessionId, + requestId, + rootSessionId, }), ), }, ); + log.info("dispatched turn workflow", { + parentSessionId: input.sessionState.sessionId, + requestId, + rootSessionId, + turnRunId: run.runId, + }); + return { runId: run.runId }; } diff --git a/packages/eve/src/react/use-eve-agent.ts b/packages/eve/src/react/use-eve-agent.ts index fc102795a..907d45b08 100644 --- a/packages/eve/src/react/use-eve-agent.ts +++ b/packages/eve/src/react/use-eve-agent.ts @@ -70,6 +70,7 @@ export interface UseEveAgentOptions extends EveAgentStoreCallbacks readonly initialEvents?: readonly HandleMessageStreamEvent[]; readonly initialSession?: SessionState; readonly maxReconnectAttempts?: number; + readonly streamIdleTimeoutMs?: number; /** * Project submitted user messages before eve confirms them with a * `message.received` stream event. @@ -102,10 +103,10 @@ export function useEveAgent( * infer `TData`. * * Session-shaping options (`host`, `reducer`, `session`, `initialEvents`, - * `initialSession`, `auth`, `headers`, `maxReconnectAttempts`, `optimistic`) are - * read once when the store is created; remount to change them. Lifecycle - * callbacks (`onError`, `onEvent`, `onFinish`, `onSessionChange`, `prepareSend`) - * refresh on every render. + * `initialSession`, `auth`, `headers`, `maxReconnectAttempts`, + * `streamIdleTimeoutMs`, `optimistic`) are read once when the store is created; + * remount to change them. Lifecycle callbacks (`onError`, `onEvent`, + * `onFinish`, `onSessionChange`, `prepareSend`) refresh on every render. */ export function useEveAgent( options: UseEveAgentOptions = {}, @@ -124,6 +125,7 @@ export function useEveAgent( optimistic: options.optimistic, reducer, session: options.session, + streamIdleTimeoutMs: options.streamIdleTimeoutMs, }); } diff --git a/packages/eve/src/svelte/use-eve-agent.ts b/packages/eve/src/svelte/use-eve-agent.ts index c454e05bb..4345562bf 100644 --- a/packages/eve/src/svelte/use-eve-agent.ts +++ b/packages/eve/src/svelte/use-eve-agent.ts @@ -87,11 +87,18 @@ export interface UseEveAgentOptions extends EveAgentStoreCallbacks /** Seed session identity and stream cursor for resuming a prior conversation. */ readonly initialSession?: SessionState; /** - * Maximum number of stream reconnection attempts per turn. + * Maximum consecutive stream reconnection attempts that may occur without + * receiving another event. * - * @default 3 + * @default 10 */ readonly maxReconnectAttempts?: number; + /** + * Milliseconds of stream silence before the client reconnects. + * + * @default 30000 + */ + readonly streamIdleTimeoutMs?: number; /** * Project submitted user messages before eve confirms them with a * `message.received` stream event. Optimistic events are reducer-facing @@ -205,6 +212,7 @@ export function useEveAgent( optimistic: options.optimistic, reducer, session: options.session, + streamIdleTimeoutMs: options.streamIdleTimeoutMs, }); store.setCallbacks({ diff --git a/packages/eve/src/vue/use-eve-agent.ts b/packages/eve/src/vue/use-eve-agent.ts index 22645ea52..14a946508 100644 --- a/packages/eve/src/vue/use-eve-agent.ts +++ b/packages/eve/src/vue/use-eve-agent.ts @@ -84,8 +84,10 @@ export interface UseEveAgentOptions extends EveAgentStoreCallbacks readonly initialEvents?: readonly HandleMessageStreamEvent[]; /** Prior session cursor to resume from on mount. */ readonly initialSession?: SessionState; - /** Maximum SSE reconnection attempts per turn. @default 3 */ + /** Maximum consecutive stream reconnects without another event. @default 10 */ readonly maxReconnectAttempts?: number; + /** Milliseconds of stream silence before the client reconnects. @default 30000 */ + readonly streamIdleTimeoutMs?: number; /** * Project submitted user messages before eve confirms them with a * `message.received` stream event. @@ -144,6 +146,7 @@ export function useEveAgent( optimistic: options.optimistic, reducer, session: options.session, + streamIdleTimeoutMs: options.streamIdleTimeoutMs, }); store.setCallbacks({ From f1de748ea7ee491364144ada70a3416298761ff3 Mon Sep 17 00:00:00 2001 From: Andrew Barba Date: Sat, 27 Jun 2026 10:14:42 -0400 Subject: [PATCH 2/2] fix(eve): tui - include stream idle timeout in fake session Signed-off-by: Andrew Barba --- packages/eve/test/tui-client/tui-connection-auth-states.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/eve/test/tui-client/tui-connection-auth-states.ts b/packages/eve/test/tui-client/tui-connection-auth-states.ts index 5c908ebb7..3cab20888 100644 --- a/packages/eve/test/tui-client/tui-connection-auth-states.ts +++ b/packages/eve/test/tui-client/tui-connection-auth-states.ts @@ -44,6 +44,7 @@ class FakeSession extends ClientSession { host: "http://fake.invalid", maxReconnectAttempts: 0, preserveCompletedSessions: false, + streamIdleTimeoutMs: undefined, resolveHeaders: async () => new Headers(), }, { streamIndex: 0 },