diff --git a/.changeset/mcp-stateful-sessions.md b/.changeset/mcp-stateful-sessions.md new file mode 100644 index 000000000..1fd1385b2 --- /dev/null +++ b/.changeset/mcp-stateful-sessions.md @@ -0,0 +1,5 @@ +--- +"eve": patch +--- + +Add opt-in `session: { mode: "stateful" }` support for MCP connections. Stateful MCP connections persist Streamable HTTP session metadata across eve step boundaries and reattach through the native AI SDK MCP session hooks, retrying with a fresh session when the server expires the saved one. diff --git a/docs/connections/mcp.mdx b/docs/connections/mcp.mdx index 5b4624c32..4f048afdf 100644 --- a/docs/connections/mcp.mdx +++ b/docs/connections/mcp.mdx @@ -54,8 +54,74 @@ MCP connections support the shared connection options: See [Connections](/docs/connections) for the shared auth, headers, and approval shapes. +## Stateful sessions + +By default, every eve step opens a new MCP client session. Keep that default for MCP servers whose tools are independent from call to call. + +For Streamable HTTP MCP servers that keep server-side session state, opt in with `session: { mode: "stateful" }`: + +```ts title="agent/connections/stateful-server.ts" +import { defineMcpClientConnection } from "eve/connections"; + +export default defineMcpClientConnection({ + url: "https://mcp.example.com/mcp", + description: "Stateful MCP server.", + session: { mode: "stateful" }, +}); +``` + +When the server negotiates an MCP session, eve stores the protocol session id and initialize result in framework-owned session state. On the next step, eve reattaches through the MCP Streamable HTTP session hooks instead of sending another `initialize` request. + +The stored session is scoped to the current eve session, connection name, and authenticated principal. If the MCP server returns `404` for a stored session, eve clears it and retries once with a fresh MCP session. + +### Pair stateful MCP with `defineState` + +`session: { mode: "stateful" }` is transport continuity. It preserves the MCP protocol session metadata that the server negotiated, and eve owns that metadata internally. Do not store raw MCP session ids in authored state. + +Use [`defineState`](../guides/state) for semantic application state that your agent understands, such as the workspace, project, or account the user selected. That state survives even if the upstream MCP server expires its protocol session and eve has to initialize a fresh one. + +For example, a stateful MCP server might remember the active workspace after a setup tool runs: + +```ts title="agent/connections/workspace-server.ts" +import { defineMcpClientConnection } from "eve/connections"; + +export default defineMcpClientConnection({ + url: "https://mcp.example.com/mcp", + description: "Workspace MCP server.", + session: { mode: "stateful" }, +}); +``` + +Then keep the user's chosen workspace in authored state, because that is a business fact rather than MCP transport metadata: + +```ts title="agent/lib/workspace-state.ts" +import { defineState } from "eve/context"; + +export const activeWorkspace = defineState("my-agent.active-workspace", () => ({ + workspaceId: null as string | null, +})); +``` + +```ts title="agent/tools/remember-workspace.ts" +import { defineTool } from "eve/tools"; +import { z } from "zod"; +import { activeWorkspace } from "../lib/workspace-state.js"; + +export default defineTool({ + description: "Remember the workspace the user selected for later MCP calls.", + inputSchema: z.object({ workspaceId: z.string() }), + execute({ workspaceId }) { + activeWorkspace.update(() => ({ workspaceId })); + return { workspaceId }; + }, +}); +``` + +With that split, normal follow-up MCP calls can reuse the server-side MCP session. If the server later returns `404` and eve reinitializes the MCP session, the agent still has the selected `workspaceId` in `defineState()` and can repeat the server's setup call or include the workspace id in future tool arguments. + ## What to read next - [OpenAPI connections](./openapi): generate tools from OpenAPI operations. +- [State](../guides/state): durable per-session memory with `defineState`. - [Auth & route protection](../guides/auth-and-route-protection): the full interactive-OAuth flow with Vercel Connect. - [Security model](../concepts/security-model): how connection credentials stay out of the model's reach. diff --git a/docs/guides/state.md b/docs/guides/state.md index 93532b786..2adf3ada3 100644 --- a/docs/guides/state.md +++ b/docs/guides/state.md @@ -71,6 +71,8 @@ Every [subagent](../subagents) starts with its own fresh state, whether it's a b `defineState` holds conversation-scoped working memory that lives and dies with the session, including counters, the current plan, and what the user has told you this conversation. It is the agent's short-term memory, persisted durably for the life of the session. Anything that has to outlive the session, be shared across sessions or users, or be queried independently of a turn belongs in an external store, either a [connection](../connections) or your own database. +Connection protocol metadata is separate from authored state. For example, a stateful [MCP connection](../connections/mcp#stateful-sessions) stores its negotiated MCP session id in framework-owned session state so eve can reattach across step boundaries. Use `defineState` for the durable semantic fact behind that workflow, such as the selected workspace id, not for raw protocol session ids. + ## What to read next - Read state inside dynamic resolvers → [Dynamic capabilities](./dynamic-capabilities) diff --git a/packages/eve/src/compiler/manifest.ts b/packages/eve/src/compiler/manifest.ts index 125303a17..4607b87fe 100644 --- a/packages/eve/src/compiler/manifest.ts +++ b/packages/eve/src/compiler/manifest.ts @@ -468,6 +468,16 @@ const compiledConnectionDefinitionSchema = z * runtime. */ protocol: z.enum(["mcp", "openapi"]).default("mcp"), + /** + * MCP session policy. Omitted for older manifests and for connections + * using the default stateless behavior. + */ + session: z + .object({ + mode: z.enum(["stateful", "stateless"]), + }) + .strict() + .optional(), sourceId: z.string(), sourceKind: z.literal("module"), /** diff --git a/packages/eve/src/compiler/normalize-connection.ts b/packages/eve/src/compiler/normalize-connection.ts index a2684ae7f..216fc909c 100644 --- a/packages/eve/src/compiler/normalize-connection.ts +++ b/packages/eve/src/compiler/normalize-connection.ts @@ -68,6 +68,7 @@ export async function compileConnectionDefinition( ...shared, description: normalized.description, protocol: "mcp", + session: normalized.session, url: normalized.url, }; auth = normalized.auth; diff --git a/packages/eve/src/context/providers/connection.test.ts b/packages/eve/src/context/providers/connection.test.ts new file mode 100644 index 000000000..8d0b273a4 --- /dev/null +++ b/packages/eve/src/context/providers/connection.test.ts @@ -0,0 +1,181 @@ +import { describe, expect, it } from "vitest"; + +import { ContextContainer } from "#context/container.js"; +import { AuthKey, type SessionAuthContext } from "#context/keys.js"; +import { connectionProvider } from "#context/providers/connection.js"; +import type { HarnessSession } from "#harness/types.js"; +import { createBundledRuntimeCompiledArtifactsSource } from "#runtime/compiled-artifacts-source.js"; +import { + mcpSessionStateKey, + type DurableMcpSessionState, + type McpSessionSlot, +} from "#runtime/connections/mcp-session-store.js"; +import { ConnectionRegistryImpl } from "#runtime/connections/registry.js"; +import { BundleKey, type CompiledBundle } from "#runtime/sessions/runtime-context-keys.js"; +import type { ResolvedConnectionDefinition } from "#runtime/types.js"; + +const initializeResult = { + capabilities: {}, + protocolVersion: "2025-11-25", + serverInfo: { name: "test-server", version: "1.0.0" }, +} as const; + +function durableState(sessionId: string): DurableMcpSessionState { + return { initializeResult, sessionId }; +} + +function createHarnessSession(state?: Record): HarnessSession { + return { + agent: { + modelReference: { id: "openai/gpt-5.4" }, + system: "", + tools: [], + }, + compaction: { + recentWindowSize: 0, + threshold: 0, + }, + continuationToken: "", + history: [], + sessionId: "session_1", + state, + }; +} + +function makeMcpConnection( + name: string, + overrides: Partial = {}, +): ResolvedConnectionDefinition { + return { + connectionName: name, + description: "test connection", + logicalPath: `connections/${name}.ts`, + protocol: "mcp", + sourceId: `connections/${name}`, + sourceKind: "module", + url: `https://example.com/${name}`, + ...overrides, + }; +} + +function createBundle(connections: readonly ResolvedConnectionDefinition[]): CompiledBundle { + return { + compiledArtifactsSource: createBundledRuntimeCompiledArtifactsSource(), + graph: { + root: { + agent: { + connections, + }, + nodeId: "__root__", + }, + }, + } as CompiledBundle; +} + +function userAuth(principalId: string, issuer = "test-issuer"): SessionAuthContext { + return { + attributes: {}, + authenticator: "test", + issuer, + principalId, + principalType: "user", + }; +} + +describe("connectionProvider.create", () => { + it("seeds stateful MCP slots from session.state", async () => { + const connectionName = "linear"; + const principalKey = "test-issuer:user-7"; + const stateKey = mcpSessionStateKey(connectionName, principalKey); + + const ctx = new ContextContainer(); + ctx.set( + BundleKey, + createBundle([makeMcpConnection(connectionName, { session: { mode: "stateful" } })]), + ); + ctx.set(AuthKey, userAuth("user-7")); + + const session = createHarnessSession({ [stateKey]: durableState("persisted-session") }); + const result = await connectionProvider.create(ctx, session); + + expect(result).toBeDefined(); + expect(result!.value.collectMcpSessionUpdates()).toEqual([]); + }); + + it('uses the "anonymous" state key when no AuthKey is set', async () => { + const connectionName = "anonymous-mcp"; + const stateKey = mcpSessionStateKey(connectionName, undefined); + + const ctx = new ContextContainer(); + ctx.set( + BundleKey, + createBundle([makeMcpConnection(connectionName, { session: { mode: "stateful" } })]), + ); + + const session = createHarnessSession({ [stateKey]: durableState("anon-session") }); + const result = await connectionProvider.create(ctx, session); + + expect(result).toBeDefined(); + expect(result!.value.collectMcpSessionUpdates()).toEqual([]); + }); + + it("returns undefined when there are no connections", () => { + const ctx = new ContextContainer(); + ctx.set(BundleKey, createBundle([])); + + expect(connectionProvider.create(ctx, createHarnessSession())).toBeUndefined(); + }); +}); + +describe("connectionProvider.commit", () => { + it("writes updated MCP session metadata into session.state", () => { + const connectionName = "linear"; + const stateKey = mcpSessionStateKey(connectionName, "test-issuer:user-42"); + const slot: McpSessionSlot = { + current: durableState("new-session"), + initial: durableState("old-session"), + stateKey, + }; + const registry = new ConnectionRegistryImpl( + [makeMcpConnection(connectionName, { session: { mode: "stateful" } })], + new Map([[connectionName, slot]]), + ); + + const session = createHarnessSession({ existingKey: "should-survive" }); + const committed = connectionProvider.commit!(registry, session) as HarnessSession; + + expect(committed.state?.[stateKey]).toEqual(durableState("new-session")); + expect(committed.state?.existingKey).toBe("should-survive"); + }); + + it("deletes expired MCP session metadata from session.state", () => { + const connectionName = "linear"; + const stateKey = mcpSessionStateKey(connectionName, "test-issuer:user-42"); + const slot: McpSessionSlot = { + initial: durableState("expired-session"), + stateKey, + }; + const registry = new ConnectionRegistryImpl( + [makeMcpConnection(connectionName, { session: { mode: "stateful" } })], + new Map([[connectionName, slot]]), + ); + + const session = createHarnessSession({ [stateKey]: durableState("expired-session") }); + const committed = connectionProvider.commit!(registry, session) as HarnessSession; + + expect(committed.state?.[stateKey]).toBeUndefined(); + }); + + it("returns the same session reference when no slot changed", () => { + const connectionName = "notion"; + const stateKey = mcpSessionStateKey(connectionName, "anonymous"); + const unchanged = durableState("same-session"); + const registry = new ConnectionRegistryImpl( + [makeMcpConnection(connectionName, { session: { mode: "stateful" } })], + new Map([[connectionName, { current: unchanged, initial: unchanged, stateKey }]]), + ); + const session = createHarnessSession({ [stateKey]: unchanged }); + + expect(connectionProvider.commit!(registry, session)).toBe(session); + }); +}); diff --git a/packages/eve/src/context/providers/connection.ts b/packages/eve/src/context/providers/connection.ts index 271f32627..565595d89 100644 --- a/packages/eve/src/context/providers/connection.ts +++ b/packages/eve/src/context/providers/connection.ts @@ -1,5 +1,11 @@ import { ContextKey } from "#context/key.js"; +import { AuthKey } from "#context/keys.js"; import { ConnectionRegistryImpl } from "#runtime/connections/registry.js"; +import { + mcpSessionStateKey, + readMcpSessionState, + type McpSessionSlot, +} from "#runtime/connections/mcp-session-store.js"; import type { ConnectionRegistry } from "#runtime/connections/types.js"; import { BundleKey } from "#runtime/sessions/runtime-context-keys.js"; import { getActiveRuntimeNode } from "#context/node.js"; @@ -17,13 +23,46 @@ export const ConnectionRegistryKey = new ContextKey("eve.con export const connectionProvider: FrameworkContextProvider = { key: ConnectionRegistryKey, - create(ctx, _session) { + create(ctx, session) { const bundle = ctx.get(BundleKey); if (bundle === undefined) return undefined; const node = getActiveRuntimeNode(ctx); const connections = node.agent?.connections; if (!connections || connections.length === 0) return undefined; - return { value: new ConnectionRegistryImpl(connections) }; + const auth = ctx.get(AuthKey); + const principalKey = + auth !== undefined && auth !== null ? `${auth.issuer}:${auth.principalId}` : undefined; + + const slots = new Map(); + for (const connection of connections) { + if (connection.protocol !== "mcp" || connection.session?.mode !== "stateful") { + continue; + } + const stateKey = mcpSessionStateKey(connection.connectionName, principalKey); + const persisted = readMcpSessionState(session.state?.[stateKey]); + slots.set(connection.connectionName, { + current: persisted, + initial: persisted, + stateKey, + }); + } + + return { value: new ConnectionRegistryImpl(connections, slots) }; + }, + + commit(registry, session) { + const updates = registry.collectMcpSessionUpdates(); + if (updates.length === 0) return session; + + const state: Record = { ...session.state }; + for (const update of updates) { + if (update.state === undefined) { + delete state[update.stateKey]; + } else { + state[update.stateKey] = update.state; + } + } + return { ...session, state }; }, }; diff --git a/packages/eve/src/internal/authored-definition/connection.test.ts b/packages/eve/src/internal/authored-definition/connection.test.ts index b6e078543..bb7735d8a 100644 --- a/packages/eve/src/internal/authored-definition/connection.test.ts +++ b/packages/eve/src/internal/authored-definition/connection.test.ts @@ -102,6 +102,15 @@ describe("normalizeMcpClientConnectionDefinition", () => { expect(result.url).toBe("http://localhost:3000/mcp"); }); + it("accepts stateful MCP session mode", () => { + const result = normalizeMcpClientConnectionDefinition( + validInput({ session: { mode: "stateful" } }), + MSG, + ); + + expect(result.session).toEqual({ mode: "stateful" }); + }); + it("preserves the optional vercelConnect marker on auth", () => { // The `connect()` helper from `@vercel/connect/eve` attaches a // `vercelConnect: { connector }` marker so downstream tooling can @@ -179,6 +188,20 @@ describe("normalizeMcpClientConnectionDefinition", () => { }); }); + describe("session validation", () => { + it("rejects unknown session modes", () => { + expect(() => + normalizeMcpClientConnectionDefinition(validInput({ session: { mode: "sticky" } }), MSG), + ).toThrow(/session\.mode.*must be "stateful" or "stateless"/); + }); + + it("rejects the legacy string session shape", () => { + expect(() => + normalizeMcpClientConnectionDefinition(validInput({ session: "stateful" }), MSG), + ).toThrow(/The "session" field.*must be an object/); + }); + }); + describe("description validation", () => { it("rejects a non-string description", () => { expect(() => diff --git a/packages/eve/src/internal/authored-definition/connection.ts b/packages/eve/src/internal/authored-definition/connection.ts index 059fe41f6..1874949c8 100644 --- a/packages/eve/src/internal/authored-definition/connection.ts +++ b/packages/eve/src/internal/authored-definition/connection.ts @@ -13,6 +13,7 @@ const KNOWN_TOP_LEVEL_KEYS = [ "auth", "description", "headers", + "session", "tools", "url", ] as const; @@ -62,6 +63,7 @@ export function normalizeMcpClientConnectionDefinition( const authorization = normalizeAuthorization(record, message); const headers = normalizeHeaders(record, message); + const session = normalizeMcpSession(record, message); const tools = normalizeToolFilter(record, message); if (authorization !== undefined && headers !== undefined && typeof headers !== "function") { @@ -84,6 +86,9 @@ export function normalizeMcpClientConnectionDefinition( if (headers !== undefined) { result.headers = headers; } + if (session !== undefined) { + result.session = session; + } if (tools !== undefined) { result.tools = tools; } @@ -98,6 +103,26 @@ export function normalizeMcpClientConnectionDefinition( return result; } +function normalizeMcpSession( + record: Record, + message: string, +): McpClientConnectionDefinition["session"] | undefined { + const session = record.session; + if (session === undefined) { + return undefined; + } + if (typeof session !== "object" || session === null || Array.isArray(session)) { + throw new Error(`${message} The "session" field must be an object with a "mode" field.`); + } + const sessionRecord = expectObjectRecord(session, `${message} The "session" field`); + expectOnlyKnownKeys(sessionRecord, ["mode"], `${message} The "session" field`); + + if (sessionRecord.mode !== "stateful" && sessionRecord.mode !== "stateless") { + throw new Error(`${message} The "session.mode" field must be "stateful" or "stateless".`); + } + return { mode: sessionRecord.mode }; +} + /** * Validates one authored OpenAPI connection module export at build time * and returns the public definition type. Mirrors diff --git a/packages/eve/src/public/definitions/connections/mcp.test.ts b/packages/eve/src/public/definitions/connections/mcp.test.ts index a2f0f7a46..69fa258ae 100644 --- a/packages/eve/src/public/definitions/connections/mcp.test.ts +++ b/packages/eve/src/public/definitions/connections/mcp.test.ts @@ -26,4 +26,14 @@ describe("defineMcpClientConnection", () => { expect(typeof definition.auth).toBe("function"); }); + + it("preserves the optional session mode", () => { + const definition = defineMcpClientConnection({ + description: "test connection", + session: { mode: "stateful" }, + url: "https://mcp.example.com", + }); + + expect(definition.session).toEqual({ mode: "stateful" }); + }); }); diff --git a/packages/eve/src/public/definitions/connections/mcp.ts b/packages/eve/src/public/definitions/connections/mcp.ts index e4e586c8b..0208b6c9d 100644 --- a/packages/eve/src/public/definitions/connections/mcp.ts +++ b/packages/eve/src/public/definitions/connections/mcp.ts @@ -1,6 +1,7 @@ import type { ConnectionAuthDefinition, HeadersDefinition, + McpSessionConfig, ToolFilterDefinition, } from "#runtime/connections/types.js"; import { normalizeAuthorizationSpec } from "#runtime/connections/validate-authorization.js"; @@ -80,6 +81,16 @@ export interface McpClientConnectionDefinition { * Specify exactly one of `allow` or `block`. */ tools?: ToolFilterDefinition; + /** + * MCP session policy for this connection. + * + * Omit for the default stateless behavior. Set `{ mode: "stateful" }` + * for MCP servers that attach server-side state to the negotiated session + * and expect later tool calls to reuse it. eve scopes the stored session + * metadata to the current eve session, connection, and authenticated + * principal. + */ + session?: McpSessionConfig; } /** diff --git a/packages/eve/src/runtime/connections/mcp-client.test.ts b/packages/eve/src/runtime/connections/mcp-client.test.ts index 0df8352f6..fb4b8ec53 100644 --- a/packages/eve/src/runtime/connections/mcp-client.test.ts +++ b/packages/eve/src/runtime/connections/mcp-client.test.ts @@ -9,6 +9,10 @@ import { import type { SessionContext } from "#public/definitions/callback-context.js"; import type { ResolvedConnectionDefinition } from "#runtime/types.js"; import { ConnectionAuthorizationTokensKey } from "#runtime/connections/authorization-tokens.js"; +import type { + DurableMcpSessionState, + McpSessionSlot, +} from "#runtime/connections/mcp-session-store.js"; import { isMcpAuthRequiredError, McpConnectionClient, @@ -68,6 +72,16 @@ function makeConnection( }; } +const initializeResult = { + capabilities: {}, + protocolVersion: "2025-11-25", + serverInfo: { name: "test-server", version: "1.0.0" }, +} as const; + +function durableState(sessionId: string): DurableMcpSessionState { + return { initializeResult, sessionId }; +} + describe("McpConnectionClient", () => { beforeEach(() => { createMCPClient.mockReset(); @@ -219,6 +233,145 @@ describe("McpConnectionClient", () => { }, }); }); + + it("reattaches a stateful HTTP client with persisted MCP session metadata", async () => { + const client = { + close: vi.fn(), + initializeResult, + listTools: vi.fn(), + toolsFromDefinitions: vi.fn(), + }; + createMCPClient.mockResolvedValue(client); + + const slot: McpSessionSlot = { + current: durableState("saved-session"), + initial: durableState("saved-session"), + stateKey: "eve.mcp.session.test.anonymous", + }; + const mcpClient = new McpConnectionClient( + makeConnection({ session: { mode: "stateful" } }), + slot, + ); + + await expect(mcpClient.connect()).resolves.toBe(client); + + expect(createMCPClient).toHaveBeenCalledWith({ + initialInitializeResult: initializeResult, + transport: expect.objectContaining({ + headers: { Authorization: "Bearer test-token" }, + initialProtocolVersion: "2025-11-25", + initialSessionId: "saved-session", + terminateSessionOnClose: false, + type: "http", + url: "https://mcp.example.com", + }), + }); + const transport = createMCPClient.mock.calls[0]?.[0]?.transport; + expect(transport.onSessionIdChange).toEqual(expect.any(Function)); + expect(transport.onSessionExpired).toEqual(expect.any(Function)); + expect(slot.current).toEqual(durableState("saved-session")); + }); + + it("captures a newly negotiated MCP session id with the initialize result", async () => { + const client = { + close: vi.fn(), + initializeResult, + listTools: vi.fn(), + toolsFromDefinitions: vi.fn(), + }; + createMCPClient.mockImplementation(async (config) => { + config.transport.onSessionIdChange?.("new-session"); + return client; + }); + + const slot: McpSessionSlot = { stateKey: "eve.mcp.session.test.anonymous" }; + const mcpClient = new McpConnectionClient( + makeConnection({ session: { mode: "stateful" } }), + slot, + ); + + await expect(mcpClient.connect()).resolves.toBe(client); + + expect(slot.current).toEqual(durableState("new-session")); + }); + + it("clears an expired saved session and retries client creation without replay metadata", async () => { + const client = { + close: vi.fn(), + initializeResult, + listTools: vi.fn(), + toolsFromDefinitions: vi.fn(), + }; + createMCPClient + .mockRejectedValueOnce( + Object.assign(new Error("MCP HTTP Transport Error (HTTP 404): expired"), { + statusCode: 404, + }), + ) + .mockImplementationOnce(async (config) => { + config.transport.onSessionIdChange?.("replacement-session"); + return client; + }); + + const slot: McpSessionSlot = { + current: durableState("expired-session"), + initial: durableState("expired-session"), + stateKey: "eve.mcp.session.test.anonymous", + }; + const mcpClient = new McpConnectionClient( + makeConnection({ session: { mode: "stateful" } }), + slot, + ); + + await expect(mcpClient.connect()).resolves.toBe(client); + + expect(createMCPClient).toHaveBeenCalledTimes(2); + expect(createMCPClient.mock.calls[0]?.[0]).toMatchObject({ + initialInitializeResult: initializeResult, + transport: { initialSessionId: "expired-session" }, + }); + expect(createMCPClient.mock.calls[1]?.[0]).not.toHaveProperty("initialInitializeResult"); + expect(createMCPClient.mock.calls[1]?.[0]?.transport.initialSessionId).toBeUndefined(); + expect(slot.current).toEqual(durableState("replacement-session")); + }); + + it("retries once with a fresh MCP session when a saved session expires during tool listing", async () => { + const expired = Object.assign(new Error("MCP HTTP Transport Error (HTTP 404): expired"), { + statusCode: 404, + }); + const firstClient = { + close: vi.fn(), + initializeResult, + listTools: vi.fn().mockRejectedValue(expired), + toolsFromDefinitions: vi.fn(), + }; + const secondClient = { + close: vi.fn(), + initializeResult, + listTools: vi.fn().mockResolvedValue({ tools: [] }), + toolsFromDefinitions: vi.fn().mockReturnValue({}), + }; + createMCPClient.mockResolvedValueOnce(firstClient).mockImplementationOnce(async (config) => { + config.transport.onSessionIdChange?.("replacement-session"); + return secondClient; + }); + + const slot: McpSessionSlot = { + current: durableState("expired-session"), + initial: durableState("expired-session"), + stateKey: "eve.mcp.session.test.anonymous", + }; + const mcpClient = new McpConnectionClient( + makeConnection({ session: { mode: "stateful" } }), + slot, + ); + + await expect(mcpClient.getToolMetadata()).resolves.toEqual([]); + + expect(firstClient.close).toHaveBeenCalledTimes(1); + expect(createMCPClient).toHaveBeenCalledTimes(2); + expect(slot.current).toEqual(durableState("replacement-session")); + }); }); describe("isMcpAuthRequiredError", () => { diff --git a/packages/eve/src/runtime/connections/mcp-client.ts b/packages/eve/src/runtime/connections/mcp-client.ts index cb046f456..e3753e9f4 100644 --- a/packages/eve/src/runtime/connections/mcp-client.ts +++ b/packages/eve/src/runtime/connections/mcp-client.ts @@ -1,10 +1,16 @@ -import { createMCPClient, type MCPClient } from "#compiled/@ai-sdk/mcp/index.js"; +import { + createMCPClient, + type InitializeResult, + type MCPClient, + type MCPClientConfig, +} from "#compiled/@ai-sdk/mcp/index.js"; import type { ToolSet } from "ai"; import { buildCallbackContext } from "#context/build-callback-context.js"; import { ConnectionAuthorizationRequiredError } from "#public/connections/errors.js"; import type { SessionContext } from "#public/definitions/callback-context.js"; import type { ResolvedConnectionDefinition } from "#runtime/types.js"; +import type { McpSessionSlot } from "#runtime/connections/mcp-session-store.js"; import { evictScopedToken, resolveScopedToken } from "#runtime/connections/scoped-authorization.js"; import { resolveConnectionAuthorization } from "#runtime/connections/resolve-authorization.js"; import { isObject } from "#shared/guards.js"; @@ -34,9 +40,11 @@ export class McpConnectionClient implements ConnectionClient { #toolsPromise: Promise | undefined; #tools: McpToolCache | undefined; #connection: ResolvedConnectionDefinition; + #sessionSlot: McpSessionSlot | undefined; - constructor(connection: ResolvedConnectionDefinition) { + constructor(connection: ResolvedConnectionDefinition, sessionSlot?: McpSessionSlot) { this.#connection = connection; + this.#sessionSlot = sessionSlot; } /** @@ -55,7 +63,7 @@ export class McpConnectionClient implements ConnectionClient { return this.#clientPromise; } - this.#clientPromise = this.#createClient(); + this.#clientPromise = this.#withSessionRetry(() => this.#createClient()); try { this.#client = await this.#clientPromise; return this.#client; @@ -68,19 +76,94 @@ export class McpConnectionClient implements ConnectionClient { async #createClient(): Promise { const headers = await resolveHeaders(this.#connection); const url = this.#connection.url; + const saved = this.#sessionSlot?.current; + const replayingSavedSession = saved !== undefined; try { - return await createMCPClient({ - transport: { type: "http", url, headers }, - }); + const config: MCPClientConfig = { + transport: { + headers, + type: "http", + url, + ...this.#statefulHttpSessionOptions(saved), + }, + }; + if (saved !== undefined) { + config.initialInitializeResult = saved.initializeResult; + } + const client = await createMCPClient(config); + this.#captureInitializedSession(client); + return client; } catch (error) { + if (replayingSavedSession && readHttpStatus(error) === 404) { + throw error; + } if (!isMcpHttpFallbackRetryableError(error)) { throw error; } - return await createMCPClient({ + const client = await createMCPClient({ transport: { type: "sse", url, headers }, }); + return client; + } + } + + #statefulHttpSessionOptions( + saved: { readonly initializeResult: InitializeResult; readonly sessionId: string } | undefined, + ): + | { + readonly initialProtocolVersion?: string; + readonly initialSessionId?: string; + readonly onSessionExpired: (sessionId: string) => void; + readonly onSessionIdChange: (sessionId: string | undefined) => void; + readonly terminateSessionOnClose: false; + } + | undefined { + const slot = this.#sessionSlot; + if (slot === undefined) { + return undefined; } + + return { + initialProtocolVersion: saved?.initializeResult.protocolVersion, + initialSessionId: saved?.sessionId, + onSessionExpired() { + slot.current = undefined; + slot.pendingSessionId = undefined; + }, + onSessionIdChange(sessionId) { + if (sessionId === undefined) { + slot.current = undefined; + slot.pendingSessionId = undefined; + return; + } + + const initializeResult = slot.current?.initializeResult ?? saved?.initializeResult; + if (initializeResult === undefined) { + slot.pendingSessionId = sessionId; + return; + } + + slot.current = { initializeResult, sessionId }; + slot.pendingSessionId = undefined; + }, + terminateSessionOnClose: false, + }; + } + + #captureInitializedSession(client: MCPClient): void { + const slot = this.#sessionSlot; + if (slot === undefined) { + return; + } + + const sessionId = slot.pendingSessionId ?? slot.current?.sessionId; + if (sessionId === undefined) { + return; + } + + slot.current = { initializeResult: client.initializeResult, sessionId }; + slot.pendingSessionId = undefined; } /** @@ -117,20 +200,22 @@ export class McpConnectionClient implements ConnectionClient { * opaque transport error. */ async executeTool(toolName: string, args: unknown): Promise { - try { - const { tools } = await this.#ensureTools(); - - const sdkTool = tools[toolName]; - if (sdkTool?.execute === undefined) { - throw new Error( - `Tool "${toolName}" not found in connection "${this.#connection.connectionName}".`, - ); + return await this.#withSessionRetry(async () => { + try { + const { tools } = await this.#ensureTools(); + + const sdkTool = tools[toolName]; + if (sdkTool?.execute === undefined) { + throw new Error( + `Tool "${toolName}" not found in connection "${this.#connection.connectionName}".`, + ); + } + + return await sdkTool.execute(args, {} as never); + } catch (error) { + return await this.#rethrowClassified(error); } - - return await sdkTool.execute(args, {} as never); - } catch (error) { - return await this.#rethrowClassified(error); - } + }); } async #ensureTools(): Promise { @@ -153,11 +238,13 @@ export class McpConnectionClient implements ConnectionClient { } async #fetchTools(): Promise { - try { - return await this.#fetchToolsInner(); - } catch (error) { - return await this.#rethrowClassified(error); - } + return await this.#withSessionRetry(async () => { + try { + return await this.#fetchToolsInner(); + } catch (error) { + return await this.#rethrowClassified(error); + } + }); } async #fetchToolsInner(): Promise { @@ -200,6 +287,21 @@ export class McpConnectionClient implements ConnectionClient { this.#tools = undefined; } + async #withSessionRetry(operation: () => Promise): Promise { + try { + return await operation(); + } catch (error) { + if (this.#sessionSlot === undefined || readHttpStatus(error) !== 404) { + throw error; + } + + this.#sessionSlot.current = undefined; + this.#sessionSlot.pendingSessionId = undefined; + await this.close(); + return await operation(); + } + } + /** * Always rethrows — this only classifies the error first. A non-auth * error (timeout, `5xx`, `403`, "tool not found", network failure) is diff --git a/packages/eve/src/runtime/connections/mcp-session-store.test.ts b/packages/eve/src/runtime/connections/mcp-session-store.test.ts new file mode 100644 index 000000000..8395ddd6d --- /dev/null +++ b/packages/eve/src/runtime/connections/mcp-session-store.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from "vitest"; + +import { + collectMcpSessionUpdates, + mcpSessionStateKey, + readMcpSessionState, + type DurableMcpSessionState, + type McpSessionSlot, +} from "#runtime/connections/mcp-session-store.js"; + +const initializeResult = { + capabilities: {}, + protocolVersion: "2025-11-25", + serverInfo: { name: "test-server", version: "1.0.0" }, +} as const; + +function state(sessionId: string): DurableMcpSessionState { + return { initializeResult, sessionId }; +} + +describe("mcpSessionStateKey", () => { + it("includes connection name and principal", () => { + expect(mcpSessionStateKey("linear", "issuer:user-1")).toBe( + "eve.mcp.session.linear.issuer:user-1", + ); + }); + + it('falls back to "anonymous" for no principal', () => { + expect(mcpSessionStateKey("linear", null)).toBe("eve.mcp.session.linear.anonymous"); + expect(mcpSessionStateKey("linear", undefined)).toBe("eve.mcp.session.linear.anonymous"); + }); +}); + +describe("readMcpSessionState", () => { + it("accepts stored session metadata", () => { + expect(readMcpSessionState(state("session-1"))).toEqual(state("session-1")); + }); + + it("ignores legacy or malformed stored values", () => { + expect(readMcpSessionState("session-1")).toBeUndefined(); + expect(readMcpSessionState({ sessionId: "session-1" })).toBeUndefined(); + expect( + readMcpSessionState({ + initializeResult: { capabilities: {}, protocolVersion: "2025-11-25" }, + sessionId: "session-1", + }), + ).toBeUndefined(); + }); +}); + +describe("collectMcpSessionUpdates", () => { + it("returns changed, created, and cleared session states", () => { + const unchanged = state("unchanged"); + const slots = new Map([ + ["created", { current: state("created"), stateKey: "k.created" }], + ["unchanged", { current: unchanged, initial: unchanged, stateKey: "k.unchanged" }], + ["rotated", { current: state("new"), initial: state("old"), stateKey: "k.rotated" }], + ["cleared", { initial: state("expired"), stateKey: "k.cleared" }], + ["empty", { stateKey: "k.empty" }], + ]); + + expect(collectMcpSessionUpdates(slots)).toEqual([ + { state: state("created"), stateKey: "k.created" }, + { state: state("new"), stateKey: "k.rotated" }, + { state: undefined, stateKey: "k.cleared" }, + ]); + }); +}); diff --git a/packages/eve/src/runtime/connections/mcp-session-store.ts b/packages/eve/src/runtime/connections/mcp-session-store.ts new file mode 100644 index 000000000..816bc039a --- /dev/null +++ b/packages/eve/src/runtime/connections/mcp-session-store.ts @@ -0,0 +1,91 @@ +import type { InitializeResult } from "#compiled/@ai-sdk/mcp/index.js"; +import { isObject } from "#shared/guards.js"; + +/** Namespace prefix for framework-owned MCP session entries in `session.state`. */ +export const MCP_SESSION_STATE_PREFIX = "eve.mcp.session"; + +/** + * Streamable HTTP session metadata needed to resume an MCP client without + * sending a second initialize request. + */ +export interface DurableMcpSessionState { + readonly initializeResult: InitializeResult; + readonly sessionId: string; +} + +/** + * Per-step, per-connection holder for stateful MCP session metadata. + */ +export interface McpSessionSlot { + readonly stateKey: string; + readonly initial?: DurableMcpSessionState; + current?: DurableMcpSessionState; + pendingSessionId?: string; +} + +/** Map of connection name to live session slot for one step. */ +export type McpSessionSlots = ReadonlyMap; + +/** A single durable session-state mutation applied by the provider. */ +export interface McpSessionUpdate { + readonly state?: DurableMcpSessionState; + readonly stateKey: string; +} + +export function mcpSessionStateKey( + connectionName: string, + principalKey: string | null | undefined, +): string { + return `${MCP_SESSION_STATE_PREFIX}.${connectionName}.${principalKey ?? "anonymous"}`; +} + +export function readMcpSessionState(value: unknown): DurableMcpSessionState | undefined { + if (!isObject(value) || typeof value.sessionId !== "string") { + return undefined; + } + if (!isInitializeResult(value.initializeResult)) { + return undefined; + } + return { + initializeResult: value.initializeResult, + sessionId: value.sessionId, + }; +} + +export function collectMcpSessionUpdates(slots: McpSessionSlots): readonly McpSessionUpdate[] { + const updates: McpSessionUpdate[] = []; + for (const slot of slots.values()) { + if (!sameMcpSessionState(slot.initial, slot.current)) { + updates.push({ state: slot.current, stateKey: slot.stateKey }); + } + } + return updates; +} + +function sameMcpSessionState( + left: DurableMcpSessionState | undefined, + right: DurableMcpSessionState | undefined, +): boolean { + if (left === right) { + return true; + } + if (left === undefined || right === undefined) { + return false; + } + return ( + left.sessionId === right.sessionId && + JSON.stringify(left.initializeResult) === JSON.stringify(right.initializeResult) + ); +} + +function isInitializeResult(value: unknown): value is InitializeResult { + if ( + !isObject(value) || + typeof value.protocolVersion !== "string" || + !isObject(value.capabilities) || + !isObject(value.serverInfo) + ) { + return false; + } + return typeof value.serverInfo.name === "string" && typeof value.serverInfo.version === "string"; +} diff --git a/packages/eve/src/runtime/connections/registry.test.ts b/packages/eve/src/runtime/connections/registry.test.ts index 5acdc8d2c..4895395cb 100644 --- a/packages/eve/src/runtime/connections/registry.test.ts +++ b/packages/eve/src/runtime/connections/registry.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it } from "vitest"; +import type { + DurableMcpSessionState, + McpSessionSlot, +} from "#runtime/connections/mcp-session-store.js"; import type { ResolvedConnectionDefinition } from "#runtime/types.js"; import { ConnectionRegistryImpl } from "#runtime/connections/registry.js"; @@ -19,6 +23,16 @@ function makeConnection(name: string): ResolvedConnectionDefinition { }; } +const initializeResult = { + capabilities: {}, + protocolVersion: "2025-11-25", + serverInfo: { name: "test-server", version: "1.0.0" }, +} as const; + +function durableState(sessionId: string): DurableMcpSessionState { + return { initializeResult, sessionId }; +} + describe("ConnectionRegistryImpl", () => { it("returns connection names", () => { const registry = new ConnectionRegistryImpl([ @@ -86,4 +100,19 @@ describe("ConnectionRegistryImpl", () => { expect(before).not.toBe(after); }); + + it("reports MCP session updates from slots", () => { + const slot: McpSessionSlot = { + current: durableState("negotiated"), + stateKey: "eve.mcp.session.linear.anonymous", + }; + const registry = new ConnectionRegistryImpl( + [{ ...makeConnection("linear"), session: { mode: "stateful" } }], + new Map([["linear", slot]]), + ); + + expect(registry.collectMcpSessionUpdates()).toEqual([ + { state: durableState("negotiated"), stateKey: "eve.mcp.session.linear.anonymous" }, + ]); + }); }); diff --git a/packages/eve/src/runtime/connections/registry.ts b/packages/eve/src/runtime/connections/registry.ts index 71637fd8d..781977ebc 100644 --- a/packages/eve/src/runtime/connections/registry.ts +++ b/packages/eve/src/runtime/connections/registry.ts @@ -1,6 +1,11 @@ import type { Approval } from "#public/definitions/approval.js"; import type { ResolvedConnectionDefinition } from "#runtime/types.js"; import { McpConnectionClient } from "#runtime/connections/mcp-client.js"; +import { + collectMcpSessionUpdates, + type McpSessionSlots, + type McpSessionUpdate, +} from "#runtime/connections/mcp-session-store.js"; import { OpenApiConnectionClient } from "#runtime/connections/openapi-client.js"; import type { ConnectionClient, ConnectionRegistry } from "#runtime/connections/types.js"; @@ -14,9 +19,14 @@ import type { ConnectionClient, ConnectionRegistry } from "#runtime/connections/ export class ConnectionRegistryImpl implements ConnectionRegistry { #clients = new Map(); #connections: readonly ResolvedConnectionDefinition[]; + #sessionSlots: McpSessionSlots; - constructor(connections: readonly ResolvedConnectionDefinition[]) { + constructor( + connections: readonly ResolvedConnectionDefinition[], + sessionSlots?: McpSessionSlots, + ) { this.#connections = connections; + this.#sessionSlots = sessionSlots ?? new Map(); } /** @@ -37,7 +47,7 @@ export class ConnectionRegistryImpl implements ConnectionRegistry { const client: ConnectionClient = connection.protocol === "openapi" ? new OpenApiConnectionClient(connection) - : new McpConnectionClient(connection); + : new McpConnectionClient(connection, this.#sessionSlots.get(connectionName)); this.#clients.set(connectionName, client); return client; } @@ -65,6 +75,10 @@ export class ConnectionRegistryImpl implements ConnectionRegistry { return this.#connections; } + collectMcpSessionUpdates(): readonly McpSessionUpdate[] { + return collectMcpSessionUpdates(this.#sessionSlots); + } + /** * Closes all active client connections. */ diff --git a/packages/eve/src/runtime/connections/types.ts b/packages/eve/src/runtime/connections/types.ts index 8a281ecb9..125e30fb0 100644 --- a/packages/eve/src/runtime/connections/types.ts +++ b/packages/eve/src/runtime/connections/types.ts @@ -12,6 +12,7 @@ import type { ConnectionAuthorizationChallenge } from "#public/connections/error import type { Approval } from "#public/definitions/approval.js"; import type { SessionContext } from "#public/definitions/callback-context.js"; import type { JsonValue } from "#public/types/json.js"; +import type { McpSessionUpdate } from "#runtime/connections/mcp-session-store.js"; import type { ResolvedConnectionDefinition } from "#runtime/types.js"; /** @@ -40,6 +41,18 @@ export interface TokenResult { */ export type ConnectionProtocol = "mcp" | "openapi"; +/** + * MCP session policy for a connection. + * + * - `{ mode: "stateless" }` (default): every step opens and closes its own MCP client. + * - `{ mode: "stateful" }`: eve stores the Streamable HTTP session metadata in + * framework-owned session state and reattaches on later steps. + */ +export type McpSessionMode = "stateful" | "stateless"; +export interface McpSessionConfig { + readonly mode: McpSessionMode; +} + /** A single header value, supporting static strings and per-caller resolution. */ export type HeaderValue = | string @@ -439,6 +452,7 @@ export interface ConnectionClient { /** Per-session container mapping connection names to clients. */ export interface ConnectionRegistry { + collectMcpSessionUpdates(): readonly McpSessionUpdate[]; dispose(): Promise; getClient(connectionName: string): ConnectionClient; getConnectionApproval(connectionName: string): Approval | undefined; diff --git a/packages/eve/src/runtime/resolve-connection.test.ts b/packages/eve/src/runtime/resolve-connection.test.ts index d7cebe27f..3a9bbfbdc 100644 --- a/packages/eve/src/runtime/resolve-connection.test.ts +++ b/packages/eve/src/runtime/resolve-connection.test.ts @@ -45,4 +45,35 @@ describe("resolveConnectionDefinition", () => { expect(resolved.authorization).toBe(auth); expect(resolved.headers).toBe(headers); }); + + it("carries compiled MCP session mode through to the resolved definition", async () => { + const definition: CompiledConnectionDefinition = { + connectionName: "warehouse", + description: "Tenant warehouse", + logicalPath: "connections/warehouse.ts", + protocol: "mcp", + session: { mode: "stateful" }, + sourceId: "connections/warehouse", + sourceKind: "module", + url: "https://warehouse.example.com/mcp", + }; + const moduleMap: CompiledModuleMap = { + nodes: { + [ROOT_COMPILED_AGENT_NODE_ID]: { + modules: { + [definition.sourceId]: { + default: { + description: definition.description, + url: definition.url, + }, + }, + }, + }, + }, + }; + + const resolved = await resolveConnectionDefinition(definition, moduleMap, undefined); + + expect(resolved.session).toEqual({ mode: "stateful" }); + }); }); diff --git a/packages/eve/src/runtime/resolve-connection.ts b/packages/eve/src/runtime/resolve-connection.ts index bb43b2631..832b84a31 100644 --- a/packages/eve/src/runtime/resolve-connection.ts +++ b/packages/eve/src/runtime/resolve-connection.ts @@ -71,6 +71,7 @@ export async function resolveConnectionDefinition( headers?: Readonly; logicalPath: string; protocol: ResolvedConnectionDefinition["protocol"]; + session?: ResolvedConnectionDefinition["session"]; sourceId: string; sourceKind: "module"; spec?: ResolvedConnectionDefinition["spec"]; @@ -113,6 +114,10 @@ export async function resolveConnectionDefinition( result.tools = filter as Readonly; } + if (definition.protocol === "mcp" && definition.session !== undefined) { + result.session = definition.session; + } + if (definition.protocol === "openapi" && resolvedRecord.spec !== undefined) { result.spec = resolvedRecord.spec as ResolvedConnectionDefinition["spec"]; } diff --git a/packages/eve/src/runtime/types.ts b/packages/eve/src/runtime/types.ts index 4ec16f788..168198548 100644 --- a/packages/eve/src/runtime/types.ts +++ b/packages/eve/src/runtime/types.ts @@ -15,6 +15,7 @@ import type { ConnectionAuthResolver, ConnectionProtocol, HeadersDefinition, + McpSessionConfig, ToolFilterDefinition, } from "#runtime/connections/types.js"; import type { OpenAPISpecSource } from "#public/definitions/connections/openapi.js"; @@ -105,6 +106,11 @@ export interface ResolvedConnectionDefinition extends ResolvedModuleSourceRef { * OpenAPI connections). */ readonly protocol: ConnectionProtocol; + /** + * MCP session policy. Present only for MCP connections that opt out of the + * default stateless behavior. + */ + readonly session?: McpSessionConfig; /** * OpenAPI document source (URL or inline object). Present only for * `protocol: "openapi"` connections; the OpenAPI client fetches and