class AutoEvictMiddleware(AgentMiddleware):
def __init__(
self,
backend,
tool_token_limit_before_evict: int = 1000,
evict_dir: str = "/large_tool_results"
):
self.backend = backend
self.evict_dir = evict_dir
self.tool_token_limit_before_evict = tool_token_limit_before_evict
def _get_backend(self, runtime: ToolRuntime) -> BackendProtocol:
"""Get the resolved backend instance from backend or factory.
Args:
runtime: The tool runtime context.
Returns:
Resolved backend instance.
"""
if callable(self.backend):
return self.backend(runtime)
return self.backend
def _process_large_message(
self,
message: ToolMessage,
resolved_backend: BackendProtocol,
) -> tuple[ToolMessage, dict[str, FileData] | None]:
content = message.content
if not isinstance(content, str) or len(content) <= 4 * self.tool_token_limit_before_evict:
return message, None
sanitized_id = sanitize_tool_call_id(message.tool_call_id)
file_path = f"/large_tool_results/{sanitized_id}"
result = resolved_backend.write(file_path, content)
if result.error:
return message, None
content_sample = format_content_with_line_numbers([line[:1000] for line in content.splitlines()[:10]], start_line=1)
processed_message = ToolMessage(
TOO_LARGE_TOOL_MSG.format(
tool_call_id=message.tool_call_id,
file_path=file_path,
content_sample=content_sample,
),
tool_call_id=message.tool_call_id,
)
return processed_message, result.files_update
def _intercept_large_tool_result(self, tool_result: ToolMessage | Command, runtime: ToolRuntime) -> ToolMessage | Command:
if isinstance(tool_result, ToolMessage) and isinstance(tool_result.content, str):
if not (self.tool_token_limit_before_evict and len(tool_result.content) > 4 * self.tool_token_limit_before_evict):
return tool_result
resolved_backend = self._get_backend(runtime)
processed_message, files_update = self._process_large_message(
tool_result,
resolved_backend,
)
return (
Command(
update={
"files": files_update,
"messages": [processed_message],
}
)
if files_update is not None
else processed_message
)
if isinstance(tool_result, Command):
update = tool_result.update
if update is None:
return tool_result
command_messages = update.get("messages", [])
accumulated_file_updates = dict(update.get("files", {}))
resolved_backend = self._get_backend(runtime)
processed_messages = []
for message in command_messages:
if not (
self.tool_token_limit_before_evict
and isinstance(message, ToolMessage)
and isinstance(message.content, str)
and len(message.content) > 4 * self.tool_token_limit_before_evict
):
processed_messages.append(message)
continue
processed_message, files_update = self._process_large_message(
message,
resolved_backend,
)
processed_messages.append(processed_message)
if files_update is not None:
accumulated_file_updates.update(files_update)
return Command(update={**update, "messages": processed_messages, "files": accumulated_file_updates})
return tool_result
async def awrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command:
"""(async)Check the size of the tool call result and evict to filesystem if too large.
Args:
request: The tool call request being processed.
handler: The handler function to call with the modified request.
Returns:
The raw ToolMessage, or a pseudo tool message with the ToolResult in state.
"""
if self.tool_token_limit_before_evict is None or request.tool_call["name"] in TOOL_GENERATORS:
return await handler(request)
tool_result = await handler(request)
return self._intercept_large_tool_result(tool_result, request.runtime)