Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/remote-agent-usage-callback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"eve": patch
---

Remote agents now report their token usage back to the caller. When a `defineRemoteAgent` task completes, the terminal callback carries the run's token totals, and the caller emits a local `invoke_agent` span (`gen_ai.operation.name=invoke_agent`, `gen_ai.agent.name`, `gen_ai.usage.*`) so caller-side observability can attribute a remote agent's tokens. Usage is best-effort and optional, so older callees keep working unchanged. Both the calling agent and the remote agent must run this version for remote usage to appear.
107 changes: 107 additions & 0 deletions packages/eve/src/execution/session-callback-step.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,37 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import {
readDurableSession,
type DurableSession,
type DurableSessionState,
} from "#execution/durable-session-store.js";
import { fireSessionCallbackStep } from "#execution/session-callback-step.js";
import type { SessionStateMap } from "#harness/types.js";

vi.mock("#execution/durable-session-store.js", () => ({
readDurableSession: vi.fn(),
}));

const readDurableSessionMock = vi.mocked(readDurableSession);

const TURN_USAGE_STATE_KEY = "eve.harness.turnUsage";
const SESSION_STATE = { sessionId: "remote-session" } as DurableSessionState;

function durableSessionWithState(state: SessionStateMap): DurableSession {
return {
agent: { system: "" },
continuationToken: "tok",
history: [],
sessionId: "remote-session",
state,
};
}

describe("fireSessionCallbackStep", () => {
let errorSpy: ReturnType<typeof vi.spyOn>;

beforeEach(() => {
readDurableSessionMock.mockReset();
errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
});

Expand Down Expand Up @@ -92,6 +118,79 @@ describe("fireSessionCallbackStep", () => {
});
});

it("includes token usage when the completed session reports it", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 }));
vi.stubGlobal("fetch", fetchMock);
readDurableSessionMock.mockResolvedValue(
durableSessionWithState({
[TURN_USAGE_STATE_KEY]: {
turnId: "turn_0",
inputTokens: 100,
outputTokens: 50,
cacheReadTokens: 10,
cacheWriteTokens: 5,
},
}),
);

await fireSessionCallbackStep({
output: "done",
serializedContext: createSerializedContext(),
sessionState: SESSION_STATE,
status: "completed",
});

expect(fetchMock).toHaveBeenCalledWith("https://caller.example.com/eve/v1/callback/tok123", {
body: JSON.stringify({
callId: "call-1",
kind: "session.completed",
output: "done",
sessionId: "remote-session",
subagentName: "research",
usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 10 },
}),
headers: {
"content-type": "application/json",
},
method: "POST",
redirect: "error",
signal: expect.any(AbortSignal),
});
expect(errorSpy).not.toHaveBeenCalled();
});

it("omits usage when the completed session reports none", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 }));
vi.stubGlobal("fetch", fetchMock);
readDurableSessionMock.mockResolvedValue(durableSessionWithState({}));

await fireSessionCallbackStep({
output: "done",
serializedContext: createSerializedContext(),
sessionState: SESSION_STATE,
status: "completed",
});

expect(parsePostedBody(fetchMock).usage).toBeUndefined();
expect(errorSpy).not.toHaveBeenCalled();
});

it("still posts the callback when usage cannot be read", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 }));
vi.stubGlobal("fetch", fetchMock);
readDurableSessionMock.mockRejectedValue(new Error("snapshot unavailable"));

await fireSessionCallbackStep({
output: "done",
serializedContext: createSerializedContext(),
sessionState: SESSION_STATE,
status: "completed",
});

expect(fetchMock).toHaveBeenCalledTimes(1);
expect(parsePostedBody(fetchMock).usage).toBeUndefined();
});

it("posts the failed callback with the normalized error message", async () => {
const fetchMock = vi.fn().mockResolvedValue(new Response(null, { status: 202 }));
vi.stubGlobal("fetch", fetchMock);
Expand Down Expand Up @@ -238,6 +337,14 @@ describe("fireSessionCallbackStep", () => {
});
});

