Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-stream-reconnect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"eve": patch
---

The TypeScript client now reconnects idle message streams after 30 seconds by default, resuming from the last consumed event index instead of waiting for the platform to close the stream. The default reconnect budget now tolerates more than five minutes of silent stream time, and a new `streamIdleTimeoutMs` option tunes or disables idle reconnects for `Client`, `send`, `stream`, and frontend bindings.
2 changes: 1 addition & 1 deletion docs/guides/client/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ For browser chat UIs, start with [`useEveAgent`](../frontend/overview). For wire

## Create a client

A `Client` binds one host, auth policy, header policy, and stream reconnection budget:
A `Client` binds one host, auth policy, header policy, and stream behavior:

```ts
import { Client } from "eve/client";
Expand Down
10 changes: 8 additions & 2 deletions docs/guides/client/streaming.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@ The client reconnects after transient stream disconnects. It resumes from the nu
```ts
const client = new Client({
host: "https://agent.example.com",
maxReconnectAttempts: 5,
maxReconnectAttempts: 10,
streamIdleTimeoutMs: 30_000,
});
```

`maxReconnectAttempts` is per turn. The default is `3`.
`streamIdleTimeoutMs` controls how long the client waits for the next event
before reopening the stream from the last consumed event index. The default is
`30000`; set it to `0` to disable idle reconnects. `maxReconnectAttempts`
limits consecutive reconnects that do not produce another event. The default is
`10`, so a fully silent stream is tolerated for more than five minutes before
the client fails the turn.

## Open a stream manually

Expand Down
7 changes: 4 additions & 3 deletions docs/guides/frontend/overview.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@ const agent = useEveAgent({
});
```

Two more options tune turn behavior:
Three more options tune turn behavior:

- `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only. `events` stays the authoritative eve stream.
- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn.
- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects.
- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event.

## Custom reducer

Expand Down Expand Up @@ -250,7 +251,7 @@ const agent = useEveAgent({

Store the full `session` object (`sessionId`, `continuationToken`, `streamIndex`), not a single field. The session cursor lets eve continue the durable conversation; the event log lets your UI render historical messages without replaying the whole stream. A database-backed chat app should usually persist stream events as they arrive with `onEvent` and then save a final snapshot in `onFinish`.

For multiple chat threads, keep one saved event log and session cursor per thread. `host`, `reducer`, `session`, `initialEvents`, `initialSession`, `auth`, `headers`, `maxReconnectAttempts`, and `optimistic` are read when the hook creates its store, so remount the chat component when switching threads, for example with `key={chat.id}`.
For multiple chat threads, keep one saved event log and session cursor per thread. `host`, `reducer`, `session`, `initialEvents`, `initialSession`, `auth`, `headers`, `streamIdleTimeoutMs`, `maxReconnectAttempts`, and `optimistic` are read when the hook creates its store, so remount the chat component when switching threads, for example with `key={chat.id}`.

If the user can refresh or navigate immediately after pressing send, create your app-level chat row and store the pending user message before calling `send()`. After the request starts, persist the session state as soon as it contains a `sessionId`, then reconnect an interrupted in-flight turn with `session.stream({ startIndex: savedEvents.length })` from the lower-level client.

Expand Down
5 changes: 3 additions & 2 deletions docs/guides/frontend/use-eve-agent-svelte.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@ const agent = useEveAgent({
});
```

Two more options tune turn behavior:
Three more options tune turn behavior:

- `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only; `events` stays the authoritative eve stream.
- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn.
- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects.
- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event.

## Custom reducer

Expand Down
5 changes: 3 additions & 2 deletions docs/guides/frontend/use-eve-agent-vue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,11 @@ const agent = useEveAgent({
});
```

Two more options tune turn behavior:
Three more options tune turn behavior:

