diff --git a/.changeset/github-turn-output-schema.md b/.changeset/github-turn-output-schema.md new file mode 100644 index 000000000..3577f37bf --- /dev/null +++ b/.changeset/github-turn-output-schema.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +Allow GitHub webhook handlers and cross-channel `receive(...)` calls to request turn-scoped structured output with Standard Schema or raw JSON Schema. GitHub webhook turns and scheduled retries can now share one schema-validated `result.completed` contract while keeping sessions in conversation mode. diff --git a/docs/channels/github.mdx b/docs/channels/github.mdx index e7d36c433..10d911405 100644 --- a/docs/channels/github.mdx +++ b/docs/channels/github.mdx @@ -42,28 +42,55 @@ Inbound hooks return `{ auth }` to dispatch, or `null` to ignore. Use `defaultGi ```ts import { defaultGitHubAuth, githubChannel } from "eve/channels/github"; +import { z } from "zod"; + +const assessmentSchema = z.object({ + summary: z.string(), + priority: z.enum(["low", "medium", "high"]), +}); export default githubChannel({ botName: "my-agent", // Replaces the @mention gate. ctx.conversation.kind is "issue", "pull_request", or "review_thread". onComment: (ctx, comment) => ({ auth: defaultGitHubAuth(ctx) }), // Opt in; no default dispatch on these events. - onIssue: (ctx, issue) => (issue.action === "opened" ? { auth: defaultGitHubAuth(ctx) } : null), - onPullRequest: (ctx, pr) => (pr.action === "opened" ? { auth: defaultGitHubAuth(ctx) } : null), + onIssue: (ctx, issue) => + issue.action === "opened" + ? { auth: defaultGitHubAuth(ctx), outputSchema: assessmentSchema } + : null, + onPullRequest: (ctx, pr) => + pr.action === "opened" + ? { auth: defaultGitHubAuth(ctx), outputSchema: assessmentSchema } + : null, }); ``` +Add `outputSchema` when a delivery needs a schema-validated result. It accepts a Standard Schema implementation such as Zod or a raw JSON Schema object and applies only to that turn. A successful structured turn emits the finalized value as `result.completed`; the durable session stays in conversation mode, and later turns do not inherit the schema. + +Consume `result.completed` in an [agent hook](../guides/hooks), where `ctx.session.auth.current` provides the trusted caller for validation and side effects. Keep those side effects idempotent and handle their failures inside the hook. `message.completed` is not the structured-result signal, and the GitHub channel does not need a separate result callback. See [Output Schema](../guides/client/output-schema) for the event lifecycle and failure behavior. + ### Delivery When a turn starts, the channel adds an `eyes` reaction to the triggering comment (turn this off with `progress: { reactions: false }`). The reply comes back as a comment, on the timeline or in the review thread, and splits across multiple comments when it runs long. If the turn fails, you get a short error comment carrying an error id. ### Human-in-the-loop (HITL) -GitHub comments have no interactive button or card affordance. A human-in-the-loop (HITL) `input.requested` event is posted as a comment prompt, and the user's reply comment maps back to the pending input request. Declare an `events["input.requested"]` handler to customize the prompt. +GitHub comments have no interactive button or card affordance. A human-in-the-loop (HITL) `input.requested` event is posted as a comment prompt, and the user's reply comment maps back to the pending input request. Declare an `events["input.requested"]` handler to customize the prompt. The reply is a new turn, so its inbound hook must return `outputSchema` again when that response turn also needs structured output. ### Proactive sessions -Start a session without an inbound mention through `receive(github, { message, target, auth })` from a schedule `run` handler, or `args.receive(github, ...)` from another channel. The target requires `owner`, `repo`, and exactly one of `issueNumber` or `pullRequestNumber`. +Start a session without an inbound mention through `receive(github, { message, target, auth, outputSchema })` from a schedule `run` handler, or `args.receive(github, ...)` from another channel. The target requires `owner`, `repo`, and exactly one of `issueNumber` or `pullRequestNumber`. + +```ts +await receive(github, { + message: "Assess this issue.", + target: { owner: "vercel", repo: "eve", issueNumber: 214 }, + auth: appAuth, + outputSchema: assessmentSchema, +}); +``` + +The `outputSchema` contract is identical to an inbound hook, so an initial webhook delivery and a scheduled retry can request the same structured result without changing session identity or mode. ### Attachments diff --git a/docs/schedules.mdx b/docs/schedules.mdx index 2a3ecca97..c5d67bfab 100644 --- a/docs/schedules.mdx +++ b/docs/schedules.mdx @@ -77,12 +77,44 @@ export default defineSchedule({ }); ``` -- `receive(channel, { message, target, auth })`: starts a session on another channel. Same contract as a route handler's `args.receive`. +- `receive(channel, { message, target, auth, outputSchema? })`: starts a session on another channel. Same contract as a route handler's `args.receive`; `outputSchema` accepts Standard Schema or raw JSON Schema for this turn. - `waitUntil(promise)`: extends the cron task's lifetime so the parked session and any in-flight fetches settle before the task ends. Wrap the `receive` call in it. - `appAuth`: the app principal (`{ authenticator: "app", principalId: "eve:app", principalType: "runtime" }`). Pass it as `receive(..., { auth: appAuth })` for work the agent does on its own behalf. A handler-form session runs on the same durable runtime engine as any other session, so it can park (durably suspend), for instance when the channel handoff is waiting for a Slack reply. Only markdown task mode is barred from waiting. +### Structured handoffs + +Pass `outputSchema` when a scheduled handoff needs a structured result, including a retry of an earlier webhook delivery: + +```ts title="agent/schedules/retry-assessment.ts" +import { defineSchedule } from "eve/schedules"; +import { z } from "zod"; + +import github from "../channels/github.js"; + +const assessmentSchema = z.object({ + summary: z.string(), + priority: z.enum(["low", "medium", "high"]), +}); + +export default defineSchedule({ + cron: "*/5 * * * *", + run({ receive, waitUntil, appAuth }) { + waitUntil( + receive(github, { + message: "Assess the claimed issue delivery.", + target: { owner: "vercel", repo: "eve", issueNumber: 214 }, + auth: appAuth, + outputSchema: assessmentSchema, + }), + ); + }, +}); +``` + +This remains a durable conversation-mode session. The schema applies only to the delivered turn, and successful structured completion emits `result.completed` for an [agent hook](./guides/hooks) to consume. + ## Trigger a schedule while iterating The dev server mounts a one-shot dispatch route that fires a schedule by name, out of band, exactly once. Since `eve dev` never runs schedules on their cron cadence, this is how you trigger one without waiting for the next production tick. diff --git a/e2e/fixtures/agent-channels/agent/channels/webhook.ts b/e2e/fixtures/agent-channels/agent/channels/webhook.ts index 13e80fd91..a2ed1addb 100644 --- a/e2e/fixtures/agent-channels/agent/channels/webhook.ts +++ b/e2e/fixtures/agent-channels/agent/channels/webhook.ts @@ -13,8 +13,9 @@ export default defineChannel({ const body = (await req.json().catch(() => ({}))) as { message?: string; sessionRef?: string; + structured?: boolean; }; - const session = await args.receive(target, { + const options = { message: body.message ?? "Reply with the single word: hello.", target: { sessionRef: body.sessionRef ?? crypto.randomUUID() }, auth: { @@ -23,7 +24,21 @@ export default defineChannel({ principalId: "smoke-test", principalType: "service", }, - }); + } as const; + const session = body.structured + ? await args.receive(target, { + ...options, + outputSchema: { + additionalProperties: false, + properties: { + count: { type: "integer" }, + title: { type: "string" }, + }, + required: ["title", "count"], + type: "object", + }, + }) + : await args.receive(target, options); return Response.json({ ok: true, sessionId: session.id }); }), ], diff --git a/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts b/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts new file mode 100644 index 000000000..1ca99e2b2 --- /dev/null +++ b/e2e/fixtures/agent-channels/evals/custom-channels/cross-channel-output-schema.eval.ts @@ -0,0 +1,41 @@ +import { defineEval } from "eve/evals"; + +import { postChannel } from "./shared.js"; + +/** Structured-output smoke for the cross-channel `args.receive` handoff. */ +export default defineEval({ + description: "Custom channel smoke: structured cross-channel receive.", + + async test(t) { + const payload = await postChannel<{ ok: boolean; sessionId?: string }>(t.target, "/webhook", { + message: "Return a structured summary of this handoff.", + structured: true, + }); + if (payload.ok !== true || typeof payload.sessionId !== "string") { + throw new Error(`Unexpected webhook response: ${JSON.stringify(payload)}`); + } + + const session = await t.target.attachSession(payload.sessionId); + const results = session.events.filter((event) => event.type === "result.completed"); + if (results.length !== 1) { + throw new Error(`Expected one result.completed event, received ${results.length}.`); + } + + const result = results[0]?.data.result; + if ( + !isRecord(result) || + typeof result.title !== "string" || + typeof result.count !== "number" || + !Number.isInteger(result.count) + ) { + throw new Error(`Unexpected structured result: ${JSON.stringify(result)}`); + } + + t.didNotFail(); + t.completed(); + }, +}); + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/packages/eve/src/channel/cross-channel-receive.test.ts b/packages/eve/src/channel/cross-channel-receive.test.ts index 30c2e45f6..4079d93a4 100644 --- a/packages/eve/src/channel/cross-channel-receive.test.ts +++ b/packages/eve/src/channel/cross-channel-receive.test.ts @@ -1,12 +1,20 @@ import { describe, expect, it, vi } from "vitest"; +import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; -import { CHANNEL_SENTINEL, type CompiledChannel } from "#channel/compiled-channel.js"; +import { + CHANNEL_SENTINEL, + isCompiledChannel, + type CompiledChannel, +} from "#channel/compiled-channel.js"; import { createCrossChannelReceiveFn, type CrossChannelTarget, } from "#channel/cross-channel-receive.js"; import type { Session } from "#channel/session.js"; -import type { Runtime } from "#channel/types.js"; +import type { RunHandle, Runtime } from "#channel/types.js"; +import { RuntimeNoActiveSessionError } from "#execution/runtime-errors.js"; +import type { HandleMessageStreamEvent } from "#protocol/message.js"; +import { githubChannel } from "#public/channels/github/githubChannel.js"; function makeRuntime(): Runtime { return { @@ -16,6 +24,14 @@ function makeRuntime(): Runtime { }; } +function makeRunHandle(): RunHandle { + return { + continuationToken: "target:token", + events: new ReadableStream(), + sessionId: "new-session-id", + }; +} + function makeSession(): Session { return { id: "sess_1", @@ -50,6 +66,25 @@ function makeChannel(name: string): { }; } +function makeGitHubChannel(): { + readonly definition: CompiledChannel; + readonly target: CrossChannelTarget; +} { + const definition = githubChannel(); + if (!isCompiledChannel(definition)) { + throw new Error("expected a compiled GitHub channel"); + } + return { + definition, + target: { + adapter: definition.adapter, + definition, + name: "github", + receive: definition.receive, + }, + }; +} + describe("createCrossChannelReceiveFn", () => { it("delegates to the target channel's receive with a per-target send", async () => { const slack = makeChannel("slack"); @@ -72,6 +107,182 @@ describe("createCrossChannelReceiveFn", () => { expect(typeof ctx.send).toBe("function"); }); + it("normalizes and injects a Standard Schema when resuming a session", async () => { + const normalizedSchema = { + additionalProperties: false, + properties: { summary: { type: "string" } }, + required: ["summary"], + type: "object", + } as const; + const inputConverter = vi.fn(() => ({ type: "string" })); + const outputConverter = vi.fn(() => normalizedSchema); + const outputSchema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { input: inputConverter, output: outputConverter }, + vendor: "test", + version: 1, + }, + }; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }); + + const [receiveInput, receiveContext] = target.receive.mock.calls[0]!; + expect(receiveInput).toEqual({ auth: null, message: "assess this", target: {} }); + await receiveContext.send( + { message: "from the channel", outputSchema: { type: "string" } }, + { auth: null, continuationToken: "token" }, + ); + + expect(outputConverter).toHaveBeenCalledWith({ target: "draft-07" }); + expect(inputConverter).not.toHaveBeenCalled(); + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual( + normalizedSchema, + ); + }); + + it("rejects an unsupported Standard Schema before invoking the target receive hook", async () => { + const outputSchema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { + input: () => ({ type: "object" }), + output: () => { + throw new Error("draft-07 output conversion is unsupported"); + }, + }, + vendor: "test", + version: 1, + }, + }; + const runtime = makeRuntime(); + const target = makeChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await expect( + fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }), + ).rejects.toThrow("draft-07 output conversion is unsupported"); + + expect(target.receive).not.toHaveBeenCalled(); + expect(runtime.deliver).not.toHaveBeenCalled(); + expect(runtime.run).not.toHaveBeenCalled(); + }); + + it("injects a raw output schema when receive(github, ...) starts a new session", async () => { + const outputSchema = { + properties: { verdict: { type: "string" } }, + required: ["verdict"], + type: "object", + } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockRejectedValue(new RuntimeNoActiveSessionError("target:token")); + vi.mocked(runtime.run).mockResolvedValue(makeRunHandle()); + const github = makeGitHubChannel(); + const fn = createCrossChannelReceiveFn(runtime, [github.target]); + + await fn(github.definition, { + auth: null, + message: "assess this", + outputSchema, + target: { + issueNumber: 214, + owner: "vercel", + repo: "eve", + repositoryId: 123, + }, + }); + + expect(vi.mocked(runtime.run).mock.calls[0]![0].continuationToken).toBe( + "github:repo:123:issue:214", + ); + expect(vi.mocked(runtime.run).mock.calls[0]![0].mode).toBe("conversation"); + expect(vi.mocked(runtime.run).mock.calls[0]![0].input.outputSchema).toEqual(outputSchema); + }); + + it("injects a raw output schema when receive(github, ...) resumes a session", async () => { + const outputSchema = { + properties: { verdict: { type: "string" } }, + required: ["verdict"], + type: "object", + } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const github = makeGitHubChannel(); + const fn = createCrossChannelReceiveFn(runtime, [github.target]); + + await fn(github.definition, { + auth: null, + message: "retry this assessment", + outputSchema, + target: { + issueNumber: 214, + owner: "vercel", + repo: "eve", + repositoryId: 123, + }, + }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0]).toMatchObject({ + continuationToken: "github:repo:123:issue:214", + payload: { message: "retry this assessment", outputSchema }, + }); + expect(runtime.run).not.toHaveBeenCalled(); + }); + + it("preserves a target channel's send payload when no outer schema is provided", async () => { + const channelSchema = { type: "string" } as const; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { auth: null, message: "plain", target: {} }); + const receiveContext = target.receive.mock.calls[0]![1]; + await receiveContext.send( + { message: "from the channel", outputSchema: channelSchema }, + { auth: null, continuationToken: "token" }, + ); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload.outputSchema).toEqual( + channelSchema, + ); + }); + + it("injects the outer schema into structured user content", async () => { + const outputSchema = { type: "object" } as const; + const message = [{ text: "inspect this image", type: "text" as const }]; + const runtime = makeRuntime(); + vi.mocked(runtime.deliver).mockResolvedValue({ sessionId: "existing-session-id" }); + const target = makeChannel("target"); + const fn = createCrossChannelReceiveFn(runtime, [target.target]); + + await fn(target.definition, { + auth: null, + message: "assess this", + outputSchema, + target: {}, + }); + const receiveContext = target.receive.mock.calls[0]![1]; + await receiveContext.send(message, { auth: null, continuationToken: "token" }); + + expect(vi.mocked(runtime.deliver).mock.calls[0]![0].payload).toMatchObject({ + message, + outputSchema, + }); + }); + it("resolves the target by reference identity even when multiple channels are registered", async () => { const slack = makeChannel("slack"); const twilio = makeChannel("twilio"); diff --git a/packages/eve/src/channel/cross-channel-receive.ts b/packages/eve/src/channel/cross-channel-receive.ts index f64f20e39..e38ce21b3 100644 --- a/packages/eve/src/channel/cross-channel-receive.ts +++ b/packages/eve/src/channel/cross-channel-receive.ts @@ -7,14 +7,21 @@ import { createSendFn } from "#channel/send.js"; import type { Session } from "#channel/session.js"; import type { Runtime, SessionAuthContext } from "#channel/types.js"; import type { ResolvedChannelDefinition } from "#runtime/types.js"; +import { + normalizeOutputSchemaDefinition, + type OutputSchemaDefinition, +} from "#shared/json-schema.js"; +import type { JsonObject } from "#shared/json.js"; /** * Options accepted by {@link CrossChannelReceiveFn}. Mirrors the input * argument of a channel's authored `receive(input, { send })` hook — * the runtime constructs `send` internally so route-handler callers - * only supply the platform target, payload, and auth. + * only supply the platform target, payload, auth, and optional turn policy. */ export interface CrossChannelReceiveOptions> { + /** Standard Schema or raw JSON Schema required for this turn's result. */ + readonly outputSchema?: OutputSchemaDefinition; readonly message: string | UserContent; readonly target: TTarget; readonly auth: SessionAuthContext | null; @@ -79,9 +86,11 @@ export function createCrossChannelReceiveFn( ): CrossChannelReceiveFn { return async (channel, options) => { const targetChannel = resolveTargetByReference(channel, channels); + const outputSchema = normalizeOutputSchemaDefinition(options.outputSchema); return await invokeChannelReceive({ runtime, target: targetChannel, + outputSchema, input: { message: options.message as string, target: options.target as Readonly>, @@ -99,6 +108,7 @@ export function createCrossChannelReceiveFn( interface InvokeChannelReceiveInput { readonly runtime: Runtime; readonly target: Pick; + readonly outputSchema?: JsonObject; readonly input: { readonly message: string; readonly target: Readonly>; @@ -121,7 +131,18 @@ export async function invokeChannelReceive(args: InvokeChannelReceiveInput): Pro if (!args.target.adapter) { throw new Error(args.describeMissingAdapter()); } - const send = createSendFn(args.runtime, args.target.adapter, args.target.name); + const baseSend = createSendFn(args.runtime, args.target.adapter, args.target.name); + const outputSchema = args.outputSchema; + const send: typeof baseSend = + outputSchema === undefined + ? baseSend + : (input, options) => + baseSend( + typeof input === "string" || Array.isArray(input) + ? { message: input, outputSchema } + : { ...input, outputSchema }, + options, + ); return await args.target.receive(args.input, { send }); } diff --git a/packages/eve/src/channel/schedule.test.ts b/packages/eve/src/channel/schedule.test.ts index 549bad23f..9cc596f01 100644 --- a/packages/eve/src/channel/schedule.test.ts +++ b/packages/eve/src/channel/schedule.test.ts @@ -93,6 +93,11 @@ describe("ScheduleDispatcher", () => { describe("run handler form", () => { it("invokes the author's run() with { receive, waitUntil, appAuth }", async () => { const runtime = createMockRuntime(); + const outputSchema = { + properties: { summary: { type: "string" } }, + required: ["summary"], + type: "object", + } as const; vi.stubEnv("SLACK_BOT_TOKEN", "xoxb-test"); vi.stubEnv("SLACK_SIGNING_SECRET", "test-secret"); try { @@ -111,6 +116,7 @@ describe("ScheduleDispatcher", () => { observed.hasWaitUntil = typeof waitUntil === "function"; await receive(definition, { message: "post the digest", + outputSchema, target: { channelId: "C0123ABC" }, auth: appAuth, }); @@ -125,6 +131,7 @@ describe("ScheduleDispatcher", () => { const runInput = vi.mocked(runtime.run).mock.calls[0]![0]; expect(runInput.continuationToken).toBe("slack:C0123ABC:"); expect(runInput.auth).toEqual(SCHEDULE_APP_AUTH); + expect(runInput.input.outputSchema).toEqual(outputSchema); } finally { vi.unstubAllEnvs(); } diff --git a/packages/eve/src/client/output-schema.test.ts b/packages/eve/src/client/output-schema.test.ts index b538bd1f8..ae5197aec 100644 --- a/packages/eve/src/client/output-schema.test.ts +++ b/packages/eve/src/client/output-schema.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; +import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; import { extractCompletedResult, normalizeOutputSchemaForRequest } from "#client/output-schema.js"; import { @@ -20,6 +21,24 @@ describe("output schema client helpers", () => { expect(normalizeOutputSchemaForRequest(schema)).toEqual(schema); }); + it("normalizes the output side of Standard Schema definitions", () => { + const schema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { + input: () => ({ title: "input" }), + output: ({ target }) => ({ target, title: "output" }), + }, + vendor: "test", + version: 1, + }, + }; + + expect(normalizeOutputSchemaForRequest(schema)).toEqual({ + target: "draft-07", + title: "output", + }); + }); + it("extracts the most recent completed structured result", () => { const events: HandleMessageStreamEvent[] = [ createResultCompletedEvent({ diff --git a/packages/eve/src/client/output-schema.ts b/packages/eve/src/client/output-schema.ts index b758b9125..475c8f392 100644 --- a/packages/eve/src/client/output-schema.ts +++ b/packages/eve/src/client/output-schema.ts @@ -1,7 +1,8 @@ -import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; - import type { HandleMessageStreamEvent, ResultCompletedStreamEvent } from "#protocol/message.js"; -import { normalizeJsonSchemaDefinition } from "#shared/json-schema.js"; +import { + normalizeOutputSchemaDefinition, + type OutputSchemaDefinition, +} from "#shared/json-schema.js"; import type { JsonObject } from "#shared/json.js"; /** @@ -9,9 +10,9 @@ import type { JsonObject } from "#shared/json.js"; * representation accepted by eve message routes. */ export function normalizeOutputSchemaForRequest( - schema: StandardJSONSchemaV1 | JsonObject | undefined, + schema: OutputSchemaDefinition | undefined, ): JsonObject | undefined { - return schema === undefined ? undefined : normalizeJsonSchemaDefinition(schema, "output"); + return normalizeOutputSchemaDefinition(schema); } /** diff --git a/packages/eve/src/client/types.ts b/packages/eve/src/client/types.ts index bb15fe479..2ecd2b572 100644 --- a/packages/eve/src/client/types.ts +++ b/packages/eve/src/client/types.ts @@ -1,8 +1,8 @@ import type { UserContent } from "ai"; -import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; import type { HandleMessageStreamEvent } from "#protocol/message.js"; import type { InputRequest, InputResponse } from "#runtime/input/types.js"; +import type { OutputSchemaDefinition } from "#shared/json-schema.js"; import type { JsonObject } from "#shared/json.js"; export type { @@ -155,7 +155,7 @@ export interface SendTurnPayload { * authoritative for validation; {@link MessageResult.data} is typed to this * schema's output type and is not revalidated client-side. */ - readonly outputSchema?: StandardJSONSchemaV1 | JsonObject; + readonly outputSchema?: OutputSchemaDefinition; /** * Abort signal for cancelling the request. diff --git a/packages/eve/src/context/hook-lifecycle.integration.test.ts b/packages/eve/src/context/hook-lifecycle.integration.test.ts index d52f33aca..d9756d60e 100644 --- a/packages/eve/src/context/hook-lifecycle.integration.test.ts +++ b/packages/eve/src/context/hook-lifecycle.integration.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it } from "vitest"; import { createRuntimeHookRegistry } from "#runtime/hooks/registry.js"; import type { ResolvedHookDefinition } from "#runtime/types.js"; -import type { HandleMessageStreamEvent } from "#protocol/message.js"; +import { createResultCompletedEvent, type HandleMessageStreamEvent } from "#protocol/message.js"; import { ContextContainer, contextStorage } from "./container.js"; import { dispatchStreamEventHooks } from "./hook-lifecycle.js"; import { @@ -101,4 +101,59 @@ describe("dispatchStreamEventHooks", () => { ), ).rejects.toThrow(/event hook boom/); }); + + it("provides structured results with the trusted current session auth", async () => { + const ctx = buildCtx(); + ctx.set(SessionKey, { + auth: { + current: { + attributes: { repository: "vercel/eve" }, + authenticator: "github-webhook", + principalId: "github:1", + principalType: "user", + }, + initiator: null, + }, + sessionId: "session_test", + turn: { id: "turn_0", sequence: 0 }, + }); + + const observed: Array<{ principalId: string | undefined; result: unknown }> = []; + const registry = createRuntimeHookRegistry([ + hook("assessment", { + events: { + "result.completed": async (event, hookCtx) => { + const completed = event as Extract< + HandleMessageStreamEvent, + { type: "result.completed" } + >; + observed.push({ + principalId: hookCtx.session.auth.current?.principalId, + result: completed.data.result, + }); + }, + }, + }), + ]); + + await contextStorage.run(ctx, () => + dispatchStreamEventHooks({ + ctx, + registry, + event: createResultCompletedEvent({ + result: { priority: "high", summary: "Needs maintainer review" }, + sequence: 0, + stepIndex: 0, + turnId: "turn_0", + }), + }), + ); + + expect(observed).toEqual([ + { + principalId: "github:1", + result: { priority: "high", summary: "Needs maintainer review" }, + }, + ]); + }); }); diff --git a/packages/eve/src/execution/workflow-steps.test.ts b/packages/eve/src/execution/workflow-steps.test.ts index 9cbbf7143..b152d0eaa 100644 --- a/packages/eve/src/execution/workflow-steps.test.ts +++ b/packages/eve/src/execution/workflow-steps.test.ts @@ -131,7 +131,7 @@ const threadContextAdapter: ChannelAdapter = { const thread = adapterCtx.ctx.ensure(ThreadKey, () => "unset"); const message = payload.message ?? ""; - return { message: `thread=${thread}; user=${message}` }; + return { message: `thread=${thread}; user=${message}`, outputSchema: payload.outputSchema }; }, }; @@ -521,6 +521,74 @@ describe("turnStep", () => { expect(second.serializedContext[ThreadKey.name]).toBe("alpha"); }); + it("does not leak a schema retained by a failed turn into the next delivery", async () => { + const outputSchema = { + properties: { assessment: { type: "string" } }, + required: ["assessment"], + type: "object", + } as const; + const initialSession = createStubSession({ outputSchema }); + const failedSession = createStubSession({ outputSchema }); + installSessionStoreMocks([initialSession, failedSession]); + + const compiledBundle = { + adapterRegistry: { + adaptersByKind: new Map([[threadContextAdapter.kind, threadContextAdapter]]), + }, + compiledArtifactsSource: {} as never, + graph: { + nodesByNodeId: new Map(), + root: { + sandboxRegistry: { sandbox: null }, + turnAgent: TestTurnAgent, + }, + }, + moduleMap: { nodes: {} }, + hookRegistry: createEmptyHookRegistry(), + resolvedAgent: { config: {} }, + subagentRegistry: {}, + toolRegistry: {}, + turnAgent: TestTurnAgent, + } as never; + vi.mocked(getCompiledRuntimeAgentBundle).mockResolvedValue(compiledBundle); + + const observedSchemas: Array = []; + let invocationCount = 0; + vi.mocked(createExecutionNodeStep).mockImplementation(() => { + return async (session): Promise => { + observedSchemas.push(session.outputSchema); + invocationCount += 1; + return invocationCount === 1 + ? { next: null, session: failedSession } + : { next: { done: true, output: "plain follow-up" }, session }; + }; + }); + + const parentWritable = createTestWritable(); + const first = await turnStep({ + input: { + kind: "deliver", + payloads: [{ message: "assess this", outputSchema }], + }, + parentWritable, + serializedContext: createSerializedContext(), + sessionState: createStubSessionState(), + }); + expect(first.action).toBe("park"); + + await turnStep({ + input: { + kind: "deliver", + payloads: [{ message: "answer normally" }], + }, + parentWritable, + serializedContext: first.serializedContext, + sessionState: first.sessionState, + }); + + expect(observedSchemas).toEqual([outputSchema, undefined]); + }); + it("refreshes the system prompt from the current bundled deployment", async () => { const session = createStubSession({ agent: { @@ -892,7 +960,7 @@ describe("resolveEffectiveOutputSchema", () => { it("uses a run-scoped schema in either mode", () => { for (const mode of ["conversation", "task"] as const) { - const session = createStubSession(); + const session = createStubSession({ outputSchema: agentSchema }); const resolved = resolveEffectiveOutputSchema({ agentOutputSchema: agentSchema, input: { outputSchema: runSchema }, @@ -932,4 +1000,56 @@ describe("resolveEffectiveOutputSchema", () => { }); expect(resolved).toBe(session); }); + + it("preserves the in-effect schema on a runtime-action continuation", () => { + const session = createStubSession({ outputSchema: runSchema }); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { runtimeActionResults: [] }, + mode: "conversation", + session, + }); + expect(resolved).toBe(session); + }); + + it("clears a failed turn's schema from the next plain conversation delivery", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { message: "try again without structured output" }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("requires HITL response turns to attach their output schema again", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { inputResponses: [{ optionId: "approve", requestId: "approval-1" }] }, + mode: "conversation", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toBeUndefined(); + }); + + it("preserves the caller's run schema across a task-mode HITL response", () => { + const session = createStubSession({ outputSchema: runSchema }); + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { inputResponses: [{ optionId: "approve", requestId: "approval-1" }] }, + mode: "task", + session, + }); + expect(resolved).toBe(session); + }); + + it("replaces stale task state with the agent schema on a fresh task delivery", () => { + const resolved = resolveEffectiveOutputSchema({ + agentOutputSchema: agentSchema, + input: { message: "run the task" }, + mode: "task", + session: createStubSession({ outputSchema: runSchema }), + }); + expect(resolved.outputSchema).toEqual(agentSchema); + }); }); diff --git a/packages/eve/src/execution/workflow-steps.ts b/packages/eve/src/execution/workflow-steps.ts index 682e7287e..f7e44cf84 100644 --- a/packages/eve/src/execution/workflow-steps.ts +++ b/packages/eve/src/execution/workflow-steps.ts @@ -439,8 +439,11 @@ export function reconcileSessionContinuationToken( * wins. With no run-scoped schema, a task run adopts the agent's declared * return schema — its function-output contract, which only applies when the * agent is invoked as a function (subagent / schedule / job), i.e. task mode. - * A conversation run with no run-scoped schema enforces nothing. Continuation - * steps (no new `StepInput`) preserve whatever is already in effect. + * A conversation run with no run-scoped schema enforces nothing. Fresh + * conversation deliveries — including HITL response turns — replace the + * previous turn's policy. Runtime-owned continuation steps and task-mode HITL + * responses preserve whatever is already in effect because their callers + * cannot attach the original run-scoped schema again. */ export function resolveEffectiveOutputSchema(input: { readonly agentOutputSchema: JsonObject | undefined; @@ -454,11 +457,21 @@ export function resolveEffectiveOutputSchema(input: { return { ...session, outputSchema: stepInput.outputSchema }; } - if (mode === "task" && session.outputSchema === undefined && agentOutputSchema !== undefined) { + if ( + stepInput === undefined || + stepInput.runtimeActionResults !== undefined || + (mode === "task" && stepInput.inputResponses !== undefined) + ) { + return session; + } + + if (mode === "task" && agentOutputSchema !== undefined) { return { ...session, outputSchema: agentOutputSchema }; } - return session; + if (session.outputSchema === undefined) return session; + const { outputSchema: _outputSchema, ...withoutOutputSchema } = session; + return withoutOutputSchema; } const log = createLogger("execution.workflow-entry"); diff --git a/packages/eve/src/harness/tool-loop.test.ts b/packages/eve/src/harness/tool-loop.test.ts index 8d2983dbb..eefdd87d6 100644 --- a/packages/eve/src/harness/tool-loop.test.ts +++ b/packages/eve/src/harness/tool-loop.test.ts @@ -856,6 +856,8 @@ describe("createToolLoopHarness", () => { type: "step.failed", }), ); + expect(events.some((event) => event.type === "result.completed")).toBe(false); + expect(result.session.outputSchema).toEqual({ type: "object" }); }); it("returns only the final assistant reply when a completed task step includes tool work", async () => { diff --git a/packages/eve/src/internal/testing/scenario-apps/github-route-portability.ts b/packages/eve/src/internal/testing/scenario-apps/github-route-portability.ts index f6e934d02..1853834cf 100644 --- a/packages/eve/src/internal/testing/scenario-apps/github-route-portability.ts +++ b/packages/eve/src/internal/testing/scenario-apps/github-route-portability.ts @@ -4,8 +4,43 @@ export const GITHUB_ROUTE_PORTABILITY_DESCRIPTOR: ScenarioAppDescriptor = { files: { "agent/channels/github.ts": `import { githubChannel } from "eve/channels/github"; +export const assessmentSchema = { + "~standard": { + version: 1, + vendor: "portability-test", + jsonSchema: { + input: () => ({ type: "object" }), + output: () => ({ + type: "object", + properties: { summary: { type: "string" } }, + required: ["summary"], + }), + }, + }, +} as const; + export default githubChannel({ botName: "testbot", + onIssue: (_ctx, issue) => + issue.action === "opened" ? { auth: null, outputSchema: assessmentSchema } : null, +}); +`, + "agent/schedules/retry-assessment.ts": `import { defineSchedule } from "eve/schedules"; + +import github, { assessmentSchema } from "../channels/github.js"; + +export default defineSchedule({ + cron: "*/5 * * * *", + run({ receive, waitUntil, appAuth }) { + waitUntil( + receive(github, { + auth: appAuth, + message: "Retry the assessment.", + outputSchema: assessmentSchema, + target: { owner: "vercel", repo: "eve", issueNumber: 214, repositoryId: 123 }, + }), + ); + }, }); `, }, diff --git a/packages/eve/src/public/channels/github/dispatch.ts b/packages/eve/src/public/channels/github/dispatch.ts index f450f75c9..449b19b95 100644 --- a/packages/eve/src/public/channels/github/dispatch.ts +++ b/packages/eve/src/public/channels/github/dispatch.ts @@ -30,10 +30,11 @@ import { import type { GitHubChannelConfig, GitHubInboundContext, - GitHubInboundResult, GitHubInboundResultOrPromise, } from "#public/channels/github/githubChannel.js"; -import type { SendFn } from "#public/definitions/defineChannel.js"; +import type { SendFn, SendPayload } from "#public/definitions/defineChannel.js"; +import { normalizeOutputSchemaDefinition } from "#shared/json-schema.js"; +import type { JsonObject } from "#shared/json.js"; const log = createLogger("github.dispatch"); @@ -153,6 +154,7 @@ async function dispatchWebhookEventTurn(input: { github: await buildPullRequestContext(input.config, input.state, input.event.delivery.id), hook: result.context, }), + outputSchema: result.outputSchema, send: input.send, state: input.state, }); @@ -189,6 +191,7 @@ async function dispatchCommentTurn(input: { github: await buildPullRequestContext(input.config, input.state, input.event.delivery.id), hook: result.context, }), + outputSchema: result.outputSchema, send: input.send, state: input.state, }); @@ -197,9 +200,12 @@ async function dispatchCommentTurn(input: { async function runInboundHandler(input: { readonly event: GitHubTurnEvent; readonly handlerResult: () => GitHubInboundResultOrPromise; -}): Promise { +}): Promise { try { - return await input.handlerResult(); + const result = await input.handlerResult(); + if (result === null) return result; + const outputSchema = normalizeOutputSchemaDefinition(result.outputSchema); + return { auth: result.auth, context: result.context, outputSchema }; } catch (error) { logError(log, "GitHub inbound handler failed", error, { deliveryId: input.event.delivery.id, @@ -208,12 +214,19 @@ async function runInboundHandler(input: { } } +type NormalizedGitHubInboundResult = { + readonly auth: SessionAuthContext | null; + readonly context?: readonly string[]; + readonly outputSchema?: JsonObject; +} | null; + async function sendGitHubTurn(input: { readonly auth: SessionAuthContext | null; readonly commentUrl?: string; readonly event: GitHubTurnEvent; readonly logMessage?: string; readonly message: string; + readonly outputSchema?: JsonObject; readonly context: readonly string[] | undefined; readonly send: SendFn; readonly state: GitHubChannelState; @@ -228,19 +241,24 @@ async function sendGitHubTurn(input: { sender: input.event.sender, }); const turnMessage = prependGitHubContext(input.message, contextBlock); + const payload: { + context: SendPayload["context"]; + message: SendPayload["message"]; + outputSchema?: SendPayload["outputSchema"]; + } = { + context: input.context, + message: turnMessage, + }; + if (input.outputSchema !== undefined) { + payload.outputSchema = input.outputSchema; + } try { - await input.send( - { - message: turnMessage, - context: input.context, - }, - { - auth: input.auth, - continuationToken: continuationTokenFromState(input.state), - state: input.state, - }, - ); + await input.send(payload, { + auth: input.auth, + continuationToken: continuationTokenFromState(input.state), + state: input.state, + }); } catch (error) { logError(log, input.logMessage ?? "GitHub delivery failed", error, { deliveryId: input.event.delivery.id, diff --git a/packages/eve/src/public/channels/github/githubChannel.test.ts b/packages/eve/src/public/channels/github/githubChannel.test.ts index 17a8ed2ef..d10657fa6 100644 --- a/packages/eve/src/public/channels/github/githubChannel.test.ts +++ b/packages/eve/src/public/channels/github/githubChannel.test.ts @@ -1,4 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { StandardJSONSchemaV1 } from "#compiled/@standard-schema/spec/index.js"; +import { z } from "#compiled/zod/index.js"; import { buildAdapterContext } from "#channel/adapter-context.js"; import { callAdapterEventHandler, type ChannelAdapter } from "#channel/adapter.js"; @@ -221,6 +223,7 @@ describe("githubChannel", () => { expect(payload.message).toContain(""); expect(payload.message).toContain("help me"); expect(payload.inputResponses).toBeUndefined(); + expect(payload).not.toHaveProperty("outputSchema"); expect(options).toMatchObject({ auth: { attributes: { @@ -410,13 +413,47 @@ describe("githubChannel", () => { expect(send).not.toHaveBeenCalled(); }); - it("dispatches opt-in issue webhook hooks", async () => { + it("forwards a custom comment hook's output schema", async () => { + const outputSchema = { + properties: { answer: { type: "string" } }, + required: ["answer"], + type: "object", + } as const; + const channel = githubChannel({ + credentials: { webhookSecret: SECRET }, + onComment(ctx) { + return { auth: defaultGitHubAuth(ctx), outputSchema }; + }, + }); + + const { send } = await firePost( + channel, + signedRequest( + "issue_comment", + basePayload({ + action: "created", + comment: { body: "custom", id: 10, user: { id: 1, login: "octocat" } }, + issue: { number: 5 }, + }), + ), + ); + + expect(send.mock.calls[0]?.[0].outputSchema).toEqual(outputSchema); + }); + + it("dispatches opt-in issue webhook hooks with a raw output schema", async () => { const hook = vi.fn(); + const outputSchema = { + additionalProperties: false, + properties: { summary: { type: "string" } }, + required: ["summary"], + type: "object", + } as const; const channel = githubChannel({ credentials: { webhookSecret: SECRET }, onIssue(ctx, issue) { hook(ctx.conversation, issue); - return { auth: defaultGitHubAuth(ctx) }; + return { auth: defaultGitHubAuth(ctx), outputSchema }; }, }); @@ -438,6 +475,7 @@ describe("githubChannel", () => { expect(send).toHaveBeenCalledTimes(1); const [payload, options] = send.mock.calls[0]!; expect(payload.message).toContain("Issue opened: #5 Track webhook issue events"); + expect(payload.outputSchema).toEqual(outputSchema); expect(options).toMatchObject({ continuationToken: "repo:123:issue:5", state: { @@ -449,14 +487,18 @@ describe("githubChannel", () => { }); }); - it("dispatches opt-in pull request webhook hooks with checkout-ready state", async () => { + it("normalizes a Standard Schema returned by a pull request webhook hook", async () => { const hook = vi.fn(); + const outputSchema = z.object({ + score: z.number().int(), + summary: z.string(), + }); const channel = githubChannel({ api: { apiBaseUrl: "https://github.test", fetch: prContextFetch() }, credentials: { appId: "test-app", webhookSecret: SECRET }, onPullRequest(ctx, pullRequest) { hook(ctx.conversation, pullRequest); - return { auth: defaultGitHubAuth(ctx) }; + return { auth: defaultGitHubAuth(ctx), outputSchema }; }, }); @@ -489,6 +531,20 @@ describe("githubChannel", () => { expect(send).toHaveBeenCalledTimes(1); const [payload, options] = send.mock.calls[0]!; expect(payload.message).toContain("Pull request opened: #7 Add webhook PR handling"); + expect(payload.outputSchema).toEqual({ + $schema: "http://json-schema.org/draft-07/schema#", + additionalProperties: false, + properties: { + score: { + maximum: 9_007_199_254_740_991, + minimum: -9_007_199_254_740_991, + type: "integer", + }, + summary: { type: "string" }, + }, + required: ["score", "summary"], + type: "object", + }); expect(options).toMatchObject({ continuationToken: "repo:123:pull:7", state: { @@ -504,6 +560,45 @@ describe("githubChannel", () => { }); }); + it("drops a webhook turn when its Standard Schema cannot be normalized", async () => { + const errorLog = vi.spyOn(console, "error").mockImplementation(() => {}); + const outputSchema: StandardJSONSchemaV1 = { + "~standard": { + jsonSchema: { + input: () => ({ type: "object" }), + output: () => { + throw new Error("draft-07 output conversion is unsupported"); + }, + }, + vendor: "test", + version: 1, + }, + }; + const channel = githubChannel({ + credentials: { webhookSecret: SECRET }, + onIssue(ctx) { + return { auth: defaultGitHubAuth(ctx), outputSchema }; + }, + }); + + const { send } = await firePost( + channel, + signedRequest( + "issues", + basePayload({ + action: "opened", + issue: { number: 5, title: "Unsupported schema" }, + }), + ), + ); + + expect(send).not.toHaveBeenCalled(); + expect(errorLog).toHaveBeenCalledWith( + expect.stringContaining("GitHub inbound handler failed"), + expect.anything(), + ); + }); + it("ignores issue and pull request webhooks without opt-in hooks", async () => { const channel = githubChannel({ credentials: { webhookSecret: SECRET }, diff --git a/packages/eve/src/public/channels/github/githubChannel.ts b/packages/eve/src/public/channels/github/githubChannel.ts index 8bc0973f6..290e492bd 100644 --- a/packages/eve/src/public/channels/github/githubChannel.ts +++ b/packages/eve/src/public/channels/github/githubChannel.ts @@ -2,6 +2,7 @@ import type { SessionHandle } from "#channel/session.js"; import type { SessionAuthContext } from "#channel/types.js"; import type { SessionContext } from "#public/definitions/callback-context.js"; import type { ChannelSessionOps } from "#public/definitions/defineChannel.js"; +import type { OutputSchemaDefinition } from "#shared/json-schema.js"; import { createLogger } from "#internal/logging.js"; import type { HandleMessageStreamEvent } from "#protocol/message.js"; @@ -93,11 +94,15 @@ export interface GitHubEventContext extends GitHubChannelContext, ChannelSession /** * Result of a GitHub inbound hook. Return `null` to acknowledge without * dispatching; return `{ auth }` to dispatch. Optional `context` strings are - * added as `role: "user"` messages before the dispatched turn. + * added as `role: "user"` messages before the dispatched turn. An optional + * `outputSchema` requests framework-managed structured output for this turn + * without changing the session's conversation mode. */ export type GitHubInboundResult = { readonly auth: SessionAuthContext | null; readonly context?: readonly string[]; + /** Standard Schema or raw JSON Schema required for this turn's result. */ + readonly outputSchema?: OutputSchemaDefinition; } | null; /** diff --git a/packages/eve/src/shared/json-schema.ts b/packages/eve/src/shared/json-schema.ts index aef6b9d32..57af18728 100644 --- a/packages/eve/src/shared/json-schema.ts +++ b/packages/eve/src/shared/json-schema.ts @@ -6,6 +6,11 @@ const STANDARD_JSON_SCHEMA_TARGET: StandardJSONSchemaV1.Target = "draft-07"; type JsonSchemaDirection = "input" | "output"; +/** Schema input accepted by eve public APIs that request structured output. */ +export type OutputSchemaDefinition = + | StandardJSONSchemaV1 + | JsonObject; + /** * Normalizes one Standard Schema or JSON Schema definition into plain JSON * Schema data that can cross eve runtime and client boundaries. @@ -25,6 +30,13 @@ export function normalizeJsonSchemaDefinition( return parseJsonObject(value); } +/** Normalizes an optional output schema into durable, wire-safe JSON data. */ +export function normalizeOutputSchemaDefinition( + value: OutputSchemaDefinition | undefined, +): JsonObject | undefined { + return value === undefined ? undefined : normalizeJsonSchemaDefinition(value, "output"); +} + function isStandardSchema(value: unknown): value is StandardJSONSchemaV1 { return value !== null && typeof value === "object" && "~standard" in value; } diff --git a/packages/eve/test/scenarios/public-api-portability.scenario.test.ts b/packages/eve/test/scenarios/public-api-portability.scenario.test.ts index 498371db8..8b8f5b018 100644 --- a/packages/eve/test/scenarios/public-api-portability.scenario.test.ts +++ b/packages/eve/test/scenarios/public-api-portability.scenario.test.ts @@ -114,12 +114,20 @@ export default defineSandbox({ }, { descriptor: GITHUB_ROUTE_PORTABILITY_DESCRIPTOR, - include: ["src/public/channels/github/index.ts", "src/public/definitions/defineChannel.ts"], - name: "lets tsc typecheck a default-exported githubChannel without extra annotations", + include: [ + "src/public/channels/github/index.ts", + "src/public/definitions/defineChannel.ts", + "src/public/definitions/schedule.ts", + "src/public/schedules/index.ts", + ], + name: "lets tsc typecheck GitHub structured turns from handlers and schedules", packageExports: { "./channels/github": { types: "./dist/src/public/channels/github/index.d.ts", }, + "./schedules": { + types: "./dist/src/public/schedules/index.d.ts", + }, }, }, {