Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ca7828a
Clarify deterministic channel cancellation tokens
AndrewBarba Jun 25, 2026
d97d78b
Merge remote-tracking branch 'origin/main' into barba/session-turn-ca…
AndrewBarba Jun 25, 2026
ce191ef
Propagate abort signals through turn execution
AndrewBarba Jun 25, 2026
5ac74de
Add tests for cancel hook and continuation reuse
AndrewBarba Jun 25, 2026
e491c9b
Propagate abort signals through tool execution
AndrewBarba Jun 25, 2026
fc8d762
Add turn cancellation event handling
AndrewBarba Jun 25, 2026
a529514
Add channel-local turn cancellation hook
AndrewBarba Jun 25, 2026
e3d550e
Add turn cancellation support
AndrewBarba Jun 25, 2026
5ae5175
Add active turn cancellation to client
AndrewBarba Jun 25, 2026
f6e3e7b
Merge remote-tracking branch 'origin/main' into barba/session-turn-ca…
AndrewBarba Jun 26, 2026
a68d883
plumbing
AndrewBarba Jun 26, 2026
80e5d00
Record remote agent session identity on pending actions
AndrewBarba Jun 26, 2026
c020131
Propagate abort signals through turn cancellation
AndrewBarba Jun 26, 2026
96fa590
Delay remote child cancellation until dispatch adoption
AndrewBarba Jun 26, 2026
eb21fb2
Handle turn cancellation for pending remote agents
AndrewBarba Jun 26, 2026
4f9a252
Support cancellable eval turns
AndrewBarba Jun 26, 2026
e81a043
Merge remote-tracking branch 'origin/main' into barba/session-turn-ca…
AndrewBarba Jun 26, 2026
8a207df
Add turn cancellation e2e coverage
AndrewBarba Jun 26, 2026
aa01157
Limit cancellation e2e to settled turn
AndrewBarba Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/calm-turns-cancel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"eve": patch
---

Turn cancellation now propagates through active local and remote subagents, allowing parent turns to settle cleanly after descendant work is cancelled. Evals can start a cancellable turn with `startTurn()` and observe its settled result.
15 changes: 15 additions & 0 deletions docs/concepts/sessions-runs-and-streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The stream is newline-delimited JSON (NDJSON), one event per line:
| `authorization.completed` | A connection's authorization resolved; carries `outcome`. |
| `step.completed` | A model step finished; carries `finishReason` and usage. |
| `step.failed` | A model step failed; carries `{ code, message, details? }`. |
| `turn.cancelled` | The active turn was intentionally cancelled; followed by `session.waiting`. |
| `turn.completed` | The turn finished. |
| `turn.failed` | The turn failed; carries `{ code, message, details? }`. |
| `session.waiting` | The session parked, waiting for the next input (a message, an answer). |
Expand Down Expand Up @@ -86,6 +87,20 @@ The follow-up reuses the same durable session: same history, same state.