- `optimistic` (default `true`): projects submitted user messages into `data` before eve confirms them with a `message.received` event. These are reducer-facing projection events only; `events` stays the authoritative eve stream.
- `maxReconnectAttempts` (default `3`): stream reconnection budget per turn.
- `streamIdleTimeoutMs` (default `30000`): milliseconds of stream silence before the client reconnects from the last event index. Set to `0` to disable idle reconnects.
- `maxReconnectAttempts` (default `10`): consecutive reconnect budget for a turn when reconnects do not produce another event.

## Custom reducer

Expand Down
33 changes: 33 additions & 0 deletions packages/eve/src/client/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,44 @@ const AGENT_INFO: AgentInfoResult = {
};

afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
vi.unstubAllEnvs();
});

describe("Client request policy", () => {
it("tolerates more than five minutes of default idle stream reconnects", async () => {
vi.useFakeTimers();
let streamCancels = 0;
const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (_request, init) => {
if ((init?.method ?? "GET") === "POST") {
return Response.json({ continuationToken: "eve:test", sessionId: "session_1" });
}

return new Response(
new ReadableStream<Uint8Array>({
cancel() {
streamCancels += 1;
},
}),
);
});
const client = new Client({ host: "https://eve.test" });

const resultPromise = (await client.session().send("hello")).result();
const expectation = expect(resultPromise).rejects.toThrow(
'Message stream for session "session_1" closed before the turn boundary after 0 event(s); last event: none.',
);

for (let i = 0; i < 11; i += 1) {
await vi.advanceTimersByTimeAsync(30_000);
}

await expectation;
expect(fetchMock).toHaveBeenCalledTimes(12);
expect(streamCancels).toBe(11);
});

it("enforces its redirect policy for info, health, raw fetch, and sessions", async () => {
const fetchMock = vi
.spyOn(globalThis, "fetch")
Expand Down
18 changes: 17 additions & 1 deletion packages/eve/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import type {
} from "#client/types.js";
import { VERCEL_TRUSTED_OIDC_IDP_TOKEN_HEADER } from "#client/types.js";

const DEFAULT_MAX_RECONNECT_ATTEMPTS = 10;
const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 30_000;

/**
* HTTP client for talking to a deployed eve agent.
*
Expand All @@ -31,14 +34,18 @@ export class Client {
readonly #maxReconnectAttempts: number;
readonly #preserveCompletedSessions: boolean;
readonly #redirect: ClientRedirectPolicy | undefined;
readonly #streamIdleTimeoutMs: number | undefined;

constructor(options: ClientOptions) {
this.#host = options.host;
this.#auth = options.auth;
this.#headers = options.headers;
this.#maxReconnectAttempts = options.maxReconnectAttempts ?? 3;
this.#maxReconnectAttempts = options.maxReconnectAttempts ?? DEFAULT_MAX_RECONNECT_ATTEMPTS;
this.#preserveCompletedSessions = options.preserveCompletedSessions ?? false;
this.#redirect = options.redirect;
this.#streamIdleTimeoutMs = normalizeStreamIdleTimeoutMs(
options.streamIdleTimeoutMs ?? DEFAULT_STREAM_IDLE_TIMEOUT_MS,
);
}

/**
Expand Down Expand Up @@ -139,6 +146,7 @@ export class Client {
preserveCompletedSessions: this.#preserveCompletedSessions,
redirect: this.#redirect,
resolveHeaders: (perRequest) => this.#resolveHeaders(perRequest),
streamIdleTimeoutMs: this.#streamIdleTimeoutMs,
},
resolved,
);
Expand Down Expand Up @@ -242,6 +250,14 @@ function withRedirectPolicy(
return redirect === undefined ? init : { ...init, redirect };
}

function normalizeStreamIdleTimeoutMs(value: number): number | undefined {
if (!Number.isFinite(value) || value < 0) {
throw new Error("streamIdleTimeoutMs must be a non-negative finite number.");
}

return value === 0 ? undefined : value;
}

/**
* Encodes a username:password pair as a base64 Basic auth credential.
* Uses `TextEncoder` for correct UTF-8 handling across all runtimes.
Expand Down
7 changes: 5 additions & 2 deletions packages/eve/src/client/eve-agent-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ export interface EveAgentStoreCallbacks<TData> {
* Configuration for constructing an {@link EveAgentStore}.
*
* Requires a {@link EveAgentReducer | reducer}, plus either connection options
* (`host`, `auth`, `headers`, `maxReconnectAttempts`, `initialSession`) for a
* store-owned session or an existing {@link ClientSession} via `session`.
* (`host`, `auth`, `headers`, `maxReconnectAttempts`,
* `streamIdleTimeoutMs`, `initialSession`) for a store-owned session or an
* existing {@link ClientSession} via `session`.
*
* `optimistic` (default `true`) projects submitted user messages before the
* server confirms them. `host` defaults to `""`. `initialEvents` and
Expand All @@ -72,6 +73,7 @@ export interface EveAgentStoreInit<TData> {
readonly optimistic?: boolean;
readonly reducer: EveAgentReducer<TData>;
readonly session?: ClientSession;
readonly streamIdleTimeoutMs?: number;
}

interface PendingMessageSubmission {
Expand Down Expand Up @@ -119,6 +121,7 @@ export class EveAgentStore<TData> {
headers: init.headers,
host: init.host ?? "",
maxReconnectAttempts: init.maxReconnectAttempts,
streamIdleTimeoutMs: init.streamIdleTimeoutMs,
}).session(init.initialSession);
this.#events = [...(init.initialEvents ?? [])];
this.#projectionEvents = [...this.#events];
Expand Down
2 changes: 1 addition & 1 deletion packages/eve/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export { AgentInfoResponseError } from "#client/agent-info-error.js";
export { ClientError } from "#client/client-error.js";
export { defaultMessageReducer } from "#client/message-reducer.js";
export { createDataUrlFilePart, createTextWithFileContent } from "#client/file-parts.js";
export { MessageResponse } from "#client/message-response.js";
export { MessageResponse, MessageStreamBoundaryError } from "#client/message-response.js";
export { ClientSession } from "#client/session.js";

// ---------------------------------------------------------------------------
Expand Down
27 changes: 26 additions & 1 deletion packages/eve/src/client/message-response.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { HandleMessageStreamEvent } from "#protocol/message.js";
import { isCurrentTurnBoundaryEvent, type HandleMessageStreamEvent } from "#protocol/message.js";
import { extractCompletedResult } from "#client/output-schema.js";
import {
deriveResultStatus,
Expand Down Expand Up @@ -55,6 +55,10 @@ export class MessageResponse<TOutput = unknown> implements AsyncIterable<HandleM
events.push(event);
}

if (!events.some(isCurrentTurnBoundaryEvent)) {
throw new MessageStreamBoundaryError(this.sessionId, events);
}

return {
data: extractCompletedResult<TOutput>(events),
events,
Expand All @@ -79,3 +83,24 @@ export class MessageResponse<TOutput = unknown> implements AsyncIterable<HandleM
return this.#createStream();
}
}

/**
* Error thrown when a turn stream closes before a `session.*` boundary for
* the submitted turn. This normally points at a transport timeout, platform
* stream cutoff, or workflow/control-plane stall.
*/
export class MessageStreamBoundaryError extends Error {
readonly events: readonly HandleMessageStreamEvent[];
readonly sessionId: string;

constructor(sessionId: string, events: readonly HandleMessageStreamEvent[]) {
const last = events.at(-1);
const lastType = last === undefined ? "none" : last.type;
super(
`Message stream for session "${sessionId}" closed before the turn boundary after ${events.length} event(s); last event: ${lastType}.`,
);
this.name = "MessageStreamBoundaryError";
this.events = events;
this.sessionId = sessionId;
}
}
49 changes: 48 additions & 1 deletion packages/eve/src/client/ndjson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import type { HandleMessageStreamEvent } from "#protocol/message.js";
* can be recovered via reconnection.
*/
export function isStreamDisconnectError(error: unknown): boolean {
if (error instanceof StreamIdleTimeoutError) {
return true;
}

if (error instanceof DOMException) {
return error.name === "AbortError";
}
Expand All @@ -23,6 +27,26 @@ export function isStreamDisconnectError(error: unknown): boolean {
);
}

/**
* Error thrown when a stream produces no bytes before its configured idle
* deadline. Callers treat this as a reconnectable transport condition.
*/
export class StreamIdleTimeoutError extends Error {
readonly timeoutMs: number;

constructor(timeoutMs: number) {
super(`Message stream produced no events for ${timeoutMs}ms.`);
this.name = "StreamIdleTimeoutError";
this.timeoutMs = timeoutMs;
}
}

interface ReadNdjsonStreamOptions {
readonly idleTimeoutMs?: number;
}

type StreamReadResult = Awaited<ReturnType<ReadableStreamDefaultReader<Uint8Array>["read"]>>;

/**
* Reads newline-delimited JSON events from a `ReadableStream<Uint8Array>`.
*
Expand All @@ -34,6 +58,7 @@ export function isStreamDisconnectError(error: unknown): boolean {
*/
export async function* readNdjsonStream(
body: ReadableStream<Uint8Array>,
options: ReadNdjsonStreamOptions = {},
): AsyncGenerator<HandleMessageStreamEvent> {
const reader = body.getReader();
const decoder = new TextDecoder();
Expand All @@ -42,7 +67,7 @@ export async function* readNdjsonStream(

try {
while (true) {
const result = await reader.read();
const result = await readWithIdleTimeout(reader, options.idleTimeoutMs);

if (result.done) {
reachedEof = true;
Expand Down Expand Up @@ -83,3 +108,25 @@ export async function* readNdjsonStream(
reader.releaseLock();
}
}

async function readWithIdleTimeout(
reader: ReadableStreamDefaultReader<Uint8Array>,
idleTimeoutMs: number | undefined,
): Promise<StreamReadResult> {
if (idleTimeoutMs === undefined) {
return await reader.read();
}

let timeout: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<never>((_resolve, reject) => {
timeout = setTimeout(() => reject(new StreamIdleTimeoutError(idleTimeoutMs)), idleTimeoutMs);
});

try {
return await Promise.race([reader.read(), timeoutPromise]);
} finally {
if (timeout !== undefined) {
clearTimeout(timeout);
}
}
}
8 changes: 5 additions & 3 deletions packages/eve/src/client/open-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const STREAM_OPEN_RETRYABLE_STATUS = new Set([404, 409, 425, 500, 502, 503, 504]
*/
interface OpenStreamInput {
readonly host: string;
readonly idleTimeoutMs?: number;
readonly maxReconnectAttempts: number;
readonly resolveHeaders: () => Promise<Headers>;
readonly redirect?: ClientRedirectPolicy;
Expand All @@ -26,7 +27,7 @@ type OpenStreamBodyInput = Omit<OpenStreamInput, "maxReconnectAttempts">;

/**
* Opens a durable NDJSON event stream with automatic reconnection on socket
* disconnection. Used by {@link ClientSession.stream}.
* disconnection and idle stream reads. Used by {@link ClientSession.stream}.
*/
export async function* openStreamIterable(
input: OpenStreamInput,
Expand All @@ -40,8 +41,9 @@ export async function* openStreamIterable(
let disconnected = false;

try {
for await (const event of readNdjsonStream(body)) {
for await (const event of readNdjsonStream(body, { idleTimeoutMs: input.idleTimeoutMs })) {
startIndex += 1;
remainingReconnectAttempts = input.maxReconnectAttempts;
yield event;
}
} catch (error) {
Expand All @@ -51,7 +53,7 @@ export async function* openStreamIterable(
disconnected = true;
}

// Only reconnect on socket disconnection, not clean EOF or a
// Only reconnect on socket disconnection/idle timeout, not clean EOF or a
// caller-initiated abort.
if (!disconnected || input.signal?.aborted || remainingReconnectAttempts <= 0) {
return;
Expand Down
Loading
Loading