diff --git a/agent/middleware/apiBasedTools.ts b/agent/middleware/apiBasedTools.ts index 4f21f1a..500282c 100644 --- a/agent/middleware/apiBasedTools.ts +++ b/agent/middleware/apiBasedTools.ts @@ -1,4 +1,5 @@ import { ToolMessage } from "@langchain/core/messages"; +import { isGraphInterrupt } from "@langchain/langgraph"; import { createMiddleware } from "langchain"; import { logger, type AdminUser, type IAdminForth } from "adminforth"; import { @@ -13,6 +14,7 @@ import { ALWAYS_AVAILABLE_API_TOOL_NAMES } from "../tools/index.js"; import { createApiTool } from "../tools/apiTool.js"; import type { AgentEventEmitter } from "../../agentEvents.js"; import type { SequenceDebugCollector } from "./sequenceDebug.js"; +import { isAbortError } from "../../errors.js"; function getEnabledApiToolNames(messages: unknown[]) { const enabledToolNames = new Set(); @@ -82,8 +84,14 @@ export function createApiBasedToolsMiddleware( async wrapToolCall(request, handler) { const startedAt = Date.now(); const toolInput = JSON.stringify(request.toolCall.args ?? {}); - const { adminUser, emit, sequenceDebugSink, userTimeZone } = request.runtime.context as { + if (!request.toolCall.id) { + throw new Error(`Tool call "${request.toolCall.name}" has no id.`); + } + + const toolCallId = request.toolCall.id; + const { adminUser, abortSignal, emit, sequenceDebugSink, userTimeZone } = request.runtime.context as { adminUser: AdminUser; + abortSignal?: AbortSignal; emit?: AgentEventEmitter; sequenceDebugSink: SequenceDebugCollector; userTimeZone: string; @@ -113,7 +121,7 @@ export function createApiBasedToolsMiddleware( } const toolCallTracker = createToolCallTracker({ emit: emitToolCall, - toolCallId: request.toolCall.id, + toolCallId, toolName: request.toolCall.name, toolInfo, input: toolArgs, @@ -125,39 +133,37 @@ export function createApiBasedToolsMiddleware( ); try { - let result; - - if (request.tool) { - result = await handler(request); - } else { - const enabledApiToolNames = getEnabledApiToolNames(request.state.messages); - if (enabledApiToolNames.has(request.toolCall.name)) { - result = await handler({ + const result = getEnabledApiToolNames(request.state.messages).has(request.toolCall.name) + ? await handler({ ...request, tool: dynamicTools[request.toolCall.name], - }); - } else { - result = new ToolMessage({ - content: `Tool "${request.toolCall.name}" is not loaded. Call fetch_tool_schema first.`, - tool_call_id: request.toolCall.id ?? "", - name: request.toolCall.name, - status: "error", - }); - } - } + }) + : await handler(request); toolCallTracker.finishSuccess(result); return result; } catch (error) { - const errorDetails = - error instanceof Error ? error.stack ?? error.message : String(error); + if ( + isGraphInterrupt(error) + || abortSignal?.aborted + || isAbortError(error) + ) { + throw error; + } + + const message = error instanceof Error ? error.message : String(error); logger.error( - `Tool "${request.toolCall.name}" failed after ${Date.now() - startedAt}ms with input: ${toolInput}\n${errorDetails}`, + `Error calling tool "${request.toolCall.name}": ${error instanceof Error ? error.stack ?? error.message : String(error)}`, ); - toolCallTracker.finishError(error); - throw error; + toolCallTracker.finishError(`Error: ${message}`); + return new ToolMessage({ + name: request.toolCall.name, + tool_call_id: toolCallId, + status: "error", + content: `Error: ${message}`, + }) } finally { logger.info( `Tool "${request.toolCall.name}" finished in ${Date.now() - startedAt}ms`, diff --git a/agent/runtime/AgentRuntime.ts b/agent/runtime/AgentRuntime.ts index bdfd3f2..ef8a8f8 100644 --- a/agent/runtime/AgentRuntime.ts +++ b/agent/runtime/AgentRuntime.ts @@ -1,5 +1,5 @@ import type { IAdminForth } from "adminforth"; -import { createAgent, summarizationMiddleware } from "langchain"; +import { createAgent, summarizationMiddleware, humanInTheLoopMiddleware } from "langchain"; import type { BaseCheckpointSaver } from "@langchain/langgraph"; import { createApiBasedToolsMiddleware } from "../middleware/apiBasedTools.js"; import { createSequenceDebugMiddleware } from "../middleware/sequenceDebug.js"; @@ -7,6 +7,22 @@ import { createAgentLlmMetricsLogger } from "../simpleAgent.js"; import type { AgentToolProvider } from "../tools/AgentToolProvider.js"; import type { AgentRuntimeRunInput } from "../turn/turnTypes.js"; import { contextSchema, toLangchainAgentContext } from "./AgentContext.js"; +import type { ApiBasedTool } from "../../apiBasedTools.js"; + +function createHumanInTheLoopInterrupts( + apiBasedTools: Record, +): Record { + return Object.fromEntries( + Object.entries(apiBasedTools) + .filter(([, apiBasedTool]) => apiBasedTool.agent?.isDangerous === true) + .map(([toolName]) => [ + toolName, + { + allowedDecisions: ["approve", "reject"], + }, + ]), + ); +} export type AgentRuntimeOptions = { name: string; @@ -29,8 +45,13 @@ export class AgentRuntime { const sequenceDebugMiddleware = createSequenceDebugMiddleware( input.observability.sequenceDebugSink, ); + const hitlMiddleware = humanInTheLoopMiddleware({ + interruptOn: createHumanInTheLoopInterrupts(apiBasedTools), + descriptionPrefix: "Tool execution pending approval", + }); const middleware = [ apiBasedToolsMiddleware, + hitlMiddleware, ...(input.models.modelMiddleware ?? []), sequenceDebugMiddleware, summarizationMiddleware({ @@ -49,8 +70,8 @@ export class AgentRuntime { middleware, }); - return agent.stream({ messages: input.messages } as any, { - streamMode: "messages", + return agent.stream(input.input as any, { + streamMode: ["messages", "updates"], recursionLimit: 100, callbacks: [createAgentLlmMetricsLogger()], signal: input.context.abortSignal, diff --git a/agent/systemPrompt.ts b/agent/systemPrompt.ts index 473f2b3..010324d 100644 --- a/agent/systemPrompt.ts +++ b/agent/systemPrompt.ts @@ -25,11 +25,8 @@ export const DEFAULT_AGENT_SYSTEM_PROMPT = [ "Do not add extra explanations or suggestions unless the user asks.", "Adapt to the user's tone and style of speaking, mirroring their vibe and wording.", "if the user speaks casually, you should respond casually too", - "Never mutate data without user confirmation for a clearly described mutation plan.", - "One confirmation may cover one mutation or one explicitly described batch/sequence of related mutations.", - "If the confirmed plan has multiple steps, you may execute the whole confirmed plan without asking again between those steps.", - "If the plan changes, expands, or you want to do anything beyond the confirmed plan, ask for confirmation again.", - "Do not reuse an old confirmation for a new mutation plan.", + "Before calling a dangerous tool, briefly describe the exact action, target, and important changes in chat.", + "Do not ask the user for textual confirmation; dangerous tools are approved by the runtime approval UI.", ].join(" "); export function appendCustomSystemPrompt( @@ -124,7 +121,7 @@ export async function buildAgentSystemPrompt( "If the user wants to fetch records, load fetch_data first. If the user wants analytics or charts, load analyze_data first.", "Only call fetch_tool_schema for tool names that are explicitly mentioned in a fetched skill and are not already available as base tools.", "If a fetched skill lists a non-base tool you need, call fetch_tool_schema for it immediately instead of telling the user the tool is unavailable.", - "For example: for record creation load mutate_data, read its tool list, call fetch_tool_schema for create_record, and then use create_record after confirmation.", + "For example: for record creation load mutate_data, read its tool list, call fetch_tool_schema for create_record, describe the planned record, and then use create_record.", "When fetch_tool_schema succeeds, that tool becomes available on the next step.", "All admin links must be root-relative and start with '/'.", "Build record links as '/resource/{resourceId}/show/{primary key}'. Never use bare 'resource/{resourceId}/show/{primary key}' without the leading slash.", diff --git a/agent/turn/TurnLifecycleService.ts b/agent/turn/TurnLifecycleService.ts index 4bf315d..6987f1c 100644 --- a/agent/turn/TurnLifecycleService.ts +++ b/agent/turn/TurnLifecycleService.ts @@ -1,4 +1,5 @@ import type { AgentSessionStore } from "../../sessionStore.js"; +import type { PluginOptions } from "../../types.js"; import type { BaseAgentTurnInput } from "./turnTypes.js"; import { TurnPersistenceService } from "./TurnPersistenceService.js"; @@ -6,6 +7,7 @@ export class TurnLifecycleService { constructor( private readonly sessionStore: AgentSessionStore, private readonly persistence: TurnPersistenceService, + private readonly options: PluginOptions, ) {} async start(input: BaseAgentTurnInput) { @@ -19,6 +21,22 @@ export class TurnLifecycleService { }; } + async resume(input: BaseAgentTurnInput) { + const latestTurn = await this.sessionStore.getLatestTurn(input.sessionId); + + if (!latestTurn) { + throw new Error(`No agent turn found for session "${input.sessionId}".`); + } + + return { + turnId: latestTurn[this.options.turnResource.idField], + previousUserMessages: await this.sessionStore.getPreviousUserMessages(input.sessionId), + initialResponse: latestTurn[this.options.turnResource.responseField] === "not_finished" + ? "" + : String(latestTurn[this.options.turnResource.responseField]), + }; + } + async finish(input: { turnId: string; responseText: string; diff --git a/agent/turn/TurnStreamConsumer.ts b/agent/turn/TurnStreamConsumer.ts index 5e0344d..8f306a9 100644 --- a/agent/turn/TurnStreamConsumer.ts +++ b/agent/turn/TurnStreamConsumer.ts @@ -3,19 +3,27 @@ import { VegaLiteStreamBuffer } from "./VegaLiteStreamBuffer.js"; export class TurnStreamConsumer { async consume(input: { - stream: AsyncIterable<[any, any]>; + stream: AsyncIterable<["messages", [any, any]] | ["updates", Record]>; abortSignal?: AbortSignal; emit?: AgentEventEmitter; + onInterrupt?: (interrupt: unknown) => void | Promise; }) { let fullResponse = ""; const textBuffer = new VegaLiteStreamBuffer(); - for await (const rawChunk of input.stream) { + for await (const [mode, chunk] of input.stream) { if (input.abortSignal?.aborted) { throw new DOMException("This operation was aborted", "AbortError"); } - const [token, metadata] = rawChunk; + if (mode === "updates") { + if ("__interrupt__" in chunk) { + await input.onInterrupt?.(chunk.__interrupt__); + } + continue; + } + + const [token, metadata] = chunk; const nodeName = typeof metadata?.langgraph_node === "string" ? metadata.langgraph_node diff --git a/agent/turn/turnTypes.ts b/agent/turn/turnTypes.ts index 01813b4..a260297 100644 --- a/agent/turn/turnTypes.ts +++ b/agent/turn/turnTypes.ts @@ -1,5 +1,6 @@ import type { AdminUser, AudioAdapter } from "adminforth"; import type { Messages } from "@langchain/langgraph"; +import type { Command } from "@langchain/langgraph"; import type { AgentChatModel, AgentMiddleware } from "../simpleAgent.js"; import type { SequenceDebugCollector } from "../middleware/sequenceDebug.js"; import type { PreviousUserMessage } from "../languageDetect.js"; @@ -20,6 +21,7 @@ export type BaseAgentTurnInput = { export type TextAgentTurnInput = BaseAgentTurnInput & { emit: AgentEventEmitter; + approvalDecision?: "approve" | "reject"; failureLogMessage?: string; abortLogMessage?: string; }; @@ -60,6 +62,11 @@ export type PreparedAgentTurn = { modeName?: string | null; context: AgentTurnContext; observability: AgentTurnObservability; + resume?: { + decision: "approve" | "reject"; + interrupts?: { id: string; count: number }[]; + }; + initialResponse?: string; }; export type AgentTurnModels = { @@ -70,13 +77,14 @@ export type AgentTurnModels = { export type AgentRuntimeRunInput = { models: AgentTurnModels; - messages: Messages; + input: { messages: Messages } | Command; context: AgentTurnContext; observability: AgentTurnObservability; }; export type RunAndPersistAgentResponseInput = BaseAgentTurnInput & { emit?: AgentEventEmitter; + approvalDecision?: "approve" | "reject"; failureLogMessage: string; abortLogMessage: string; }; diff --git a/agentEvents.ts b/agentEvents.ts index 727d75d..1625cda 100644 --- a/agentEvents.ts +++ b/agentEvents.ts @@ -22,6 +22,11 @@ export type AgentEvent = phase: "start" | "end"; label: string; } + | { + type: "interrupt"; + sessionId: string; + interrupt: unknown; + } | { type: "open-page"; targetPath: string; diff --git a/agentTurnService.ts b/agentTurnService.ts index bb1accf..b49cc24 100644 --- a/agentTurnService.ts +++ b/agentTurnService.ts @@ -1,5 +1,6 @@ import { logger } from "adminforth"; import { randomUUID } from "crypto"; +import { Command } from "@langchain/langgraph"; import { AgentModelFactory } from "./agent/models/AgentModelFactory.js"; import { AgentModeResolver } from "./agent/models/AgentModeResolver.js"; import { createSequenceDebugCollector } from "./agent/middleware/sequenceDebug.js"; @@ -25,7 +26,94 @@ export type { RunAndPersistAgentResponseResult, } from "./agent/turn/turnTypes.js"; +function getApprovalDecision(input: BaseAgentTurnInput) { + return "approvalDecision" in input + && (input.approvalDecision === "approve" || input.approvalDecision === "reject") + ? input.approvalDecision + : undefined; +} + +function getInterruptItems(interrupt: unknown): unknown[] { + return Array.isArray(interrupt) ? interrupt : [interrupt]; +} + +function getHitlInterrupts(interrupt: unknown): { id: string; count: number }[] { + return getInterruptItems(interrupt).flatMap((item) => { + const value = item && typeof item === "object" && "value" in item + ? (item as { value: unknown }).value + : item; + const actionRequests = value && typeof value === "object" + ? (value as { actionRequests?: unknown }).actionRequests + : undefined; + const interruptId = item && typeof item === "object" + ? (item as { id?: unknown }).id + : undefined; + + return typeof interruptId === "string" && Array.isArray(actionRequests) + ? [{ id: interruptId, count: actionRequests.length }] + : []; + }); +} + +function buildHitlDecision(decision: "approve" | "reject", prompt?: string) { + if (decision === "approve") { + return { type: "approve" as const }; + } + + return { + type: "reject" as const, + message: prompt + ? `User rejected the pending tool execution and sent a new instruction instead: ${prompt}` + : "User rejected executing this tool", + }; +} + +function buildHitlResumeValue(input: { + decision: "approve" | "reject"; + count: number; + prompt?: string; +}) { + return { + decisions: Array.from({ length: input.count }, () => ( + buildHitlDecision(input.decision, input.prompt) + )), + }; +} + +function buildLangGraphResume(input: { + decision: "approve" | "reject"; + interrupts?: { id: string; count: number }[]; + prompt?: string; +}) { + const interrupts = input.interrupts ?? []; + + if (interrupts.length === 0) { + throw new Error("No pending approval interrupt found for resume."); + } + + if (interrupts.length === 1) { + return buildHitlResumeValue({ + decision: input.decision, + count: interrupts[0].count, + prompt: input.prompt, + }); + } + + return Object.fromEntries( + interrupts.map((interrupt) => [ + interrupt.id, + buildHitlResumeValue({ + decision: input.decision, + count: interrupt.count, + prompt: input.prompt, + }), + ]), + ); +} + export class AgentTurnService { + private readonly pendingInterrupts = new Map(); + constructor( private readonly lifecycle: TurnLifecycleService, private readonly contextBuilder: TurnContextBuilder, @@ -38,23 +126,40 @@ export class AgentTurnService { private async prepareTurn(input: BaseAgentTurnInput): Promise { const sequenceDebugCollector = createSequenceDebugCollector(); - const { turnId, previousUserMessages } = await this.lifecycle.start(input); + const approvalDecision = getApprovalDecision(input); + const shouldResume = Boolean(approvalDecision); + const pendingInterrupts = this.pendingInterrupts.get(input.sessionId); + if (shouldResume && (!pendingInterrupts || pendingInterrupts.length === 0)) { + throw new Error(`No pending approval interrupt found for session "${input.sessionId}".`); + } + const lifecycleTurn = shouldResume + ? await this.lifecycle.resume(input) + : await this.lifecycle.start(input); const context = await this.contextBuilder.build({ base: input, - turnId, + turnId: lifecycleTurn.turnId, }); return { prompt: input.prompt, sessionId: input.sessionId, - turnId, - previousUserMessages, + turnId: lifecycleTurn.turnId, + previousUserMessages: lifecycleTurn.previousUserMessages, modeName: input.modeName, context, observability: { emit: undefined, sequenceDebugSink: sequenceDebugCollector, }, + resume: shouldResume + ? { + decision: approvalDecision!, + interrupts: pendingInterrupts, + } + : undefined, + initialResponse: shouldResume && "initialResponse" in lifecycleTurn + ? (lifecycleTurn as { initialResponse?: string }).initialResponse + : undefined, }; } @@ -73,16 +178,56 @@ export class AgentTurnService { ]); const stream = await this.runtime.stream({ models, - messages, + input: input.resume + ? new Command({ + resume: buildLangGraphResume({ + decision: input.resume.decision, + interrupts: input.resume.interrupts, + prompt: input.prompt, + }), + }) + : { messages }, context: input.context, observability: input.observability, }); - return this.streamConsumer.consume({ - stream: stream as AsyncIterable<[any, any]>, - abortSignal: input.context.abortSignal, - emit: input.observability.emit, - }); + let interrupted = false; + try { + return await this.streamConsumer.consume({ + stream: stream as AsyncIterable<["messages", [any, any]] | ["updates", Record]>, + abortSignal: input.context.abortSignal, + emit: input.observability.emit, + onInterrupt: async (interrupt) => { + interrupted = true; + const interrupts = getHitlInterrupts(interrupt); + const pendingInterrupts = this.pendingInterrupts.get(input.sessionId) ?? []; + const mergedInterrupts = new Map( + pendingInterrupts.map((pendingInterrupt) => [ + pendingInterrupt.id, + pendingInterrupt.count, + ]), + ); + + for (const pendingInterrupt of interrupts) { + mergedInterrupts.set(pendingInterrupt.id, pendingInterrupt.count); + } + + this.pendingInterrupts.set( + input.sessionId, + [...mergedInterrupts.entries()].map(([id, count]) => ({ id, count })), + ); + await input.observability.emit?.({ + type: "interrupt", + sessionId: input.sessionId, + interrupt, + }); + }, + }); + } finally { + if (!interrupted) { + this.pendingInterrupts.delete(input.sessionId); + } + } } async runAndPersistAgentResponse( @@ -91,13 +236,13 @@ export class AgentTurnService { const preparedTurn = await this.prepareTurn(input); preparedTurn.observability.emit = input.emit; - let fullResponse = ""; + let fullResponse = preparedTurn.initialResponse ?? ""; let aborted = false; let failed = false; try { const agentResponse = await this.runAgentTurn(preparedTurn); - fullResponse = agentResponse.text; + fullResponse += agentResponse.text; } catch (error) { if (input.abortSignal?.aborted || isAbortError(error)) { aborted = true; @@ -138,6 +283,7 @@ export class AgentTurnService { currentPage: input.currentPage, chatSurface: input.chatSurface, adminPublicOrigin: input.adminPublicOrigin, + approvalDecision: input.approvalDecision, abortSignal: input.abortSignal, adminUser: input.adminUser, emit: input.emit, diff --git a/apiBasedTools.ts b/apiBasedTools.ts index e1f1c4e..0dfe9f2 100644 --- a/apiBasedTools.ts +++ b/apiBasedTools.ts @@ -39,10 +39,15 @@ type GetResourceDataToolResponse = { type DateTimeColumnType = AdminForthDataTypes.DATETIME | AdminForthDataTypes.TIME; type RegisteredApiToolSchema = IRegisteredApiSchema & { handler: (input: unknown) => void | Promise; + agent?: AgentToolMeta; }; const DEFAULT_USER_TIME_ZONE = 'UTC'; +type AgentToolMeta = { + isDangerous?: boolean; +}; + function hasRegisteredApiToolHandler(schema: IRegisteredApiSchema): schema is RegisteredApiToolSchema { return typeof (schema as { handler?: unknown }).handler === 'function'; } @@ -175,6 +180,7 @@ export type ApiBasedToolCallParams = { export type ApiBasedTool = { description?: string; input_schema?: unknown; + agent?: AgentToolMeta; call: (params?: ApiBasedToolCallParams) => Promise; }; @@ -615,6 +621,7 @@ export function prepareApiBasedTools( apiBasedTools[toolName] = { description: schema.description, input_schema: schema.request_schema, + agent: schema.agent, call: async ({ adminUser, adminuser, abortSignal, inputs, userTimeZone, acceptLanguage } = {}) => { if (isHiddenResourceCall(hiddenResourceIdSet, inputs)) { return YAML.stringify({ diff --git a/custom/ChatFooter.vue b/custom/ChatFooter.vue index ca2f47d..c484d8d 100644 --- a/custom/ChatFooter.vue +++ b/custom/ChatFooter.vue @@ -20,7 +20,8 @@ 'min-h-12 px-4 pt-4 rounded-xl w-full resize-none overflow-hidden text-lightInputText dark:text-darkInputText rounded-md bg-transparent text-sm bg-gray-50 dark:bg-gray-700 dark:border-gray-600 focus:outline-none', { '!text-base': coreStore.isIos } ]" - :placeholder="agentStore.userMessagePlaceholder" + :placeholder="agentStore.hasPendingToolApproval ? 'Approve or reject the pending action to continue' : agentStore.userMessagePlaceholder" + :disabled="agentStore.isMessageInputBlocked" @keydown.enter.exact.prevent="sendMessage" />
+ @@ -30,6 +34,7 @@ import { getMessageParts } from '../utils'; import ProcessingTimeline from './ProcessingTimeline.vue'; import SystemMessageRenderer from './SystemMessageRenderer.vue'; + import ToolApprovalRenderer from './ToolApprovalRenderer.vue'; import { RESERVED_SYSTEM_MESSAGE_CONTENT } from '../composables/agentStore/constants'; const props = defineProps<{ diff --git a/custom/conversation_area/ToolApprovalRenderer.vue b/custom/conversation_area/ToolApprovalRenderer.vue new file mode 100644 index 0000000..a3ea6e7 --- /dev/null +++ b/custom/conversation_area/ToolApprovalRenderer.vue @@ -0,0 +1,98 @@ + + + diff --git a/custom/skills/mutate_data/SKILL.md b/custom/skills/mutate_data/SKILL.md index ae65e56..8f16fac 100644 --- a/custom/skills/mutate_data/SKILL.md +++ b/custom/skills/mutate_data/SKILL.md @@ -16,41 +16,22 @@ Use `start_custom_action` and `start_custom_bulk_action` for resource actions. - if there is a dedicated action for some routine (result of `get_resource` tool call, field actions), prefer this to manual updating of records, for example, if you want to approve some comment, prefer calling `approve` action instead of updating `approved` field of comment record (because in action there might be some additional logic like sending notification to user, updating some counters and so on) -## Confirmation +## Mutation context -Before performing any state mutation including action calls edit/delete please fetch record which is going to be edited/deleted and show user record in format field → value (show several most important fields which can help user to understand what exactly record he is going to edit or delete). +Before performing any state mutation including action calls edit/delete please fetch record which is going to be edited/deleted and show user record in format field → value (show several most important fields which can help user to understand what exactly record he is going to edit or delete). -Every confirmation must describe one exact fetched row. Never combine `_label`, primary key, link, or field values from different rows or different resources in one confirmation. +Every mutation description must describe one exact fetched row. Never combine `_label`, primary key, link, or field values from different rows or different resources in one description. For field values with long texts show only several first words and add "..." at the end. Also please add related link to record with will be changed. Build it as `{ADMIN_BASE_PATH}resource/{resourceId}/show/{primary key}`. Use _label from `get_resource_data` as anchor text for link (use markdown link). Links should always be relative paths and must start with `ADMIN_BASE_PATH`. Do not add an extra slash after `ADMIN_BASE_PATH`. -Before sending the confirmation, verify that the `resourceId`, `{primary key}`, `_label`, and all shown fields come from the same exact fetched row. +Before calling the mutation tool, verify that the `resourceId`, `{primary key}`, `_label`, and all shown fields come from the same exact fetched row. -Never show information about more than 10 records in one message. If a mutation plan affects more than 10 records, show only the 10 most important examples plus the total count if known, and ask the user to confirm the clearly described full batch. +Never show information about more than 10 records in one message. If a mutation plan affects more than 10 records, show only the 10 most important examples plus the total count if known. -And in the same message ask user for final confirmation. +When creating new record, show user all data which you gonna create. -When creating new record, show user all data which you gona create and in same message ask for confirmation. - -Accept any positive confirmation from user like "yes", "sure", "+", anything non-negative call to action, can be considered as confirmation. - -A confirmation is valid only for the clearly described mutation plan from the immediately previous assistant message. - -Never reuse an older confirmation for a later mutation. - -One confirmation may cover: -- one single mutation -- one explicitly described batch -- one short sequence of related mutations that together implement the same user request - -If the confirmed plan contains several related mutation steps, execute that whole confirmed plan without asking again between those steps. - -Ask for confirmation again if the plan changes in any way: different record, different fields, different values, different number of records, different action, or any extra mutation that was not listed in the confirmation message. - -If you are creating or deleting multiple records in one batch, you may ask once for that exact batch, but list the whole batch explicitly in the confirmation message. Any extra record outside that described batch requires a new confirmation. - -After the confirmed plan is finished, do not treat that confirmation as still active for later requests. +Do not ask user for textual confirmation. Dangerous tools are approved by the runtime approval UI. # Calling actions @@ -60,7 +41,7 @@ Before calling any of this action you should understand whether this action is a ### Example -If you want to block some user you can confirm that this action by saying: +If you want to block some user you can describe the action by saying: ```I am going to block user: * Username: john_doe @@ -69,7 +50,6 @@ If you want to block some user you can confirm that this action by saying: * Currently blocked: No // show this field only if it exists in user record View [John Doe]({ADMIN_BASE_PATH}resource/users/show/123) -Are you sure? ``` ## Updating @@ -81,7 +61,7 @@ In addition to instructions above show user the table of edits (old value/new va ### Examples -For example if you gonna modify user record, in confirmation please share full user info (not only username but also email, ip country - anything which help adminto check that that is correct user). Message could look like this: +For example if you gonna modify user record, please share full user info (not only username but also email, ip country - anything which help adminto check that that is correct user). Message could look like this: ``` I am going to update user: @@ -91,8 +71,6 @@ I am going to update user: I am going to change email from john_doe@example.com to new_email@example.com View [John Doe]({ADMIN_BASE_PATH}resource/users/show/123) - -Are you sure? ``` @@ -102,7 +80,7 @@ To delete some record you can use `delete_record` tool. To delete record `allowe ### Example -If you gonna delete user record, in confirmation please share full user info (not only username but also email, ip country - anything which help adminto check that that is correct user). Message could look like this: +If you gonna delete user record, please share full user info (not only username but also email, ip country - anything which help adminto check that that is correct user). Message could look like this: ```I am going to delete user: * Username: john_doe @@ -111,8 +89,6 @@ If you gonna delete user record, in confirmation please share full user info (no * IP Country: USA View [John Doe]({ADMIN_BASE_PATH}resource/users/show/123) - -Are you sure? ``` ## Creating @@ -142,6 +118,4 @@ I am going to create user: * Email: john_doe@example.com View [John Doe]({ADMIN_BASE_PATH}resource/users/show/421) # 421 is id of new created record - -Are you sure? ``` diff --git a/custom/types.ts b/custom/types.ts index 711b6e7..47bdf30 100644 --- a/custom/types.ts +++ b/custom/types.ts @@ -1,15 +1,18 @@ export interface IPartData { toolCallId?: string; toolName?: string; + sessionId?: string; phase?: 'start' | 'end'; label?: string; input?: any; output?: any; durationMs?: number; toolInfo?: string; + status?: 'pending' | 'processing' | 'approved' | 'rejected'; + messages?: string[]; } export interface IPart { - type: 'reasoning' | 'data-tool-call' | 'data-rendering' | 'text'; + type: 'reasoning' | 'data-tool-call' | 'data-rendering' | 'data-tool-approval' | 'text'; text?: string; state?: 'started' | 'thinking' | 'processing' | 'streaming' | 'done'; data?: IPartData; diff --git a/endpoints/core.ts b/endpoints/core.ts index 882a1d5..8bce20a 100644 --- a/endpoints/core.ts +++ b/endpoints/core.ts @@ -20,6 +20,11 @@ const agentResponseBodySchema = z.object({ currentPage: z.custom().optional(), }).strict(); +const agentApprovalBodySchema = z.object({ + sessionId: z.string(), + decision: z.enum(["approve", "reject"]), +}).strict(); + const agentSpeechResponseBodySchema = agentResponseBodySchema.omit({ message: true }); export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServer) { @@ -71,6 +76,31 @@ export function setupCoreEndpoints(ctx: CoreEndpointsContext, server: IHttpServe } }); + server.endpoint({ + method: 'POST', + path: `/agent/approval`, + handler: async ({ body, adminUser, response, _raw_express_res, abortSignal }) => { + const data = ctx.parseBody(agentApprovalBodySchema, body, response); + if (!data) return; + const emit = createSseEventEmitter(_raw_express_res, { + vercelAiUiMessageStream: true, + closeActiveBlockOnToolStart: true, + }); + + await ctx.handleTurn({ + prompt: "", + sessionId: data.sessionId, + approvalDecision: data.decision, + abortSignal, + adminUser: adminUser!, + emit, + failureLogMessage: "Agent approval response streaming failed", + abortLogMessage: "Agent approval response streaming aborted by the client", + }); + return null; + } + }); + server.endpoint({ method: 'POST', path: `/agent/speech-response`, diff --git a/index.ts b/index.ts index 310860e..ee1dc41 100644 --- a/index.ts +++ b/index.ts @@ -85,7 +85,7 @@ export default class AdminForthAgentPlugin extends AdminForthPlugin { }); const persistence = new TurnPersistenceService(() => this.adminforth, this.options); this.agentTurnService = new AgentTurnService( - new TurnLifecycleService(this.sessionStore, persistence), + new TurnLifecycleService(this.sessionStore, persistence, this.options), new TurnContextBuilder(() => this.adminforth), new AgentModeResolver(this.options), new AgentModelFactory(this.options.maxTokens ?? 1000), diff --git a/package.json b/package.json index c167b7e..bed08a1 100644 --- a/package.json +++ b/package.json @@ -33,7 +33,7 @@ "@langchain/core": "^1.1.40", "@langchain/langgraph": "^1.2.8", "@langchain/langgraph-checkpoint": "^1.0.1", - "adminforth": "2.70.0", + "adminforth": "2.72.0", "dayjs": "^1.11.20", "langchain": "^1.3.3", "multer": "^2.1.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f528a8e..4d73e4b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -18,8 +18,8 @@ importers: specifier: ^1.0.1 version: 1.0.1(@langchain/core@1.1.40(openai@6.34.0(ws@8.20.0)(zod@4.3.6))(ws@8.20.0)) adminforth: - specifier: 2.70.0 - version: 2.70.0(@types/node@25.6.0)(typescript@5.9.3) + specifier: 2.72.0 + version: 2.72.0(@types/node@25.6.0)(typescript@5.9.3) dayjs: specifier: ^1.11.20 version: 1.11.20 @@ -464,8 +464,8 @@ packages: '@types/node@25.6.0': resolution: {integrity: sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==} - '@types/node@25.9.1': - resolution: {integrity: sha512-xfrlY7UD5rMJk3ZVJP8BNzS28J36YJg+xp+LPXV1TdWxr8uMH5A860QNxYDGQe/ylDSgjxE52Q9VnO7p75tJxg==} + '@types/node@25.9.3': + resolution: {integrity: sha512-603BddQMv3pUcr4U2dhujk83N2tTDVr/34wII2B6bJy6g+8WD6yUb11jszNs0gdi4PesVWl7ABt8nYMVpnLUcg==} '@types/normalize-package-data@2.4.4': resolution: {integrity: sha512-37i+OaWTh9qeK4LSHPsyRC7NahnGotNuZvjLSgcPzblpHB3rrCJxAOgI5gCdKm7coonsaX1Of0ILiTcnZjbfxA==} @@ -530,8 +530,8 @@ packages: resolution: {integrity: sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==} engines: {node: '>= 0.6'} - adminforth@2.70.0: - resolution: {integrity: sha512-E/zvEtAU7Woh3nLR7xrfgaVtH0Y+NAQnbSlZhTMpSpjrwwwGPAmE6lKsEaVAD7T+y1TvNhk+yCh/Mb6me3owzA==} + adminforth@2.72.0: + resolution: {integrity: sha512-GhJKCmVCKGqmMXAJ9zYmo0uDdvtZ7vsNkj5g3YOJ7qyLlUHnogzIGi7Zxz+k6gmhG5SYygRCpQ99W2xdnWmVoQ==} hasBin: true agent-base@7.1.4: @@ -3204,15 +3204,15 @@ snapshots: '@types/body-parser@1.19.6': dependencies: '@types/connect': 3.4.38 - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/connect@3.4.38': dependencies: - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/express-serve-static-core@4.19.8': dependencies: - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/qs': 6.15.0 '@types/range-parser': 1.2.7 '@types/send': 1.2.1 @@ -3238,7 +3238,7 @@ snapshots: dependencies: undici-types: 7.19.2 - '@types/node@25.9.1': + '@types/node@25.9.3': dependencies: undici-types: 7.24.6 @@ -3251,16 +3251,16 @@ snapshots: '@types/send@0.17.6': dependencies: '@types/mime': 1.3.5 - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/send@1.2.1': dependencies: - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/serve-static@1.15.10': dependencies: '@types/http-errors': 2.0.5 - '@types/node': 25.9.1 + '@types/node': 25.9.3 '@types/send': 0.17.6 '@types/unist@2.0.11': {} @@ -3332,7 +3332,7 @@ snapshots: mime-types: 2.1.35 negotiator: 0.6.3 - adminforth@2.70.0(@types/node@25.6.0)(typescript@5.9.3): + adminforth@2.72.0(@types/node@25.6.0)(typescript@5.9.3): dependencies: '@babel/parser': 7.29.2 '@clickhouse/client': 1.18.2 diff --git a/sessionStore.ts b/sessionStore.ts index 0a288ea..dc16784 100644 --- a/sessionStore.ts +++ b/sessionStore.ts @@ -65,6 +65,17 @@ export class AgentSessionStore { })); } + async getLatestTurn(sessionId: string) { + const turns = await this.getAdminforth().resource(this.options.turnResource.resourceId).list( + [Filters.EQ(this.options.turnResource.sessionIdField, sessionId)], + 1, + undefined, + [Sorts.DESC(this.options.turnResource.createdAtField)] + ); + + return turns[0]; + } + getChatSurfaceSessionId(incoming: ChatSurfaceIncomingMessage) { return `${incoming.surface}:${incoming.externalConversationId}`; } diff --git a/surfaces/web-sse/createSseEventEmitter.ts b/surfaces/web-sse/createSseEventEmitter.ts index 7c7d041..9205c30 100644 --- a/surfaces/web-sse/createSseEventEmitter.ts +++ b/surfaces/web-sse/createSseEventEmitter.ts @@ -123,6 +123,17 @@ function createAgentEventStream( }); }, + interrupt(sessionId: string, interrupt: unknown) { + stream.endActiveBlock(); + stream.send({ + type: isAiUiMessageStream ? "data-interrupt" : "interrupt", + data: { + sessionId, + interrupt, + }, + }); + }, + openPage(targetPath: string) { stream.send({ type: isAiUiMessageStream ? "data-open-page" : "open-page", @@ -259,6 +270,9 @@ export function createSseEventEmitter( case "rendering": stream.rendering(event.phase, event.label); break; + case "interrupt": + stream.interrupt(event.sessionId, event.interrupt); + break; case "open-page": stream.openPage(event.targetPath); break;