diff --git a/.changeset/cross-channel-output-schema.md b/.changeset/cross-channel-output-schema.md new file mode 100644 index 000000000..7dfd27b5f --- /dev/null +++ b/.changeset/cross-channel-output-schema.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +Allow cross-channel `receive(...)` calls from routes and schedules to request turn-scoped structured output with Standard JSON Schema or raw JSON Schema. Fresh conversation deliveries also clear schemas retained by earlier failed turns while active runtime and HITL continuations preserve them. diff --git a/docs/channels/custom.mdx b/docs/channels/custom.mdx index a0c0d40c9..8ba4e9cba 100644 --- a/docs/channels/custom.mdx +++ b/docs/channels/custom.mdx @@ -48,7 +48,7 @@ Declare routes with the `POST()` and `GET()` helpers. Each route handler receive - `send(message, { auth, continuationToken, state? })` starts or resumes a session. Returns a `Session`. - `getSession(sessionId)` looks up an existing session. The returned `Session` exposes `getEventStream({ startIndex? })` for streaming. -- `receive(channel, ...)` hands inbound work to a different channel for cross-channel hand-off. +- `receive(channel, { message, target, auth, outputSchema? })` hands inbound work to a different channel for cross-channel hand-off. - `params` holds route parameters extracted from the path pattern. - `waitUntil(promise)` extends the request lifetime for background work. - `requestIp` is the client IP, or `null` when the host cannot provide it. @@ -102,8 +102,14 @@ Route handlers can start a session on a different channel via `args.receive(chan ```ts import { defineChannel, POST } from "eve/channels"; +import { z } from "zod"; import slack from "./slack.js"; +const investigationSchema = z.object({ + summary: z.string(), + severity: z.enum(["low", "medium", "high"]), +}); + export default defineChannel({ routes: [ POST("/incident", async (req, args) => { @@ -112,6 +118,7 @@ export default defineChannel({ args.waitUntil( args.receive(slack, { message: `Investigate ${incident.reference}: ${incident.title}`, + outputSchema: investigationSchema, target: { channelId: "C0123ABC" }, auth: { authenticator: "incidentio", @@ -130,11 +137,15 @@ export default defineChannel({ Semantics: -- The target channel's authored `receive(input, { send })` hook owns the continuation-token format and initial state. Callers supply only `{ message, target, auth }`. +- The target channel's authored `receive(input, { send })` hook owns the continuation-token format and initial state. Callers supply `{ message, target, auth, outputSchema? }`. +- `outputSchema` accepts Standard JSON Schema (for example, Zod 4) or raw JSON Schema. eve applies it to the target's framework-provided `send`, so the target channel does not need schema-specific forwarding. The schema applies only to this turn; successful structured output emits `result.completed`. +- `receive(...)` still resolves to a `Session`, not the structured value. Read the result asynchronously from `result.completed` in an agent hook or the session stream. - `auth` flows through to `session.auth.initiator` so the target's event handlers and the agent's tools can read who started the session. - Calling `args.receive(...)` does not also start a session on the current channel. The inbound channel's response is whatever the route handler returns explicitly. - The first argument is the target channel module's default export. Import it directly from `agent/channels/.ts`. Identity is matched by reference. +See [Output Schema](../guides/client/output-schema) for schema formats, result events, and per-turn behavior. + ## Channel metadata A channel can project a subset of its adapter state as metadata, available to instrumentation resolvers, dynamic tool resolvers, and dynamic skill or instruction resolvers. Define a `metadata(state)` function on the channel config: diff --git a/docs/guides/client/output-schema.mdx b/docs/guides/client/output-schema.mdx index 36a9207c0..cc485a6b9 100644 --- a/docs/guides/client/output-schema.mdx +++ b/docs/guides/client/output-schema.mdx @@ -42,9 +42,9 @@ console.log(result.data?.count); `result.data` is `undefined` when the turn did not produce a structured result. -## Standard Schema +## Standard JSON Schema -The client also accepts Standard Schema implementations such as Zod, Valibot, and ArkType. The schema is lowered to JSON Schema before the request is sent: +The client also accepts schemas that implement [Standard JSON Schema](https://standardschema.dev/json-schema), such as Zod 4. The schema is lowered to plain JSON Schema before the request is sent. Libraries that implement only Standard Schema validation need their JSON Schema adapter first; for example, Valibot provides `toStandardJsonSchema` through `@valibot/to-json-schema`. ```ts import { z } from "zod"; diff --git a/docs/schedules.mdx b/docs/schedules.mdx index 2a3ecca97..bae4f327b 100644 --- a/docs/schedules.mdx +++ b/docs/schedules.mdx @@ -77,12 +77,44 @@ export default defineSchedule({ }); ``` -- `receive(channel, { message, target, auth })`: starts a session on another channel. Same contract as a route handler's `args.receive`. +- `receive(channel, { message, target, auth, outputSchema? })`: starts a session on another channel. Same contract as a route handler's `args.receive`; `outputSchema` accepts Standard JSON Schema (for example, Zod 4) or raw JSON Schema for this turn. - `waitUntil(promise)`: extends the cron task's lifetime so the parked session and any in-flight fetches settle before the task ends. Wrap the `receive` call in it. - `appAuth`: the app principal (`{ authenticator: "app", principalId: "eve:app", principalType: "runtime" }`). Pass it as `receive(..., { auth: appAuth })` for work the agent does on its own behalf. A handler-form session runs on the same durable runtime engine as any other session, so it can park (durably suspend), for instance when the channel handoff is waiting for a Slack reply. Only markdown task mode is barred from waiting. +### Structured handoffs + +Pass `outputSchema` when a scheduled handoff needs a structured result: + +```ts title="agent/schedules/daily-assessment.ts" +import { defineSchedule } from "eve/schedules"; +import { z } from "zod"; + +import slack from "../channels/slack.js"; + +const assessmentSchema = z.object({ + summary: z.string(), + priority: z.enum(["low", "medium", "high"]), +}); + +export default defineSchedule({ + cron: "0 9 * * 1-5", + run({ receive, waitUntil, appAuth }) { + waitUntil( + receive(slack, { + message: "Assess today's queue and summarize the priorities.", + target: { channelId: "C0123ABC" }, + auth: appAuth, + outputSchema: assessmentSchema, + }), + ); + }, +}); +``` + +Adding a schema does not change the target's run mode or durability. The schema applies only to the delivered turn, and successful structured completion emits `result.completed` for an [agent hook](./guides/hooks) or session event consumer. `receive(...)` still resolves to a `Session`, so read the structured value asynchronously from that event rather than from the return value. + ## Trigger a schedule while iterating The dev server mounts a one-shot dispatch route that fires a schedule by name, out of band, exactly once. Since `eve dev` never runs schedules on their cron cadence, this is how you trigger one without waiting for the next production tick. diff --git a/e2e/fixtures/agent-channels/agent/channels/webhook.ts b/e2e/fixtures/agent-channels/agent/channels/webhook.ts index 13e80fd91..a2ed1addb 100644 --- a/e2e/fixtures/agent-channels/agent/channels/webhook.ts +++ b/e2e/fixtures/agent-channels/agent/channels/webhook.ts @@ -13,8 +13,9 @@ export default defineChannel({ const body = (await req.json().catch(() => ({}))) as { message?: string; sessionRef?: string; + structured?: boolean; }; - const session = await args.receive(target, { + const options = { message: body.message ?? "Reply with the single word: hello.", target: { sessionRef: body.sessionRef ?? crypto.randomUUID() }, auth: { @@ -23,7 +24,21 @@ export default defineChannel({ principalId: "smoke-test", principalType: "service", }, - }); + } as const; + const session = body.structured + ? await args.receive(target, { + ...options, + outputSchema: { + additionalProperties: false, + properties: { + count: { type: "integer" }, + title: { type: "string" }, + }, + required: ["title", "count"], + type: "object", + }, + }) + : await args.receive(target, options); return Response.json({ ok: true, sessionId: session.id }); }), ], diff --git a/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts b/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts new file mode 100644 index 000000000..12cb5e0c7 --- /dev/null +++ b/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts @@ -0,0 +1,54 @@ +import { defineEval } from "eve/evals"; + +import { postChannel } from "./shared.js"; + +/** Structured-output smoke for the cross-channel `args.receive` handoff. */ +export default defineEval({ + description: "Custom channel smoke: structured cross-channel receive.", + + async test(t) { + const payload = await postChannel<{ ok: boolean; sessionId?: string }>(t.target, "/webhook", { + message: + 'Return exactly one structured result with title "handoff" and count 1. Do not answer in prose.', + structured: true, + }); + if (payload.ok !== true || typeof payload.sessionId !== "string") { + throw new Error(`Unexpected webhook response: ${JSON.stringify(payload)}`); + } + + const session = await t.target.attachSession(payload.sessionId); + const results = session.events.filter((event) => event.type === "result.completed"); + if (results.length !== 1) { + const failures = session.events.filter( + (event) => + event.type === "session.failed" || + event.type === "turn.failed" || + event.type === "step.failed", + ); + throw new Error( + `Expected one result.completed event, received ${results.length}. ` + + `Observed events: ${session.events.map((event) => event.type).join(", ")}. ` + + `Failures: ${JSON.stringify(failures)}.`, + ); + } + + const result = results[0]?.data.result; + if ( + !isRecord(result) || + typeof result.title !== "string" || + typeof result.count !== "number" || + !Number.isInteger(result.count) || + Object.keys(result).some((key) => key !== "count" && key !== "title") || + Object.keys(result).length !== 2 + ) { + throw new Error(`Unexpected structured result: ${JSON.stringify(result)}`); + } + + t.didNotFail(); + t.completed(); + }, +}); + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/eve/src/channel/cross-channel-receive.test.ts b/packages/eve/src/channel/cross-channel-receive.test.ts index 30c2e45f6..a8f1a61d7 100644 --- a/packages/eve/src/channel/cross-channel-receive.test.ts +++ b/packages/eve/src/channel/cross-channel-receive.test.ts @@ -1,4 +1,6 @@ import { describe, expect, it, vi } from "vitest"; +import { z } from "zod"; +import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; import { CHANNEL_SENTINEL, type CompiledChannel } from "#channel/compiled-channel.js"; import { @@ -6,7 +8,9 @@ import { type CrossChannelTarget, } from "#channel/cross-channel-receive.js"; import type { Session } from "#channel/session.js"; -import type { Runtime } from "#channel/types.js"; +import type { RunHandle, Runtime } from "#channel/types.js"; +import { RuntimeNoActiveSessionError } from "#execution/runtime-errors.js"; +import type { HandleMessageStreamEvent } from "#protocol/message.js"; function makeRuntime(): Runtime { return { @@ -26,6 +30,14 @@ function makeSession(): Session { }; } +function makeRunHandle(): RunHandle { + return { + continuationToken: "target:thread", + events: new ReadableStream(), + sessionId: "new-session-id", + }; +} + function makeChannel(name: string): { target: CrossChannelTarget; receive: ReturnType; @@ -50,6 +62,37 @@ function makeChannel(name: string): { }; } +function makeForwardingChannel(name: string, channelOutputSchema?: { readonly type: string }) { + type ReceiveFn = NonNullable; + const receive = vi.fn(async (input, { send }) => + send( + channelOutputSchema === undefined + ? input.message + : { message: input.message, outputSchema: channelOutputSchema }, + { + auth: input.auth, + continuationToken: "thread", + }, + ), + ); + const definition: CompiledChannel = { + __kind: CHANNEL_SENTINEL, + routes: [{ method: "POST", path: `/${name}`, handler: async () => new Response("ok") }], + adapter: { kind: `channel:${name}` }, + receive, + }; + return { + target: { + name, + definition, + receive, + adapter: definition.adapter, + }, + receive, + definition, + }; +} + describe("createCrossChannelReceiveFn", () => { it("delegates to the target channel's receive with a per-target send", async () => { const slack = makeChannel("slack"); @@ -72,6 +115,254 @@ describe("createCrossChannelReceiveFn", () => { expect(typeof ctx.send).toBe("function"); }); + it("normalizes and injects Standard JSON Schema when resuming a session", async () => { + const normalizedSchema = { + additionalProperties: false, + properties: { summary: { type: "string" } }, + required: ["summary"], + type: "object", + } as const; + const inputConverter = vi.fn(() => ({ type: "string" })); + const outputConverter = vi.fn(() => normalizedSchema); + const outputSchema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { input: inputConverter, output: outputConverter }, + vendor: "test", + version: 1, + }, + }; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }); + + expect(outputConverter).toHaveBeenCalledWith({ target: "draft-07" }); + expect(inputConverter).not.toHaveBeenCalled(); + expect(target.receive.mock.calls[0]![0]).toEqual({ + auth: null, + message: "assess this", + target: {}, + }); + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual( + normalizedSchema, + ); + }); + + it("accepts and normalizes a Zod schema through the public receive type", async () => { + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "assess this", + outputSchema: z.object({ + summary: z.string(), + verdict: z.enum(["approve", "reject"]), + }), + target: {}, + }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual({ + $schema: "http://json-schema.org/draft-07/schema#", + additionalProperties: false, + properties: { + summary: { type: "string" }, + verdict: { enum: ["approve", "reject"], type: "string" }, + }, + required: ["summary", "verdict"], + type: "object", + }); + }); + + it("rejects unsupported Standard JSON Schema before invoking the target receive hook", async () => { + const outputSchema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { + input: () => ({ type: "object" }), + output: () => { + throw new Error("draft-07 output conversion is unsupported"); + }, + }, + vendor: "test", + version: 1, + }, + }; + const runtime = makeRuntime(); + const target = makeChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await expect( + fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }), + ).rejects.toThrow("draft-07 output conversion is unsupported"); + + expect(target.receive).not.toHaveBeenCalled(); + expect(runtime.deliver).not.toHaveBeenCalled(); + expect(runtime.run).not.toHaveBeenCalled(); + }); + + it("injects a raw output schema when starting a new session", async () => { + const outputSchema = { + properties: { verdict: { type: "string" } }, + required: ["verdict"], + type: "object", + } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockRejectedValue(new RuntimeNoActiveSessionError("target:thread")); + vi.mocked(runtime.run).mockResolvedValue(makeRunHandle()); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }); + + expect(vi.mocked(runtime.run).mock.calls[0]![0]).toMatchObject({ + continuationToken: "target:thread", + input: { message: "assess this", outputSchema }, + mode: "conversation", + }); + }); + + it("injects a raw output schema when resuming a session", async () => { + const outputSchema = { + properties: { verdict: { type: "string" } }, + required: ["verdict"], + type: "object", + } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "retry this assessment", + outputSchema, + target: {}, + }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0]).toMatchObject({ + continuationToken: "target:thread", + payload: { message: "retry this assessment", outputSchema }, + }); + expect(runtime.run).not.toHaveBeenCalled(); + }); + + it("overrides a target channel's send schema with the outer schema", async () => { + const channelSchema = { type: "string" } as const; + const outputSchema = { type: "object" } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target", channelSchema); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { auth: null, message: "assess this", outputSchema, target: {} }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual(outputSchema); + }); + + it("preserves a target channel's send schema when no outer schema is provided", async () => { + const channelSchema = { type: "string" } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target", channelSchema); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { auth: null, message: "plain", target: {} }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual( + channelSchema, + ); + }); + + it("keeps schemas isolated between concurrent receive invocations", async () => { + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + const firstSchema = { properties: { first: { type: "string" } }, type: "object" } as const; + const secondSchema = { properties: { second: { type: "number" } }, type: "object" } as const; + + await Promise.all([ + fn(target.definition, { + auth: null, + message: "first", + outputSchema: firstSchema, + target: {}, + }), + fn(target.definition, { + auth: null, + message: "second", + outputSchema: secondSchema, + target: {}, + }), + ]); + + const payloads = vi.mocked(runtime.deliver).mock.calls.map(([input]) => input.payload); + expect(payloads).toEqual([ + expect.objectContaining({ message: "first", outputSchema: firstSchema }), + expect.objectContaining({ message: "second", outputSchema: secondSchema }), + ]); + }); + + it("does not carry an outer schema into a later schema-free receive", async () => { + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "structured", + outputSchema: { type: "object" }, + target: {}, + }); + await fn(target.definition, { auth: null, message: "plain", target: {} }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual({ + type: "object", + }); + expect(vi.mocked(runtime.deliver).mock.calls[1]![0].payload.outputSchema).toBeUndefined(); + }); + + it("injects the outer schema into structured user content", async () => { + const outputSchema = { type: "object" } as const; + const message = [{ text: "inspect this image", type: "text" as const }]; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeForwardingChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message, + outputSchema, + target: {}, + }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload).toMatchObject({ + message, + outputSchema, + }); + }); + it("resolves the target by reference identity even when multiple channels are registered", async () => { const slack = makeChannel("slack"); const twilio = makeChannel("twilio"); diff --git a/packages/eve/src/channel/cross-channel-receive.ts b/packages/eve/src/channel/cross-channel-receive.ts index f64f20e39..1a4a201b2 100644 --- a/packages/eve/src/channel/cross-channel-receive.ts +++ b/packages/eve/src/channel/cross-channel-receive.ts @@ -1,4 +1,5 @@ import type { UserContent } from "ai"; +import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; import type { ChannelAdapter } from "#channel/adapter.js"; import { isCompiledChannel, type CompiledChannel } from "#channel/compiled-channel.js"; @@ -7,17 +8,22 @@ import { createSendFn } from "#channel/send.js"; import type { Session } from "#channel/session.js"; import type { Runtime, SessionAuthContext } from "#channel/types.js"; import type { ResolvedChannelDefinition } from "#runtime/types.js"; +import { normalizeJsonSchemaDefinition } from "#shared/json-schema.js"; +import type { JsonObject } from "#shared/json.js"; + +type OutputSchemaDefinition = StandardJSONSchemaV1 | JsonObject; /** - * Options accepted by {@link CrossChannelReceiveFn}. Mirrors the input - * argument of a channel's authored `receive(input, { send })` hook — - * the runtime constructs `send` internally so route-handler callers - * only supply the platform target, payload, and auth. + * Options accepted by {@link CrossChannelReceiveFn}. Includes the fields from + * a channel's authored `receive(input, { send })` input plus optional + * framework-managed turn policy. The runtime constructs `send` internally. */ export interface CrossChannelReceiveOptions> { readonly message: string | UserContent; readonly target: TTarget; readonly auth: SessionAuthContext | null; + /** Standard JSON Schema or raw JSON Schema required for this turn's result. */ + readonly outputSchema?: OutputSchemaDefinition; } /** @@ -79,9 +85,14 @@ export function createCrossChannelReceiveFn( ): CrossChannelReceiveFn { return async (channel, options) => { const targetChannel = resolveTargetByReference(channel, channels); + const outputSchema = + options.outputSchema === undefined + ? undefined + : normalizeJsonSchemaDefinition(options.outputSchema, "output"); return await invokeChannelReceive({ runtime, target: targetChannel, + outputSchema, input: { message: options.message as string, target: options.target as Readonly>, @@ -99,6 +110,7 @@ export function createCrossChannelReceiveFn( interface InvokeChannelReceiveInput { readonly runtime: Runtime; readonly target: Pick; + readonly outputSchema?: JsonObject; readonly input: { readonly message: string; readonly target: Readonly>; @@ -121,7 +133,18 @@ export async function invokeChannelReceive(args: InvokeChannelReceiveInput): Pro if (!args.target.adapter) { throw new Error(args.describeMissingAdapter()); } - const send = createSendFn(args.runtime, args.target.adapter, args.target.name); + const baseSend = createSendFn(args.runtime, args.target.adapter, args.target.name); + const outputSchema = args.outputSchema; + const send: typeof baseSend = + outputSchema === undefined + ? baseSend + : (input, options) => + baseSend( + typeof input === "string" || Array.isArray(input) + ? { message: input, outputSchema } + : { ...input, outputSchema }, + options, + ); return await args.target.receive(args.input, { send }); } diff --git a/packages/eve/src/channel/schedule.test.ts b/packages/eve/src/channel/schedule.test.ts index 549bad23f..9cc596f01 100644 --- a/packages/eve/src/channel/schedule.test.ts +++ b/packages/eve/src/channel/schedule.test.ts @@ -93,6 +93,11 @@ describe("ScheduleDispatcher", () => { describe("run handler form", () => { it("invokes the author's run() with { receive, waitUntil, appAuth }", async () => { const runtime = createMockRuntime(); + const outputSchema = { + properties: { summary: { type: "string" } }, + required: ["summary"], + type: "object", + } as const; vi.stubEnv("SLACK_BOT_TOKEN", "xoxb-test"); vi.stubEnv("SLACK_SIGNING_SECRET", "test-secret"); try { @@ -111,6 +116,7 @@ describe("ScheduleDispatcher", () => { observed.hasWaitUntil = typeof waitUntil === "function"; await receive(definition, { message: "post the digest", + outputSchema, target: { channelId: "C0123ABC" }, auth: appAuth, }); @@ -125,6 +131,7 @@ describe("ScheduleDispatcher", () => { const runInput = vi.mocked(runtime.run).mock.calls[0]![0]; expect(runInput.continuationToken).toBe("slack:C0123ABC:"); expect(runInput.auth).toEqual(SCHEDULE_APP_AUTH); + expect(runInput.input.outputSchema).toEqual(outputSchema); } finally { vi.unstubAllEnvs(); } diff --git a/packages/eve/src/client/types.ts b/packages/eve/src/client/types.ts index bb15fe479..11a733d7d 100644 --- a/packages/eve/src/client/types.ts +++ b/packages/eve/src/client/types.ts @@ -150,8 +150,8 @@ export interface SendTurnPayload { /** * Optional schema the harness must satisfy before this turn terminates. * - * The client lowers Standard Schema implementations (Zod, Valibot, - * ArkType, etc.) to JSON Schema before sending the request. The server is + * The client lowers Standard JSON Schema implementations (for example, + * Zod 4) to plain JSON Schema before sending the request. The server is * authoritative for validation; {@link MessageResult.data} is typed to this * schema's output type and is not revalidated client-side. */ diff --git a/packages/eve/src/execution/workflow-steps.test.ts b/packages/eve/src/execution/workflow-steps.test.ts index 9cbbf7143..83c226f17 100644 --- a/packages/eve/src/execution/workflow-steps.test.ts +++ b/packages/eve/src/execution/workflow-steps.test.ts @@ -7,6 +7,7 @@ import { ContextKey } from "#context/key.js"; import { AuthKey, ContinuationTokenKey, ModeKey, SessionIdKey } from "#context/keys.js"; import { BundleKey, ChannelKey } from "#runtime/sessions/runtime-context-keys.js"; import { serializeContext } from "#context/serialize.js"; +import { setPendingInputBatch } from "#harness/input-requests.js"; import { setPendingRuntimeActionBatch } from "#harness/runtime-actions.js"; import type { HarnessSession, StepResult } from "#harness/types.js"; import { createEmptyHookRegistry } from "#runtime/hooks/registry.js"; @@ -131,7 +132,11 @@ const threadContextAdapter: ChannelAdapter = { const thread = adapterCtx.ctx.ensure(ThreadKey, () => "unset"); const message = payload.message ?? ""; - return { message: `thread=${thread}; user=${message}` }; + return { + inputResponses: payload.inputResponses, + message: `thread=${thread}; user=${message}`, + outputSchema: payload.outputSchema, + }; }, }; @@ -521,6 +526,79 @@ describe("turnStep", () => { expect(second.serializedContext[ThreadKey.name]).toBe("alpha"); }); + it("does not let stale input responses leak a failed turn's schema into the next delivery", async () => { + const outputSchema = { + properties: { assessment: { type: "string" } }, + required: ["assessment"], + type: "object", + } as const; + const initialSession = createStubSession({ outputSchema }); + const failedSession = createStubSession({ outputSchema }); + installSessionStoreMocks([initialSession, failedSession]); + + const compiledBundle = { + adapterRegistry: { + adaptersByKind: new Map([[threadContextAdapter.kind, threadContextAdapter]]), + }, + compiledArtifactsSource: {} as never, + graph: { + nodesByNodeId: new Map(), + root: { + sandboxRegistry: { sandbox: null }, + turnAgent: TestTurnAgent, + }, + }, + moduleMap: { nodes: {} }, + hookRegistry: createEmptyHookRegistry(), + resolvedAgent: { config: {} }, + subagentRegistry: {}, + toolRegistry: {}, + turnAgent: TestTurnAgent, + } as never; + vi.mocked(getCompiledRuntimeAgentBundle).mockResolvedValue(compiledBundle); + + const observedSchemas: Array = []; + let invocationCount = 0; + vi.mocked(createExecutionNodeStep).mockImplementation(() => { + return async (session): Promise => { + observedSchemas.push(session.outputSchema); + invocationCount += 1; + return invocationCount === 1 + ? { next: null, session: failedSession } + : { next: { done: true, output: "plain follow-up" }, session }; + }; + }); + + const parentWritable = createTestWritable(); + const first = await turnStep({ + input: { + kind: "deliver", + payloads: [{ message: "assess this", outputSchema }], + }, + parentWritable, + serializedContext: createSerializedContext(), + sessionState: createStubSessionState(), + }); + expect(first.action).toBe("park"); + + await turnStep({ + input: { + kind: "deliver", + payloads: [ + { + inputResponses: [{ optionId: "approve", requestId: "stale-approval" }], + message: "answer normally", + }, + ], + }, + parentWritable, + serializedContext: first.serializedContext, + sessionState: first.sessionState, + }); + + expect(observedSchemas).toEqual([outputSchema, undefined]); + }); + it("refreshes the system prompt from the current bundled deployment", async () => { const session = createStubSession({ agent: { @@ -892,7 +970,7 @@ describe("resolveEffectiveOutputSchema", () => { it("uses a run-scoped schema in either mode", () => { for (const mode of ["conversation", "task"] as const) { - const session = createStubSession(); + const session = createStubSession({ outputSchema: agentSchema }); const resolved = resolveEffectiveOutputSchema({ agentOutputSchema: agentSchema, input: { outputSchema: runSchema }, @@ -932,4 +1010,149 @@ describe("resolveEffectiveOutputSchema", () => { }); expect(resolved).toBe(session); }); + + it("preserves the in-effect schema on a runtime-action continuation", () => { + const session = createStubSession({ outputSchema: runSchema }); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { runtimeActionResults: [] }, + mode: "conversation", + session, + }); + expect(resolved).toBe(session); + }); + + it("clears a failed turn's schema from the next plain conversation delivery", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { message: "try again without structured output" }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("treats an empty input-response array as a fresh conversation delivery", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { inputResponses: [], message: "try again without structured output" }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("preserves the caller's run schema across a conversation-mode HITL response", () => { + const session = createPendingInputSession(createStubSession({ outputSchema: runSchema })); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { inputResponses: [{ optionId: "approve", requestId: "approval-1" }] }, + mode: "conversation", + session, + }); + expect(resolved).toBe(session); + }); + + it("preserves the caller's run schema across a task-mode HITL response", () => { + const session = createPendingInputSession(createStubSession({ outputSchema: runSchema })); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { inputResponses: [{ optionId: "approve", requestId: "approval-1" }] }, + mode: "task", + session, + }); + expect(resolved).toBe(session); + }); + + it("preserves the caller's run schema when a message resolves pending HITL", () => { + const session = createPendingInputSession(createStubSession({ outputSchema: runSchema })); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { message: "continue without approving" }, + mode: "conversation", + session, + }); + expect(resolved).toBe(session); + }); + + it("clears a failed turn's schema when non-empty input responses are stale", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { + inputResponses: [{ optionId: "approve", requestId: "stale-approval" }], + message: "start a fresh turn", + }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("clears a failed turn's schema for stale response-only input", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { + inputResponses: [{ text: "late reply", requestId: "stale-question" }], + }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("preserves the in-flight policy for a pending context-only delivery", () => { + const session = createPendingInputSession(createStubSession({ outputSchema: runSchema })); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { context: ["delivery context"] }, + mode: "task", + session, + }); + expect(resolved).toBe(session); + }); + + it("replaces stale task state with the agent schema on a fresh task delivery", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { message: "run the task" }, + mode: "task", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toEqual(agentSchema); + }); + + it("clears stale task state on a fresh delivery when the agent has no schema", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: undefined, + input: { message: "run the task" }, + mode: "task", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); }); + +function createPendingInputSession(session: HarnessSession): HarnessSession { + return setPendingInputBatch({ + requests: [ + { + action: { + callId: "approval-call", + input: {}, + kind: "tool-call", + toolName: "protected-action", + }, + allowFreeform: false, + display: "confirmation", + options: [ + { id: "approve", label: "Approve" }, + { id: "deny", label: "Deny" }, + ], + prompt: "Approve the protected action?", + requestId: "approval-1", + }, + ], + responseMessages: [], + session, + }); +} diff --git a/packages/eve/src/execution/workflow-steps.ts b/packages/eve/src/execution/workflow-steps.ts index 682e7287e..f539dc0a5 100644 --- a/packages/eve/src/execution/workflow-steps.ts +++ b/packages/eve/src/execution/workflow-steps.ts @@ -439,8 +439,11 @@ export function reconcileSessionContinuationToken( * wins. With no run-scoped schema, a task run adopts the agent's declared * return schema — its function-output contract, which only applies when the * agent is invoked as a function (subagent / schedule / job), i.e. task mode. - * A conversation run with no run-scoped schema enforces nothing. Continuation - * steps (no new `StepInput`) preserve whatever is already in effect. + * A conversation run with no run-scoped schema enforces nothing. Fresh + * conversation deliveries replace the previous turn's policy. Runtime-owned + * continuation steps and pending HITL resolution preserve whatever is already + * in effect because their callers cannot attach the original run-scoped + * schema again. */ export function resolveEffectiveOutputSchema(input: { readonly agentOutputSchema: JsonObject | undefined; @@ -454,11 +457,21 @@ export function resolveEffectiveOutputSchema(input: { return { ...session, outputSchema: stepInput.outputSchema }; } - if (mode === "task" && session.outputSchema === undefined && agentOutputSchema !== undefined) { + if ( + stepInput === undefined || + stepInput.runtimeActionResults !== undefined || + hasPendingInputBatch(session.state) + ) { + return session; + } + + if (mode === "task" && agentOutputSchema !== undefined) { return { ...session, outputSchema: agentOutputSchema }; } - return session; + if (session.outputSchema === undefined) return session; + const { outputSchema: _outputSchema, ...withoutOutputSchema } = session; + return withoutOutputSchema; } const log = createLogger("execution.workflow-entry"); diff --git a/packages/eve/src/harness/tool-loop.test.ts b/packages/eve/src/harness/tool-loop.test.ts index 8d2983dbb..eefdd87d6 100644 --- a/packages/eve/src/harness/tool-loop.test.ts +++ b/packages/eve/src/harness/tool-loop.test.ts @@ -856,6 +856,8 @@ describe("createToolLoopHarness", () => { type: "step.failed", }), ); + expect(events.some((event) => event.type === "result.completed")).toBe(false); + expect(result.session.outputSchema).toEqual({ type: "object" }); }); it("returns only the final assistant reply when a completed task step includes tool work", async () => { diff --git a/packages/eve/src/harness/types.ts b/packages/eve/src/harness/types.ts index 651036872..48aa12425 100644 --- a/packages/eve/src/harness/types.ts +++ b/packages/eve/src/harness/types.ts @@ -93,8 +93,10 @@ export interface StepInput { */ readonly context?: readonly string[]; /** - * Run-scoped schema that replaces the session's current output schema when - * present. Omitted continuations keep the existing schema. + * Run-scoped schema that replaces the session's current output schema for + * this turn. Runtime-owned continuations and pending HITL resolution preserve + * it; a fresh delivery that omits it clears any schema retained by an earlier + * failed turn. */ readonly outputSchema?: JsonObject; /** diff --git a/packages/eve/src/internal/testing/scenario-apps/cross-channel-output-schema-portability.ts b/packages/eve/src/internal/testing/scenario-apps/cross-channel-output-schema-portability.ts new file mode 100644 index 000000000..ae348579d --- /dev/null +++ b/packages/eve/src/internal/testing/scenario-apps/cross-channel-output-schema-portability.ts @@ -0,0 +1,71 @@ +import type { ScenarioAppDescriptor } from "#internal/testing/scenario-app.js"; + +export const CROSS_CHANNEL_OUTPUT_SCHEMA_PORTABILITY_DESCRIPTOR: ScenarioAppDescriptor = { + files: { + "agent/channels/source.ts": `import { defineChannel, POST } from "eve/channels"; + +import target, { summarySchema } from "./target.js"; + +export default defineChannel({ + routes: [ + POST("/api/handoff", async (req, { receive }) => { + const body = (await req.json()) as { message: string; threadId: string }; + const session = await receive(target, { + auth: null, + message: body.message, + outputSchema: summarySchema, + target: { threadId: body.threadId }, + }); + return Response.json({ sessionId: session.id }); + }), + ], +}); +`, + "agent/channels/target.ts": `import { defineChannel, POST } from "eve/channels"; + +export const summarySchema = { + "~standard": { + version: 1, + vendor: "portability-test", + jsonSchema: { + input: () => ({ type: "object" }), + output: () => ({ + type: "object", + properties: { summary: { type: "string" } }, + required: ["summary"], + }), + }, + }, +} as const; + +export default defineChannel({ + routes: [POST("/api/target", async () => new Response("ok"))], + receive(input, { send }) { + return send(input.message, { + auth: input.auth, + continuationToken: input.target.threadId, + }); + }, +}); +`, + "agent/schedules/daily-summary.ts": `import { defineSchedule } from "eve/schedules"; + +import target, { summarySchema } from "../channels/target.js"; + +export default defineSchedule({ + cron: "0 9 * * *", + run({ receive, waitUntil, appAuth }) { + waitUntil( + receive(target, { + auth: appAuth, + message: "Prepare the daily summary.", + outputSchema: summarySchema, + target: { threadId: "daily" }, + }), + ); + }, +}); +`, + }, + name: "cross-channel-output-schema-portability", +}; diff --git a/packages/eve/src/internal/testing/scenario-apps/index.ts b/packages/eve/src/internal/testing/scenario-apps/index.ts index 08f5107c0..571f1bc79 100644 --- a/packages/eve/src/internal/testing/scenario-apps/index.ts +++ b/packages/eve/src/internal/testing/scenario-apps/index.ts @@ -1,4 +1,5 @@ export { CUSTOM_CHANNEL_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/custom-channel-portability.js"; +export { CROSS_CHANNEL_OUTPUT_SCHEMA_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/cross-channel-output-schema-portability.js"; export { DISCORD_ROUTE_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/discord-route-portability.js"; export { EXTENSION_AGENT_DESCRIPTOR } from "#internal/testing/scenario-apps/extension-agent.js"; export { GITHUB_ROUTE_PORTABILITY_DESCRIPTOR } from "#internal/testing/scenario-apps/github-route-portability.js"; diff --git a/packages/eve/test/scenarios/public-api-portability.scenario.test.ts b/packages/eve/test/scenarios/public-api-portability.scenario.test.ts index 498371db8..c8cca3f62 100644 --- a/packages/eve/test/scenarios/public-api-portability.scenario.test.ts +++ b/packages/eve/test/scenarios/public-api-portability.scenario.test.ts @@ -8,6 +8,7 @@ import { describe, it } from "vitest"; import type { ScenarioAppDescriptor } from "../../src/internal/testing/scenario-app.js"; import { + CROSS_CHANNEL_OUTPUT_SCHEMA_PORTABILITY_DESCRIPTOR, EVE_ROUTE_PORTABILITY_DESCRIPTOR, DISCORD_ROUTE_PORTABILITY_DESCRIPTOR, GITHUB_ROUTE_PORTABILITY_DESCRIPTOR, @@ -92,6 +93,19 @@ export default defineSandbox({ }, }, }, + { + descriptor: CROSS_CHANNEL_OUTPUT_SCHEMA_PORTABILITY_DESCRIPTOR, + include: ["src/public/channels/index.ts", "src/public/schedules/index.ts"], + name: "lets tsc typecheck structured cross-channel receives from routes and schedules", + packageExports: { + "./channels": { + types: "./dist/src/public/channels/index.d.ts", + }, + "./schedules": { + types: "./dist/src/public/schedules/index.d.ts", + }, + }, + }, { descriptor: SLACK_ROUTE_PORTABILITY_DESCRIPTOR, include: ["src/public/channels/slack/index.ts", "src/public/definitions/defineChannel.ts"],