For deterministic ordering, send one follow-up at a time and wait for the next `session.waiting` event before sending another message to the same session. See [message delivery and queueing](./execution-model-and-durability#message-delivery-and-queueing) for the current runtime contract.

## Cancel the active turn

Use the current continuation token to cancel active work without ending the session:

```bash
curl -X POST http://127.0.0.1:3000/eve/v1/session/<sessionId>/cancel \
-H 'content-type: application/json' \
-d '{"scope":"turn","continuationToken":"<token>"}'
```

Accepted cancellation returns `202`. A stale token or a continuation with no active turn returns
`409`. The stream emits `turn.cancelled` followed by `session.waiting`, and the same session can
accept a later follow-up.

## Reconnect and rewind

The stream is durable. Every event is recorded before a step completes, so the whole stream is replayable. Pass `startIndex` to reconnect by event count and pick up where you dropped off, or rewind to the start:
Expand Down
16 changes: 16 additions & 0 deletions docs/guides/client/messages.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ await resumed.result();

You can send `message`, `inputResponses`, and `clientContext` together when the resumed turn needs both a human answer and follow-up text.

## Cancel an active turn

Call `cancel()` on the response whose turn should stop:

```ts
const response = await session.send("Run the long analysis.");

await response.cancel();
const result = await response.result();
```

Cancellation stops server-side model, tool, and descendant work. The stream settles with
`turn.cancelled` followed by `session.waiting`, so the same `ClientSession` can send a later
follow-up. Aborting a request or stream with an `AbortSignal` only stops the local transport; it
does not request server-side cancellation.

## Single-use responses

`MessageResponse` is single-use. Either aggregate it:
Expand Down
11 changes: 11 additions & 0 deletions e2e/fixtures/agent-cancellation/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
node_modules
.env*
.eve
.vercel
.workflow-data
.next
.output
.nitro
dist
.DS_Store
*.tsbuildinfo
7 changes: 7 additions & 0 deletions e2e/fixtures/agent-cancellation/.vercelignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.env*.local
node_modules
.eve
.next
.output
.nitro
dist
20 changes: 20 additions & 0 deletions e2e/fixtures/agent-cancellation/agent/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { defineAgent } from "eve";
import { mockModel } from "eve/evals";

const WAIT_TOOL_NAME = "wait-for-cancellation";

export default defineAgent({
model: mockModel(({ lastUserMessage }) =>
lastUserMessage?.includes(WAIT_TOOL_NAME) === true
? {
toolCalls: [
{
id: "call_wait_for_cancellation",
name: WAIT_TOOL_NAME,
},
],
}
: `cancellation-follow-up-ok:${lastUserMessage ?? ""}`,
),
modelContextWindowTokens: 100_000,
});
1 change: 1 addition & 0 deletions e2e/fixtures/agent-cancellation/agent/instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
You are a deterministic cancellation test agent.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { defineTool } from "eve/tools";
import { z } from "zod";

export default defineTool({
description: "Blocks until the active turn is cancelled.",
inputSchema: z.object({}),
async execute(_input, { abortSignal }) {
await new Promise<never>((_resolve, reject) => {
const onAbort = () => {
reject(abortSignal.reason ?? new Error("Turn cancelled."));
};

if (abortSignal.aborted) {
onAbort();
return;
}

abortSignal.addEventListener("abort", onAbort, { once: true });
});
},
});
6 changes: 6 additions & 0 deletions e2e/fixtures/agent-cancellation/evals/evals.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { defineEvalConfig } from "eve/evals";

export default defineEvalConfig({
maxConcurrency: 1,
timeoutMs: 120_000,
});
26 changes: 26 additions & 0 deletions e2e/fixtures/agent-cancellation/evals/turn-cancellation.eval.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { defineEval } from "eve/evals";
import { equals } from "eve/evals/expect";

const WAIT_TOOL_NAME = "wait-for-cancellation";

export default defineEval({
description:
"Turn cancellation aborts active work and settles the session at a waiting boundary.",
tags: ["cancellation", "workflow"],

async test(t) {
const activeTurn = await t.startTurn(
`Call ${WAIT_TOOL_NAME} and wait for it to finish before replying.`,
);

await t.sleep(5_000);
await activeTurn.cancel();

const cancelledTurn = await activeTurn.result();
await t.require(cancelledTurn.status, equals("waiting"));
cancelledTurn.calledTool(WAIT_TOOL_NAME, { count: 1, status: "pending" });
cancelledTurn.eventOrder([{ type: "turn.cancelled" }, { type: "session.waiting" }]);
cancelledTurn.notEvent("turn.failed");
cancelledTurn.notEvent("session.failed");
},
});
24 changes: 24 additions & 0 deletions e2e/fixtures/agent-cancellation/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "agent-cancellation",
"version": "0.0.0",
"private": true,
"type": "module",
"scripts": {
"build": "eve build",
"dev": "eve dev",
"start": "eve start",
"typecheck": "eve build && tsc",
"test:e2e": "eve eval --strict"
},
"dependencies": {
"@opentelemetry/core": "2.6.1",
"@opentelemetry/sdk-trace-base": "2.6.1",
"@vercel/otel": "2.1.2",
"eve": "workspace:*",
"zod": "catalog:"
},
"devDependencies": {
"@types/node": "catalog:",
"typescript": "catalog:"
}
}
17 changes: 17 additions & 0 deletions e2e/fixtures/agent-cancellation/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"outDir": "dist",
"rootDir": ".",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"declaration": true,
"noEmit": true,
"types": ["node"]
},
"include": ["agent/**/*.ts", "evals/**/*.ts", ".eve/**/*.d.ts"]
}
1 change: 1 addition & 0 deletions packages/eve/src/channel/cross-channel-receive.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { Runtime } from "#channel/types.js";

