diff --git a/.changeset/remote-agent-usage-callback.md b/.changeset/remote-agent-usage-callback.md new file mode 100644 index 000000000..c4c208eea --- /dev/null +++ b/.changeset/remote-agent-usage-callback.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +Remote agents now report their token usage back to the caller. When a `defineRemoteAgent` task completes, the terminal callback carries the run's token totals, and the caller emits a local `invoke_agent` span (`gen_ai.operation.name=invoke_agent`, `gen_ai.agent.name`, `gen_ai.usage.*`) so caller-side observability can attribute a remote agent's tokens. Usage is best-effort and optional, so older callees keep working unchanged. Both the calling agent and the remote agent must run this version for remote usage to appear. diff --git a/packages/eve/src/execution/session-callback-step.test.ts b/packages/eve/src/execution/session-callback-step.test.ts index 923eb6675..f7e63702f 100644 --- a/packages/eve/src/execution/session-callback-step.test.ts +++ b/packages/eve/src/execution/session-callback-step.test.ts @@ -1,11 +1,37 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + readDurableSession, + type DurableSession, + type DurableSessionState, +} from "#execution/durable-session-store.js"; import { fireSessionCallbackStep } from "#execution/session-callback-step.js"; +import type { SessionStateMap } from "#harness/types.js"; + +vi.mock("#execution/durable-session-store.js", () => ({ + readDurableSession: vi.fn(), +})); + +const readDurableSessionMock = vi.mocked(readDurableSession); + +const TURN_USAGE_STATE_KEY = "eve.harness.turnUsage"; +const SESSION_STATE = { sessionId: "remote-session" } as DurableSessionState; + +function durableSessionWithState(state: SessionStateMap): DurableSession { + return { + agent: { system: "" }, + continuationToken: "tok", + history: [], + sessionId: "remote-session", + state, + }; +} describe("fireSessionCallbackStep", () => { let errorSpy: ReturnType; beforeEach(() => { + readDurableSessionMock.mockReset(); errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); }); @@ -92,6 +118,79 @@ describe("fireSessionCallbackStep", () => { }); }); + it("includes token usage when the completed session reports it", async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 })); + vi.stubGlobal("fetch", fetchMock); + readDurableSessionMock.mockResolvedValue( + durableSessionWithState({ + [TURN_USAGE_STATE_KEY]: { + turnId: "turn_0", + inputTokens: 100, + outputTokens: 50, + cacheReadTokens: 10, + cacheWriteTokens: 5, + }, + }), + ); + + await fireSessionCallbackStep({ + output: "done", + serializedContext: createSerializedContext(), + sessionState: SESSION_STATE, + status: "completed", + }); + + expect(fetchMock).toHaveBeenCalledWith("https://caller.example.com/eve/v1/callback/tok123", { + body: JSON.stringify({ + callId: "call-1", + kind: "session.completed", + output: "done", + sessionId: "remote-session", + subagentName: "research", + usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 10 }, + }), + headers: { + "content-type": "application/json", + }, + method: "POST", + redirect: "error", + signal: expect.any(AbortSignal), + }); + expect(errorSpy).not.toHaveBeenCalled(); + }); + + it("omits usage when the completed session reports none", async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 })); + vi.stubGlobal("fetch", fetchMock); + readDurableSessionMock.mockResolvedValue(durableSessionWithState({})); + + await fireSessionCallbackStep({ + output: "done", + serializedContext: createSerializedContext(), + sessionState: SESSION_STATE, + status: "completed", + }); + + expect(parsePostedBody(fetchMock).usage).toBeUndefined(); + expect(errorSpy).not.toHaveBeenCalled(); + }); + + it("still posts the callback when usage cannot be read", async () => { + const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 })); + vi.stubGlobal("fetch", fetchMock); + readDurableSessionMock.mockRejectedValue(new Error("snapshot unavailable")); + + await fireSessionCallbackStep({ + output: "done", + serializedContext: createSerializedContext(), + sessionState: SESSION_STATE, + status: "completed", + }); + + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(parsePostedBody(fetchMock).usage).toBeUndefined(); + }); + it("posts the failed callback with the normalized error message", async () => { const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 })); vi.stubGlobal("fetch", fetchMock); @@ -238,6 +337,14 @@ describe("fireSessionCallbackStep", () => { }); }); +function parsePostedBody(fetchMock: ReturnType): { usage?: unknown } { + const call = fetchMock.mock.calls[0]; + if (call === undefined) { + throw new Error("expected fetch to have been called"); + } + return JSON.parse((call[1] as { body: string }).body) as { usage?: unknown }; +} + function createSerializedContext(): Record { return { "eve.sessionCallback": { diff --git a/packages/eve/src/execution/session-callback-step.ts b/packages/eve/src/execution/session-callback-step.ts index 6b8496171..b4ec17c25 100644 --- a/packages/eve/src/execution/session-callback-step.ts +++ b/packages/eve/src/execution/session-callback-step.ts @@ -1,12 +1,20 @@ import type { SessionCallback } from "#channel/types.js"; import { parseSessionCallback } from "#channel/session-callback.js"; import { SessionCallbackKey } from "#context/keys.js"; +import { readDurableSession, type DurableSessionState } from "#execution/durable-session-store.js"; +import { getTurnUsageState } from "#harness/turn-tag-state.js"; import { createLogger } from "#internal/logging.js"; import { toErrorMessage } from "#shared/errors.js"; const SESSION_CALLBACK_TIMEOUT_MS = 30_000; const log = createLogger("execution.session-callback"); +export interface SessionCallbackUsage { + readonly inputTokens: number; + readonly outputTokens: number; + readonly cacheReadTokens: number; +} + /** * Sends the configured session terminal callback. * @@ -21,6 +29,7 @@ export async function fireSessionCallbackStep(input: { readonly error?: unknown; readonly output?: unknown; readonly serializedContext: Record; + readonly sessionState?: DurableSessionState; readonly status: "completed" | "failed"; }): Promise { "use step"; @@ -33,25 +42,47 @@ export async function fireSessionCallbackStep(input: { try { const callback = parseSerializedSessionCallback(value); - const body = - input.status === "completed" - ? { - callId: callback.callId, - kind: "session.completed" as const, - output: input.output ?? "", - sessionId, - subagentName: callback.subagentName, - } - : { - callId: callback.callId, - error: { - code: "SESSION_FAILED", - message: toErrorMessage(input.error), - }, - kind: "session.failed" as const, - sessionId, - subagentName: callback.subagentName, - }; + let body: + | { + callId: string; + kind: "session.completed"; + output: unknown; + sessionId: string; + subagentName: string; + usage?: SessionCallbackUsage; + } + | { + callId: string; + error: { code: string; message: string }; + kind: "session.failed"; + sessionId: string; + subagentName: string; + }; + if (input.status === "completed") { + body = { + callId: callback.callId, + kind: "session.completed", + output: input.output ?? "", + sessionId, + subagentName: callback.subagentName, + }; + const usage = + input.sessionState !== undefined ? await readCompletedUsage(input.sessionState) : undefined; + if (usage !== undefined) { + body.usage = usage; + } + } else { + body = { + callId: callback.callId, + error: { + code: "SESSION_FAILED", + message: toErrorMessage(input.error), + }, + kind: "session.failed", + sessionId, + subagentName: callback.subagentName, + }; + } const response = await fetch(callback.url, { body: JSON.stringify(body), @@ -78,6 +109,26 @@ export async function fireSessionCallbackStep(input: { } } +async function readCompletedUsage( + state: DurableSessionState, +): Promise { + try { + const durable = await readDurableSession(state); + const turn = getTurnUsageState(durable.state); + if (turn === undefined) { + return undefined; + } + return { + inputTokens: turn.inputTokens, + outputTokens: turn.outputTokens, + cacheReadTokens: turn.cacheReadTokens, + }; + } catch (error) { + log.warn("failed to read remote-agent usage for session callback", { error }); + return undefined; + } +} + function parseSerializedSessionCallback(value: unknown): SessionCallback { const parsed = parseSessionCallback(value); if (!parsed.ok) { diff --git a/packages/eve/src/execution/workflow-entry.ts b/packages/eve/src/execution/workflow-entry.ts index dbc82e2aa..3aaa1cb80 100644 --- a/packages/eve/src/execution/workflow-entry.ts +++ b/packages/eve/src/execution/workflow-entry.ts @@ -292,6 +292,7 @@ async function finalizeDone(input: { error: failed ? output : undefined, output: failed ? undefined : output, serializedContext, + sessionState: failed ? undefined : input.action.sessionState, status: failed ? "failed" : "completed", }); await notifyDelegatedParentStep({ diff --git a/packages/eve/src/runtime/session-callback-route.test.ts b/packages/eve/src/runtime/session-callback-route.test.ts index 3f13b8644..57019573a 100644 --- a/packages/eve/src/runtime/session-callback-route.test.ts +++ b/packages/eve/src/runtime/session-callback-route.test.ts @@ -15,9 +15,25 @@ vi.mock("#compiled/@workflow/core/runtime.js", () => ({ resumeHook: (token: string, payload: unknown) => resumeHookMock(token, payload), })); +const startSpanMock = vi.fn(); +const endSpanMock = vi.fn(); + +vi.mock("#compiled/@opentelemetry/api/index.js", () => ({ + trace: { + getTracer: () => ({ + startSpan: (name: string, options: unknown) => { + startSpanMock(name, options); + return { end: endSpanMock }; + }, + }), + }, +})); + describe("session callback route", () => { beforeEach(() => { resumeHookMock.mockReset(); + startSpanMock.mockReset(); + endSpanMock.mockReset(); }); it("registers the POST framework callback route", () => { @@ -65,6 +81,87 @@ describe("session callback route", () => { }, ], }); + expect(startSpanMock).not.toHaveBeenCalled(); + }); + + it("emits an invoke_agent usage span when a completed callback reports usage", async () => { + resumeHookMock.mockResolvedValue(undefined); + + const response = await handleSessionCallbackRequest( + new Request("https://app.example.com/eve/v1/callback/tok123", { + body: JSON.stringify({ + callId: "call-1", + kind: "session.completed", + output: "done", + sessionId: "remote-session", + subagentName: "research", + usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 10 }, + }), + method: "POST", + }), + createRouteContext({ token: "tok123" }), + ); + + expect(response.status).toBe(202); + expect(startSpanMock).toHaveBeenCalledWith("invoke_agent research", { + attributes: { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.name": "research", + "gen_ai.usage.input_tokens": 100, + "gen_ai.usage.output_tokens": 50, + "gen_ai.usage.cache_read.input_tokens": 10, + }, + }); + expect(endSpanMock).toHaveBeenCalledTimes(1); + expect(resumeHookMock).toHaveBeenCalledWith("tok123", { + kind: "runtime-action-result", + results: [ + { callId: "call-1", kind: "subagent-result", output: "done", subagentName: "research" }, + ], + }); + }); + + it("does not emit a usage span for a malformed usage payload", async () => { + resumeHookMock.mockResolvedValue(undefined); + + const response = await handleSessionCallbackRequest( + new Request("https://app.example.com/eve/v1/callback/tok123", { + body: JSON.stringify({ + callId: "call-1", + kind: "session.completed", + output: "done", + sessionId: "remote-session", + subagentName: "research", + usage: { inputTokens: "lots", outputTokens: 50, cacheReadTokens: 10 }, + }), + method: "POST", + }), + createRouteContext({ token: "tok123" }), + ); + + expect(response.status).toBe(202); + expect(startSpanMock).not.toHaveBeenCalled(); + }); + + it("does not emit a usage span for a failed callback", async () => { + resumeHookMock.mockResolvedValue(undefined); + + const response = await handleSessionCallbackRequest( + new Request("https://app.example.com/eve/v1/callback/tok123", { + body: JSON.stringify({ + callId: "call-1", + error: { code: "SESSION_FAILED", message: "boom" }, + kind: "session.failed", + sessionId: "remote-session", + subagentName: "research", + }), + method: "POST", + }), + createRouteContext({ token: "tok123" }), + ); + + expect(response.status).toBe(202); + expect(startSpanMock).not.toHaveBeenCalled(); }); }); diff --git a/packages/eve/src/runtime/session-callback-route.ts b/packages/eve/src/runtime/session-callback-route.ts index f7bc7fa18..c7aa0458f 100644 --- a/packages/eve/src/runtime/session-callback-route.ts +++ b/packages/eve/src/runtime/session-callback-route.ts @@ -1,4 +1,7 @@ +import { trace } from "#compiled/@opentelemetry/api/index.js"; import { resumeHook } from "#internal/workflow/runtime.js"; +import { z } from "#compiled/zod/index.js"; +import { createLogger } from "#internal/logging.js"; import { EVE_CALLBACK_ROUTE_PATTERN } from "#protocol/routes.js"; import type { ChannelMethod, RouteContext } from "#public/definitions/channel.js"; import type { ResolvedChannelDefinition } from "#runtime/types.js"; @@ -9,6 +12,16 @@ export const HTTP_SESSION_CALLBACK_CHANNEL_NAME_PREFIX = "eve/v1/callback"; const HANDLED_METHODS: readonly ChannelMethod[] = ["POST"]; +const log = createLogger("runtime.session-callback"); + +const sessionCallbackUsageSchema = z.object({ + inputTokens: z.number().int().nonnegative(), + outputTokens: z.number().int().nonnegative(), + cacheReadTokens: z.number().int().nonnegative(), +}); + +type SessionCallbackUsage = z.infer; + type SessionTerminalCallbackPayload = | { readonly callId: string; @@ -16,6 +29,7 @@ type SessionTerminalCallbackPayload = readonly output: string; readonly sessionId: string; readonly subagentName: string; + readonly usage?: SessionCallbackUsage; } | { readonly callId: string; @@ -71,6 +85,8 @@ export async function handleSessionCallbackRequest( return result; } + recordRemoteAgentUsageSpan(body); + try { await resumeHook(token, { kind: "runtime-action-result", @@ -83,6 +99,39 @@ export async function handleSessionCallbackRequest( return Response.json({ ok: true }, { status: 202 }); } +function recordRemoteAgentUsageSpan(body: unknown): void { + if (body === null || typeof body !== "object") { + return; + } + const payload = body as Partial; + if (payload.kind !== "session.completed" || typeof payload.subagentName !== "string") { + return; + } + const usage = parseSessionCallbackUsage((payload as { usage?: unknown }).usage); + if (usage === undefined) { + return; + } + try { + const span = trace.getTracer("eve").startSpan(`invoke_agent ${payload.subagentName}`, { + attributes: { + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.name": payload.subagentName, + "gen_ai.usage.input_tokens": usage.inputTokens, + "gen_ai.usage.output_tokens": usage.outputTokens, + "gen_ai.usage.cache_read.input_tokens": usage.cacheReadTokens, + }, + }); + span.end(); + } catch (error) { + log.warn("failed to emit remote-agent usage span", { error }); + } +} + +function parseSessionCallbackUsage(value: unknown): SessionCallbackUsage | undefined { + const parsed = sessionCallbackUsageSchema.safeParse(value); + return parsed.success ? parsed.data : undefined; +} + function projectSessionCallbackResult( value: unknown, ): RuntimeSubagentResultActionResult | Response {