function parsePostedBody(fetchMock: ReturnType<typeof vi.fn>): { usage?: unknown } {
const call = fetchMock.mock.calls[0];
if (call === undefined) {
throw new Error("expected fetch to have been called");
}
return JSON.parse((call[1] as { body: string }).body) as { usage?: unknown };
}

function createSerializedContext(): Record<string, unknown> {
return {
"eve.sessionCallback": {
Expand Down
89 changes: 70 additions & 19 deletions packages/eve/src/execution/session-callback-step.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import type { SessionCallback } from "#channel/types.js";
import { parseSessionCallback } from "#channel/session-callback.js";
import { SessionCallbackKey } from "#context/keys.js";
import { readDurableSession, type DurableSessionState } from "#execution/durable-session-store.js";
import { getTurnUsageState } from "#harness/turn-tag-state.js";
import { createLogger } from "#internal/logging.js";
import { toErrorMessage } from "#shared/errors.js";

const SESSION_CALLBACK_TIMEOUT_MS = 30_000;
const log = createLogger("execution.session-callback");

export interface SessionCallbackUsage {
readonly inputTokens: number;
readonly outputTokens: number;
readonly cacheReadTokens: number;
}

/**
* Sends the configured session terminal callback.
*
Expand All @@ -21,6 +29,7 @@ export async function fireSessionCallbackStep(input: {
readonly error?: unknown;
readonly output?: unknown;
readonly serializedContext: Record<string, unknown>;
readonly sessionState?: DurableSessionState;
readonly status: "completed" | "failed";
}): Promise<void> {
"use step";
Expand All @@ -33,25 +42,47 @@ export async function fireSessionCallbackStep(input: {

try {
const callback = parseSerializedSessionCallback(value);
const body =
input.status === "completed"
? {
callId: callback.callId,
kind: "session.completed" as const,
output: input.output ?? "",
sessionId,
subagentName: callback.subagentName,
}
: {
callId: callback.callId,
error: {
code: "SESSION_FAILED",
message: toErrorMessage(input.error),
},
kind: "session.failed" as const,
sessionId,
subagentName: callback.subagentName,
};
let body:
| {
callId: string;
kind: "session.completed";
output: unknown;
sessionId: string;
subagentName: string;
usage?: SessionCallbackUsage;
}
| {
callId: string;
error: { code: string; message: string };
kind: "session.failed";
sessionId: string;
subagentName: string;
};
if (input.status === "completed") {
body = {
callId: callback.callId,
kind: "session.completed",
output: input.output ?? "",
sessionId,
subagentName: callback.subagentName,
};
const usage =
input.sessionState !== undefined ? await readCompletedUsage(input.sessionState) : undefined;
if (usage !== undefined) {
body.usage = usage;
}
} else {
body = {
callId: callback.callId,
error: {
code: "SESSION_FAILED",
message: toErrorMessage(input.error),
},
kind: "session.failed",
sessionId,
subagentName: callback.subagentName,
};
}

const response = await fetch(callback.url, {
body: JSON.stringify(body),
Expand All @@ -78,6 +109,26 @@ export async function fireSessionCallbackStep(input: {
}
}

async function readCompletedUsage(
state: DurableSessionState,
): Promise<SessionCallbackUsage | undefined> {
try {
const durable = await readDurableSession(state);
const turn = getTurnUsageState(durable.state);
if (turn === undefined) {
return undefined;
}
return {
inputTokens: turn.inputTokens,
outputTokens: turn.outputTokens,
cacheReadTokens: turn.cacheReadTokens,
};
} catch (error) {
log.warn("failed to read remote-agent usage for session callback", { error });
return undefined;
}
}

function parseSerializedSessionCallback(value: unknown): SessionCallback {
const parsed = parseSessionCallback(value);
if (!parsed.ok) {
Expand Down
1 change: 1 addition & 0 deletions packages/eve/src/execution/workflow-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ async function finalizeDone(input: {
error: failed ? output : undefined,
output: failed ? undefined : output,
serializedContext,
sessionState: failed ? undefined : input.action.sessionState,
status: failed ? "failed" : "completed",
});
await notifyDelegatedParentStep({
Expand Down
97 changes: 97 additions & 0 deletions packages/eve/src/runtime/session-callback-route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,25 @@ vi.mock("#compiled/@workflow/core/runtime.js", () => ({
resumeHook: (token: string, payload: unknown) => resumeHookMock(token, payload),
}));

const startSpanMock = vi.fn();
const endSpanMock = vi.fn();

vi.mock("#compiled/@opentelemetry/api/index.js", () => ({
trace: {
getTracer: () => ({
startSpan: (name: string, options: unknown) => {
startSpanMock(name, options);
return { end: endSpanMock };
},
}),
},
}));

describe("session callback route", () => {
beforeEach(() => {
resumeHookMock.mockReset();
startSpanMock.mockReset();
endSpanMock.mockReset();
});

it("registers the POST framework callback route", () => {
Expand Down Expand Up @@ -65,6 +81,87 @@ describe("session callback route", () => {
},
],
});
expect(startSpanMock).not.toHaveBeenCalled();
});

it("emits an invoke_agent usage span when a completed callback reports usage", async () => {
resumeHookMock.mockResolvedValue(undefined);

const response = await handleSessionCallbackRequest(
new Request("https://app.example.com/eve/v1/callback/tok123", {
body: JSON.stringify({
callId: "call-1",
kind: "session.completed",
output: "done",
sessionId: "remote-session",
subagentName: "research",
usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 10 },
}),
method: "POST",
}),
createRouteContext({ token: "tok123" }),
);

expect(response.status).toBe(202);
expect(startSpanMock).toHaveBeenCalledWith("invoke_agent research", {
attributes: {
"gen_ai.operation.name": "invoke_agent",
"gen_ai.agent.name": "research",
"gen_ai.usage.input_tokens": 100,
"gen_ai.usage.output_tokens": 50,
"gen_ai.usage.cache_read.input_tokens": 10,
},
});
expect(endSpanMock).toHaveBeenCalledTimes(1);
expect(resumeHookMock).toHaveBeenCalledWith("tok123", {
kind: "runtime-action-result",
results: [
{ callId: "call-1", kind: "subagent-result", output: "done", subagentName: "research" },
],
});
});

it("does not emit a usage span for a malformed usage payload", async () => {
resumeHookMock.mockResolvedValue(undefined);

const response = await handleSessionCallbackRequest(
new Request("https://app.example.com/eve/v1/callback/tok123", {
body: JSON.stringify({
callId: "call-1",
kind: "session.completed",
output: "done",
sessionId: "remote-session",
subagentName: "research",
usage: { inputTokens: "lots", outputTokens: 50, cacheReadTokens: 10 },
}),
method: "POST",
}),
createRouteContext({ token: "tok123" }),
);

expect(response.status).toBe(202);
expect(startSpanMock).not.toHaveBeenCalled();
});

it("does not emit a usage span for a failed callback", async () => {
resumeHookMock.mockResolvedValue(undefined);

const response = await handleSessionCallbackRequest(
new Request("https://app.example.com/eve/v1/callback/tok123", {
body: JSON.stringify({
callId: "call-1",
error: { code: "SESSION_FAILED", message: "boom" },
kind: "session.failed",
sessionId: "remote-session",
subagentName: "research",
}),
method: "POST",
}),
createRouteContext({ token: "tok123" }),
);

expect(response.status).toBe(202);
expect(startSpanMock).not.toHaveBeenCalled();
});
});

Expand Down
Loading
Loading