function makeRuntime(): Runtime {
return {
cancelTurn: vi.fn(),
deliver: vi.fn(),
getEventStream: vi.fn(),
run: vi.fn(),
Expand Down
13 changes: 9 additions & 4 deletions packages/eve/src/channel/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ type WebSocketHeaders = Headers | readonly (readonly [string, string])[] | Recor

/**
* Second argument passed to every route handler. `send` starts or continues a
* session on this channel; `getSession` looks one up by id; `receive` hands
* inbound work to a different channel; `params` contains the matched path
* parameters; `waitUntil` keeps background work alive past the response;
* `requestIp` is the client IP, or `null` when the host cannot provide it.
* session on this channel; `cancelTurn` cancels its active turn; `getSession`
* looks one up by id; `receive` hands inbound work to a different channel;
* `params` contains the matched path parameters; `waitUntil` keeps background
* work alive past the response; `requestIp` is the client IP, or `null` when
* the host cannot provide it.
*/
export interface RouteHandlerArgs<TState = undefined> {
cancelTurn: CancelTurnFn;
send: SendFn<TState>;
getSession: GetSessionFn;
/**
Expand Down Expand Up @@ -50,6 +52,9 @@ export interface SendPayload {
readonly outputSchema?: JsonObject;
}

/** Cancels the active turn addressed by a channel-local continuation token. */
export type CancelTurnFn = (continuationToken: string) => Promise<void>;

/**
* Starts or continues a session on this channel. Accepts a plain string,
* `UserContent`, or a {@link SendPayload}, plus {@link SendOptions} (auth,
Expand Down
1 change: 1 addition & 0 deletions packages/eve/src/channel/schedule.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ function createMockRunHandle(): RunHandle {

function createMockRuntime(): Runtime {
return {
cancelTurn: vi.fn(),
deliver: vi.fn().mockRejectedValue(new Error("no parked session")),
run: vi.fn().mockResolvedValue(createMockRunHandle()),
getEventStream: vi.fn().mockResolvedValue(new ReadableStream<HandleMessageStreamEvent>()),
Expand Down
4 changes: 4 additions & 0 deletions packages/eve/src/channel/send.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ function createMockRunHandle(): RunHandle {

function createRuntime(deliverError: unknown): Runtime {
return {
cancelTurn: vi.fn(),
deliver: vi.fn().mockRejectedValue(deliverError),
run: vi.fn().mockResolvedValue(createMockRunHandle()),
getEventStream: vi.fn().mockResolvedValue(new ReadableStream<HandleMessageStreamEvent>()),
Expand Down Expand Up @@ -84,6 +85,7 @@ describe("createSendFn", () => {
it("forwards context through deliver and run payloads", async () => {
const context = ["thread background"];
const deliverRuntime: Runtime = {
cancelTurn: vi.fn(),
deliver: vi.fn().mockResolvedValue({ sessionId: "existing-session-id" }),
run: vi.fn().mockResolvedValue(createMockRunHandle()),
getEventStream: vi.fn().mockResolvedValue(new ReadableStream<HandleMessageStreamEvent>()),
Expand Down Expand Up @@ -115,6 +117,7 @@ describe("createSendFn", () => {

it("adds channel request ids to deliver and run inputs when provided", async () => {
const deliverRuntime: Runtime = {
cancelTurn: vi.fn(),
deliver: vi.fn().mockResolvedValue({ sessionId: "existing-session-id" }),
run: vi.fn().mockResolvedValue(createMockRunHandle()),
getEventStream: vi.fn().mockResolvedValue(new ReadableStream<HandleMessageStreamEvent>()),
Expand All @@ -141,6 +144,7 @@ describe("createSendFn", () => {
type: "object",
} as const;
const deliverRuntime: Runtime = {
cancelTurn: vi.fn(),
deliver: vi.fn().mockResolvedValue({ sessionId: "existing-session-id" }),
run: vi.fn().mockResolvedValue(createMockRunHandle()),
getEventStream: vi.fn().mockResolvedValue(new ReadableStream<HandleMessageStreamEvent>()),
Expand Down
5 changes: 5 additions & 0 deletions packages/eve/src/channel/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ export interface RunHandle {
* Runtime interface consumed by routes and the subagent tool wrapper.
*/
export interface Runtime {
/**
* Cancels the active turn addressed by a continuation token.
*/
cancelTurn(continuationToken: string): Promise<void>;

/**
* Starts a new run from a flat platform-shape input.
*
Expand Down
1 change: 1 addition & 0 deletions packages/eve/src/cli/dev/tui/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ function stubSession(): ClientSession {
/** Wraps literal stream events in a real `MessageResponse`. */
function messageResponseOf(events: readonly unknown[]): MessageResponse {
return new MessageResponse({
cancelTurn: async () => undefined,
continuationToken: "eve:test",
createStream: async function* () {
for (const event of events) yield event as HandleMessageStreamEvent;
Expand Down
14 changes: 14 additions & 0 deletions packages/eve/src/client/message-response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MessageResult } from "#client/types.js";
* Internal configuration passed to construct a {@link MessageResponse}.
*/
interface MessageResponseInput {
readonly cancelTurn: () => Promise<void>;
readonly continuationToken?: string;
readonly createStream: () => AsyncGenerator<HandleMessageStreamEvent>;
readonly sessionId: string;
Expand All @@ -35,15 +36,28 @@ export class MessageResponse<TOutput = unknown> implements AsyncIterable<HandleM
readonly sessionId: string;

#consumed = false;
readonly #cancelTurn: () => Promise<void>;
readonly #createStream: () => AsyncGenerator<HandleMessageStreamEvent>;

/** @internal */
constructor(input: MessageResponseInput) {
this.#cancelTurn = input.cancelTurn;
this.continuationToken = input.continuationToken;
this.sessionId = input.sessionId;
this.#createStream = input.createStream;
}

/**
* Requests server-side cancellation of this response's active turn.
*
* This does not merely stop local stream consumption: the server aborts
* active model, tool, and descendant work and settles the session at
* `turn.cancelled` followed by `session.waiting`.
*/
async cancel(): Promise<void> {
await this.#cancelTurn();
}

/**
* Consumes the full event stream and returns the aggregated
* {@link MessageResult}.
Expand Down
Loading
Loading