From 645b892e203e7e31154017fcd5218fc368b15b14 Mon Sep 17 00:00:00 2001 From: hilr Date: Sat, 20 Jun 2026 04:30:51 +0000 Subject: [PATCH 1/2] fix: wire can_use_tool callback so AskUserQuestion/ExitPlanMode replies take effect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit bypassPermissions does not bypass requiresUserInteraction tools, so the CLI still sent can_use_tool requests for AskUserQuestion/ExitPlanMode. With no callback the SDK threw, the CLI treated it as a cancel, and the LLM fell back to defaults — ignoring the user's actual reply. Register a can_use_tool callback that allows normal tools (bypass) and, for the two user-interaction tools, parks on an asyncio.Future shared with run() until the user's reply arrives. AskUserQuestion answers are injected via updated_input (the CLI formats them into the tool_result); ExitPlanMode maps to allow / deny-with-feedback. Drops the stdin-hack _send_tool_result and _format_answers_for_tool_result; _build_plan_approval_result becomes _build_exit_plan_permission returning a PermissionResult. The flaky LLM-dependent resume test is replaced by deterministic future-mechanism and callback tests. Co-Authored-By: Claude Opus 4.7 --- src/agent_box/agents/claude_code.py | 334 ++++++++++++++++------------ tests/test_agents.py | 333 ++++++++++++++------------- 2 files changed, 351 insertions(+), 316 deletions(-) diff --git a/src/agent_box/agents/claude_code.py b/src/agent_box/agents/claude_code.py index fd3f147..d615af5 100644 --- a/src/agent_box/agents/claude_code.py +++ b/src/agent_box/agents/claude_code.py @@ -2,7 +2,7 @@ from __future__ import annotations -import json +import asyncio import logging import os import re @@ -16,10 +16,13 @@ AssistantMessage, ClaudeAgentOptions, ClaudeSDKClient, + PermissionResultAllow, + PermissionResultDeny, ResultMessage, SessionMessage, SystemMessage, TextBlock, + ToolPermissionContext, ToolUseBlock, get_session_messages, ) @@ -317,33 +320,14 @@ async def _parse_user_answer(questions: list[dict], user_reply: str) -> dict: return json.loads(text) -def _format_answers_for_tool_result(answers: dict) -> str: - """Render answers as the natural-language string Claude Code expects. - - Matches the CLI's own format: - ``User has answered your questions: ""="", ... You can now - continue with the user's answers in mind.`` - """ - pairs = [] - for question, answer in answers.items(): - if isinstance(answer, list): - answer = ", ".join(str(a) for a in answer) - pairs.append(f'"{question}"="{answer}"') - return ( - "User has answered your questions: " - + ", ".join(pairs) - + ". You can now continue with the user's answers in mind." - ) - - # ── ExitPlanMode approval parser ── # Words that count as approval / rejection when the user replies to a plan. # Matched as a prefix on the reply (case-insensitive) so Chinese phrases # without word boundaries (不要用 / 不行,) work just as well as English # ("Yes, please", "No, change step 2"). Anything that doesn't match either -# defaults to approval so the conversation doesn't stall — the plan is -# already visible to the user. +# defaults to rejection so the agent never runs a plan the user didn't +# explicitly approve. _PLAN_APPROVE_WORDS = ( "yes", "y", "ok", "okay", "approve", "approved", "lgtm", "是", "好的", "好", "可以", "同意", "批准", "行", @@ -354,13 +338,16 @@ def _format_answers_for_tool_result(answers: dict) -> str: ) -def _build_plan_approval_result(user_reply: str) -> str: - """Translate the user's Yes/No reply into the tool_result for ExitPlanMode. +def _build_exit_plan_permission( + user_reply: str, +) -> PermissionResultAllow | PermissionResultDeny: + """Translate the user's Yes/No reply into a permission decision for + ExitPlanMode. - - Approval word prefix → "User has approved your plan..." - - Rejection word prefix + following text → rejection with the rest as feedback - - Anything else → rejection with the reply as feedback (don't run a plan - the user didn't explicitly approve) + Returning ``allow`` lets the CLI run the tool, which exits plan mode and + emits ``"User has approved your plan..."``. Returning ``deny`` with a + message makes that message the ``is_error`` tool_result the LLM sees, so + it reads as rejection feedback. """ reply = (user_reply or "").strip() reply_lower = reply.lower() @@ -372,21 +359,24 @@ def _build_plan_approval_result(user_reply: str) -> str: ) if reject_match: feedback = reply[len(reject_match):].lstrip(" ::,,、").strip() - if feedback: - return ( - f"User rejected the plan. Feedback: {feedback}. " - "Please revise the plan based on this feedback and try again." - ) - return "User rejected the plan. Please revise and try again." + message = ( + f"User rejected the plan. Feedback: {feedback}. " + "Please revise the plan based on this feedback and try again." + if feedback + else "User rejected the plan. Please revise and try again." + ) + return PermissionResultDeny(behavior="deny", message=message) if any(reply_lower.startswith(w.lower()) for w in _PLAN_APPROVE_WORDS): - return "User has approved your plan. You can now start coding." - - # Ambiguous — default to rejection so the agent doesn't run with a plan - # the user didn't explicitly approve. The reply is attached as feedback. - return ( - f"User rejected the plan. Feedback: {reply}. " - "Please revise the plan based on this feedback and try again." + return PermissionResultAllow(behavior="allow") + + # Ambiguous — default to rejection with the reply as feedback. + return PermissionResultDeny( + behavior="deny", + message=( + f"User rejected the plan. Feedback: {reply}. " + "Please revise the plan based on this feedback and try again." + ), ) @@ -396,10 +386,12 @@ class ClaudeCodeAgent(BaseAgent): def __init__(self, project: ProjectInfo) -> None: super().__init__(project) self._client: ClaudeSDKClient | None = None - # When the agent calls AskUserQuestion (or similar), we store the - # tool_use_id here. The *next* ``run()`` call sends the user's - # message as a ``tool_result`` instead of a fresh query. - self._pending_ask: dict[str, Any] | None = None + # When the agent calls AskUserQuestion / ExitPlanMode, the + # ``can_use_tool`` callback parks here on an ``asyncio.Future``. + # ``run()`` surfaces the question to the IM channel and returns; + # the next ``run()`` call resolves the future with the user's reply, + # unblocking the callback so it can return the PermissionResult. + self._pending_permission: dict[str, Any] | None = None def _build_options(self) -> ClaudeAgentOptions: opts = ClaudeAgentOptions( @@ -413,6 +405,7 @@ def _build_options(self) -> ClaudeAgentOptions: "preset": "claude_code", "append": _SEND_FILE_INSTRUCTION, }, + can_use_tool=self._can_use_tool, stderr=lambda line: log.warning("claude stderr [%s]: %s", self.project.name, line.rstrip()), ) if self.project.session_id: @@ -519,43 +512,109 @@ def _format_question(block: ToolUseBlock) -> str: return question - async def _send_tool_result( + # ------------------------------------------------------------------ + # Permission callback (can_use_tool) + # ------------------------------------------------------------------ + + def _get_or_create_permission_future( self, - client: ClaudeSDKClient, + tool_name: str, tool_use_id: str, - answer: str, - session_id: str, - ) -> None: - """Write a ``tool_result`` message to CLI stdin so it can continue.""" - message = { - "type": "user", - "message": { - "role": "user", - "content": [ - { - "type": "tool_result", - "tool_use_id": tool_use_id, - "content": answer, - } - ], - }, - "parent_tool_use_id": None, - "session_id": session_id or "", + input: dict[str, Any], + ) -> dict[str, Any]: + """Return the pending-permission entry for *tool_use_id*, creating + it if neither ``run()`` nor the callback has yet. + + Both the block detection in ``run()`` (which sees the AssistantMessage + first) and the ``can_use_tool`` callback (which the CLI fires right + after) can win the race; whichever fires first creates the entry and + the other reuses it, so there is exactly one shared future. + """ + existing = self._pending_permission + if existing is not None and existing.get("tool_use_id") == tool_use_id: + return existing + questions = ( + input.get("questions", []) if tool_name == "AskUserQuestion" else [] + ) + self._pending_permission = { + "tool_name": tool_name, + "tool_use_id": tool_use_id, + "input": input, + "questions": questions, + "future": asyncio.get_running_loop().create_future(), } - # Use the transport directly — ``client.query()`` only sends plain - # text user messages, but we need to attach a tool_result payload. - payload = json.dumps(message) + "\n" + return self._pending_permission + + async def _can_use_tool( + self, + tool_name: str, + input: dict[str, Any], + context: ToolPermissionContext, + ) -> PermissionResultAllow | PermissionResultDeny: + """Permission callback wired into ``ClaudeAgentOptions``. + + Normal tools are allowed unconditionally (the agent runs in + ``bypassPermissions``). The user-interaction tools — AskUserQuestion + and ExitPlanMode — still trigger a ``can_use_tool`` request from the + CLI even in bypass mode, so we intercept them here: park on a future + until the user's reply arrives via the next ``run()`` call, then + return the matching permission decision. + """ + if tool_name not in _TOOLS_REQUIRING_USER_INPUT: + return PermissionResultAllow(behavior="allow") + + pending = self._get_or_create_permission_future( + tool_name, context.tool_use_id, input, + ) + future: asyncio.Future = pending["future"] log.info( - "sending tool_result to CLI: tool_use_id=%s session_id=%s content_preview=%r", - tool_use_id, session_id or "(none)", answer[:200], + "can_use_tool: awaiting user reply for %s tool_use_id=%s", + tool_name, context.tool_use_id, + ) + try: + reply = await future + except asyncio.CancelledError: + # Agent shutting down — deny so the CLI unblocks cleanly. + self._pending_permission = None + return PermissionResultDeny( + behavior="deny", message="Agent shutting down", + ) + self._pending_permission = None + log.info( + "can_use_tool: got user reply (%d chars) for %s, building result", + len(reply or ""), tool_name, + ) + + if tool_name == "ExitPlanMode": + return _build_exit_plan_permission(reply) + + # AskUserQuestion — map free-text reply to structured answers via the + # small model, then inject them as ``updatedInput.answers``. The CLI's + # tool execution formats them into the tool_result the LLM sees. + questions = pending.get("questions", []) + try: + parsed = await _parse_user_answer(questions, reply) + answers = parsed.get("answers", {}) + except Exception: + log.warning( + "failed to parse AskUserQuestion answer with LLM, " + "falling back to raw user reply", + exc_info=True, + ) + answers = ( + {q.get("question", f"question_{i}"): reply + for i, q in enumerate(questions) if isinstance(q, dict)} + or {"question": reply} + ) + log.info("can_use_tool: parsed AskUserQuestion answers: %s", answers) + return PermissionResultAllow( + behavior="allow", updated_input={**input, "answers": answers}, ) - await client._transport.write(payload) - log.info("tool_result written to CLI stdin for tool_use_id=%s", tool_use_id) @property def has_pending_question(self) -> bool: """True when the agent asked a question and is waiting for user reply.""" - return self._pending_ask is not None + return self._pending_permission is not None # ------------------------------------------------------------------ # Main run loop @@ -565,55 +624,34 @@ async def run(self, prompt: str, user_id: str = "", channel: str = "") -> AsyncI client = await self._ensure_client() # Diagnostic: trace every run() entry — what prompt, what pending state. - pending_kind = self._pending_ask.get("kind") if self._pending_ask else None - pending_tool = self._pending_ask.get("tool_use_id") if self._pending_ask else None + pending_tool = ( + self._pending_permission.get("tool_name") + if self._pending_permission else None + ) log.info( - "run() entry: project=%s user=%s channel=%s prompt_preview=%r pending_kind=%s pending_tool_use_id=%s", + "run() entry: project=%s user=%s channel=%s prompt_preview=%r pending_permission=%s", self.project.name, user_id, channel, - (prompt or "")[:200], pending_kind, pending_tool, + (prompt or "")[:200], pending_tool, ) - # If a previous run() paused with a pending AskUserQuestion or - # ExitPlanMode, the current prompt is the user's answer — send it - # as a tool_result so the blocked CLI can continue. - if self._pending_ask is not None: - ask = self._pending_ask - self._pending_ask = None - kind = ask.get("kind", "question") + if self._pending_permission is not None: + # Resume: the prompt is the user's reply to a pending permission. + # Resolve the future so the ``can_use_tool`` callback can return + # its PermissionResult and unblock the CLI. Do NOT send a fresh + # query — the CLI is mid-turn, still waiting on the permission + # response. + pending = self._pending_permission log.info( - "consuming pending ask: kind=%s tool_use_id=%s session_id=%s user_reply_preview=%r", - kind, ask["tool_use_id"], ask.get("session_id") or "(none)", + "run(): resolving pending permission future with user reply " + "(tool=%s tool_use_id=%s reply_preview=%r)", + pending["tool_name"], pending["tool_use_id"], (prompt or "")[:200], ) - - if kind == "exit_plan_mode": - content = _build_plan_approval_result(prompt) - log.info("ExitPlanMode reply parsed → tool_result content: %r", content[:200]) - else: - questions = ask.get("questions", []) - try: - parsed = await _parse_user_answer(questions, prompt) - content = _format_answers_for_tool_result(parsed["answers"]) - log.info("parsed AskUserQuestion answers: %s", parsed["answers"]) - except Exception: - log.warning( - "failed to parse AskUserQuestion answer with LLM, " - "falling back to raw user reply", - exc_info=True, - ) - content = ( - "User has answered your questions: " - f"{prompt}. You can now continue with the user's " - "answers in mind." - ) - await self._send_tool_result( - client, - tool_use_id=ask["tool_use_id"], - answer=content, - session_id=ask.get("session_id") or "", - ) + future: asyncio.Future = pending["future"] + if not future.done(): + future.set_result(prompt) else: - log.info("no pending ask — sending prompt as fresh query to CLI") + log.info("run(): no pending permission — sending prompt as fresh query") await client.query(prompt) # Build prefixes to strip from file paths in tool summaries. @@ -625,11 +663,13 @@ async def run(self, prompt: str, user_id: str = "", channel: str = "") -> AsyncI if isinstance(msg, AssistantMessage): for block in msg.content: # --- Tools that require user interaction --- - # Both AskUserQuestion and ExitPlanMode block the CLI - # waiting for a tool_result, so we surface their content - # to the IM channel, set _pending_ask, and return. The - # next run() with the user's reply feeds back the - # tool_result so the CLI can continue. + # AskUserQuestion and ExitPlanMode trigger a ``can_use_tool`` + # request from the CLI even in bypassPermissions mode (they + # are ``requiresUserInteraction`` tools). We surface the + # question/plan to the IM channel, ensure a pending future + # exists (shared with the callback), and return so the user + # can reply. The next run() resolves the future; the + # ``_can_use_tool`` callback then returns the PermissionResult. if ( isinstance(block, ToolUseBlock) and block.name in _TOOLS_REQUIRING_USER_INPUT @@ -641,9 +681,9 @@ async def run(self, prompt: str, user_id: str = "", channel: str = "") -> AsyncI body = "(agent 请求退出 plan 模式,但没有提供计划内容)" text = body + _PLAN_APPROVAL_HINT log.info( - "ExitPlanMode detected — setting pending ask: " - "tool_use_id=%s session_id=%s", - block.id, msg.session_id, + "ExitPlanMode detected — surfacing plan, " + "tool_use_id=%s", + block.id, ) yield OutgoingMessage( text=text, @@ -652,22 +692,20 @@ async def run(self, prompt: str, user_id: str = "", channel: str = "") -> AsyncI type=MessageType.text, data={"id": block.id, "name": block.name, "input": block.input}, ) - self._pending_ask = { - "tool_use_id": block.id, - "session_id": msg.session_id, - "kind": "exit_plan_mode", - } + self._get_or_create_permission_future( + "ExitPlanMode", block.id, block.input or {}, + ) log.info( - "ExitPlanMode pending ask saved, returning from run() — " + "ExitPlanMode pending permission saved, returning from run() — " "waiting for user reply to approve/reject" ) return # Stop yielding; next run() will resume # AskUserQuestion log.info( - "AskUserQuestion detected — setting pending ask: " - "tool_use_id=%s session_id=%s input=%s", - block.id, msg.session_id, block.input, + "AskUserQuestion detected — surfacing question, " + "tool_use_id=%s input=%s", + block.id, block.input, ) question_text = self._format_question(block) yield OutgoingMessage( @@ -676,14 +714,11 @@ async def run(self, prompt: str, user_id: str = "", channel: str = "") -> AsyncI channel=channel, type=MessageType.text, ) - self._pending_ask = { - "tool_use_id": block.id, - "session_id": msg.session_id, - "questions": block.input.get("questions", []) if block.input else [], - "kind": "question", - } + self._get_or_create_permission_future( + "AskUserQuestion", block.id, block.input or {}, + ) log.info( - "AskUserQuestion pending ask saved, returning from run() — " + "AskUserQuestion pending permission saved, returning from run() — " "waiting for user reply (question_preview=%r)", question_text[:200], ) @@ -798,9 +833,9 @@ async def _recover_from_context_limit( async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: - # ExitPlanMode blocks the CLI waiting for approval — - # surface plan + hint, set pending state, and pause so - # the next run() can feed back the user's Yes/No reply. + # ExitPlanMode triggers a ``can_use_tool`` request — + # surface plan + hint, ensure a pending future, and pause + # so the next run() can resolve it with the user's reply. if ( isinstance(block, ToolUseBlock) and block.name == "ExitPlanMode" @@ -814,11 +849,9 @@ async def _recover_from_context_limit( user_id=user_id, channel=channel, type=MessageType.text, data={"id": block.id, "name": block.name, "input": block.input}, ) - self._pending_ask = { - "tool_use_id": block.id, - "session_id": msg.session_id, - "kind": "exit_plan_mode", - } + self._get_or_create_permission_future( + "ExitPlanMode", block.id, block.input or {}, + ) return if isinstance(block, TextBlock): cleaned = block.text.strip() @@ -851,7 +884,14 @@ async def _recover_from_context_limit( ) async def close(self) -> None: - self._pending_ask = None + # Cancel any pending permission future so the ``can_use_tool`` callback + # (awaiting in a background task) unblocks and returns a deny instead + # of hanging on a client that's about to disconnect. + if self._pending_permission is not None: + future: asyncio.Future = self._pending_permission["future"] + if not future.done(): + future.cancel() + self._pending_permission = None if self._client: await self._client.disconnect() self._client = None diff --git a/tests/test_agents.py b/tests/test_agents.py index eff7e71..db0e524 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -58,6 +58,9 @@ def test_build_options(sample_project: ProjectInfo): assert opts.cwd == sample_project.path assert opts.continue_conversation is True assert opts.permission_mode == "bypassPermissions" + # The can_use_tool callback is what intercepts AskUserQuestion/ExitPlanMode + # — without it the SDK throws on the permission request. + assert opts.can_use_tool is not None def test_initial_client_is_none(sample_project: ProjectInfo): @@ -195,20 +198,23 @@ async def fake_receive(): assert "Which file?" in msgs[0].text assert "main.py" in msgs[0].text - # Pending state should be saved - assert agent._pending_ask is not None - assert agent._pending_ask["tool_use_id"] == "toolu_123" - assert agent._pending_ask["session_id"] == "sess-ask" + # Pending permission state should be saved (shared with the can_use_tool + # callback via _get_or_create_permission_future). + assert agent._pending_permission is not None + assert agent._pending_permission["tool_use_id"] == "toolu_123" + assert agent._pending_permission["tool_name"] == "AskUserQuestion" + assert agent._pending_permission["future"] is not None + assert not agent._pending_permission["future"].done() assert agent.has_pending_question is True @pytest.mark.anyio async def test_ask_user_question_resume(sample_project: ProjectInfo): - """Second run() with a pending ask should send tool_result, not a new query.""" + """Second run() with a pending permission resolves the future (so the + can_use_tool callback can return) and does NOT send a fresh query.""" from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock mock_client = AsyncMock() - mock_client._transport = AsyncMock() mock_client.query = AsyncMock() async def fake_receive(): @@ -222,26 +228,21 @@ async def fake_receive(): agent = ClaudeCodeAgent(sample_project) agent._client = mock_client - agent._pending_ask = { - "tool_use_id": "toolu_123", - "session_id": "sess-ask", - } + pending = agent._get_or_create_permission_future( + "AskUserQuestion", "toolu_123", {"questions": [{"question": "Which file?"}]}, + ) + future = pending["future"] msgs = [m async for m in agent.run("main.py")] - # Should NOT have called query() — instead sends tool_result via transport + # Should NOT have queried — the CLI is mid-turn, waiting on the permission + # response. run() only resolves the future so the callback can return. mock_client.query.assert_not_awaited() - mock_client._transport.write.assert_awaited_once() - written = mock_client._transport.write.call_args[0][0] - assert "tool_result" in written - assert "toolu_123" in written - assert "main.py" in written - - # Pending ask should be cleared - assert agent._pending_ask is None - assert agent.has_pending_question is False + assert future.done() + assert future.result() == "main.py" - # Should have yielded the assistant response and result + # Should have yielded the continuation messages that arrived after the + # callback unblocked the CLI. texts = [m.text for m in msgs if m.type.value == "text"] assert any("OK, editing main.py" in t for t in texts) @@ -275,7 +276,7 @@ async def fake_receive(): assert len(msgs) == 1 assert "What is your name?" in msgs[0].text - assert agent._pending_ask["tool_use_id"] == "toolu_456" + assert agent._pending_permission["tool_use_id"] == "toolu_456" @pytest.mark.anyio @@ -342,16 +343,23 @@ async def fake_receive(): @pytest.mark.anyio -async def test_close_clears_pending_ask(sample_project: ProjectInfo): - """close() should clear any pending ask state.""" +async def test_close_cancels_pending_permission(sample_project: ProjectInfo): + """close() cancels a pending permission future so the can_use_tool + callback unblocks (returns deny) instead of hanging on disconnect.""" + import asyncio + mock_client = AsyncMock() agent = ClaudeCodeAgent(sample_project) agent._client = mock_client - agent._pending_ask = {"tool_use_id": "toolu_789", "session_id": "s"} + pending = agent._get_or_create_permission_future( + "AskUserQuestion", "toolu_789", {"questions": [{"question": "q"}]}, + ) + future: asyncio.Future = pending["future"] await agent.close() - assert agent._pending_ask is None + assert agent._pending_permission is None assert agent._client is None + assert future.cancelled() @pytest.mark.anyio @@ -383,7 +391,7 @@ async def fake_receive(): # Tool should be yielded as text (not tool_use) so IM channels can send it texts = [m for m in msgs if m.type.value == "text"] assert any("ls -la" in m.text for m in texts) - assert agent._pending_ask is None + assert agent._pending_permission is None # ── ExitPlanMode plan surfacing ── @@ -431,11 +439,11 @@ async def fake_receive(): assert "Do thing A" in text_msgs[0].text assert "Yes" in text_msgs[0].text and "No" in text_msgs[0].text - # Pending state set so next run() can resolve the approval - assert agent._pending_ask is not None - assert agent._pending_ask["kind"] == "exit_plan_mode" - assert agent._pending_ask["tool_use_id"] == "toolu_plan" - assert agent._pending_ask["session_id"] == "sess-plan" + # Pending permission set so next run() can resolve the approval + assert agent._pending_permission is not None + assert agent._pending_permission["tool_name"] == "ExitPlanMode" + assert agent._pending_permission["tool_use_id"] == "toolu_plan" + assert agent._pending_permission["future"] is not None @pytest.mark.anyio @@ -470,7 +478,7 @@ async def fake_receive(): # Generic placeholder used when the agent didn't supply a plan body assert "plan 模式" in text_msgs[0].text assert "Yes" in text_msgs[0].text - assert agent._pending_ask["kind"] == "exit_plan_mode" + assert agent._pending_permission["tool_name"] == "ExitPlanMode" @pytest.mark.anyio @@ -503,124 +511,157 @@ async def fake_receive(): text_msgs = [m for m in msgs if m.type.value == "text"] assert len(text_msgs) == 1 assert "plan 模式" in text_msgs[0].text - assert agent._pending_ask["kind"] == "exit_plan_mode" + assert agent._pending_permission["tool_name"] == "ExitPlanMode" -# ── ExitPlanMode Yes/No reply parsing ── +# ── _build_exit_plan_permission (Yes/No → PermissionResult) ── -@pytest.mark.anyio -async def test_exit_plan_mode_resume_yes_approves(sample_project: ProjectInfo): - """Reply starting with Yes should send an approval tool_result.""" - from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock +def test_build_exit_plan_permission_yes_allows(): + from agent_box.agents.claude_code import _build_exit_plan_permission + assert _build_exit_plan_permission("Yes").behavior == "allow" + assert _build_exit_plan_permission("yes, please").behavior == "allow" + assert _build_exit_plan_permission("Y").behavior == "allow" - mock_client = AsyncMock() - mock_client._transport = AsyncMock() - mock_client.query = AsyncMock() - async def fake_receive(): - yield AssistantMessage(content=[TextBlock(text="Starting coding")], model="test") - yield ResultMessage( - subtype="result", is_error=False, duration_ms=100, duration_api_ms=90, - num_turns=1, total_cost_usd=0.01, usage=None, session_id="s-plan", - ) +def test_build_exit_plan_permission_no_with_feedback_denies(): + from agent_box.agents.claude_code import _build_exit_plan_permission + r = _build_exit_plan_permission("No change step 2") + assert r.behavior == "deny" + assert "change step 2" in r.message - mock_client.receive_response = fake_receive - agent = ClaudeCodeAgent(sample_project) - agent._client = mock_client - agent._pending_ask = { - "tool_use_id": "toolu_plan", - "session_id": "s-plan", - "kind": "exit_plan_mode", - } +def test_build_exit_plan_permission_no_without_feedback_denies(): + from agent_box.agents.claude_code import _build_exit_plan_permission + r = _build_exit_plan_permission("No") + assert r.behavior == "deny" - msgs = [m async for m in agent.run("Yes")] - # Should NOT query — sends tool_result via transport instead - mock_client.query.assert_not_awaited() - mock_client._transport.write.assert_awaited_once() - written = mock_client._transport.write.call_args[0][0] - assert "tool_result" in written - assert "toolu_plan" in written - assert "approved" in written - assert agent._pending_ask is None +def test_build_exit_plan_permission_chinese_approve_words(): + from agent_box.agents.claude_code import _build_exit_plan_permission + for w in ("好的", "可以", "同意", "批准", "是", "行"): + assert _build_exit_plan_permission(w).behavior == "allow", w + + +def test_build_exit_plan_permission_chinese_reject_words(): + from agent_box.agents.claude_code import _build_exit_plan_permission + for w in ("否", "不", "不要", "不行", "拒绝"): + r = _build_exit_plan_permission(w + " 改一下") + assert r.behavior == "deny", w + assert "改一下" in r.message, w + + +def test_build_exit_plan_permission_ambiguous_defaults_to_deny(): + from agent_box.agents.claude_code import _build_exit_plan_permission + # Doesn't start with Yes/No — defaults to deny, keeps the text as feedback + r = _build_exit_plan_permission("看起来不错,但是 step 3 能省就省") + assert r.behavior == "deny" + assert "step 3" in r.message + + +# ── _can_use_tool callback ── @pytest.mark.anyio -async def test_exit_plan_mode_resume_no_with_feedback(sample_project: ProjectInfo): - """Reply starting with No should send rejection + feedback as tool_result.""" - from claude_agent_sdk import ResultMessage +async def test_can_use_tool_normal_tool_is_bypassed(sample_project: ProjectInfo): + """Tools that don't require user interaction are allowed unconditionally.""" + from claude_agent_sdk import ToolPermissionContext - mock_client = AsyncMock() - mock_client._transport = AsyncMock() - mock_client.query = AsyncMock() + agent = ClaudeCodeAgent(sample_project) + ctx = ToolPermissionContext(tool_use_id="t_bash") + result = await agent._can_use_tool("Bash", {"command": "rm -rf /"}, ctx) + assert result.behavior == "allow" + assert agent._pending_permission is None # no pending state created - async def fake_receive(): - yield ResultMessage( - subtype="result", is_error=False, duration_ms=100, duration_api_ms=90, - num_turns=1, total_cost_usd=0.01, usage=None, session_id="s-plan", - ) - mock_client.receive_response = fake_receive +@pytest.mark.anyio +async def test_can_use_tool_exit_plan_mode_allow(sample_project: ProjectInfo): + """can_use_tool for ExitPlanMode returns the parsed approval decision.""" + from claude_agent_sdk import ToolPermissionContext agent = ClaudeCodeAgent(sample_project) - agent._client = mock_client - agent._pending_ask = { - "tool_use_id": "toolu_plan", - "session_id": "s-plan", - "kind": "exit_plan_mode", - } + # Pre-seed a resolved future so the callback doesn't block on a real user. + pending = agent._get_or_create_permission_future( + "ExitPlanMode", "toolu_plan", {"plan": "..."}, + ) + pending["future"].set_result("Yes") - await agent.run("No step 2 should use Postgres not MySQL").__anext__() + ctx = ToolPermissionContext(tool_use_id="toolu_plan") + result = await agent._can_use_tool("ExitPlanMode", {"plan": "..."}, ctx) + assert result.behavior == "allow" + assert agent._pending_permission is None # cleared after resolving - mock_client.query.assert_not_awaited() - mock_client._transport.write.assert_awaited_once() - written = mock_client._transport.write.call_args[0][0] - assert "tool_result" in written - assert "rejected" in written - assert "Postgres" in written and "MySQL" in written + +@pytest.mark.anyio +async def test_can_use_tool_exit_plan_mode_deny_with_feedback(sample_project: ProjectInfo): + from claude_agent_sdk import ToolPermissionContext + + agent = ClaudeCodeAgent(sample_project) + pending = agent._get_or_create_permission_future( + "ExitPlanMode", "toolu_plan", {"plan": "..."}, + ) + pending["future"].set_result("No use Postgres not MySQL") + + ctx = ToolPermissionContext(tool_use_id="toolu_plan") + result = await agent._can_use_tool("ExitPlanMode", {"plan": "..."}, ctx) + assert result.behavior == "deny" + assert "Postgres" in result.message and "MySQL" in result.message @pytest.mark.anyio -async def test_exit_plan_mode_resume_chinese_yes(sample_project: ProjectInfo): - """Chinese approval words (好的/可以/同意) should also approve.""" - from claude_agent_sdk import ResultMessage +async def test_can_use_tool_ask_user_question_injects_answers(sample_project: ProjectInfo): + """can_use_tool for AskUserQuestion injects parsed answers into updated_input.""" + from claude_agent_sdk import ToolPermissionContext - mock_client = AsyncMock() - mock_client._transport = AsyncMock() - mock_client.query = AsyncMock() + questions = [{"question": "Which file?", "options": [{"label": "main.py"}, {"label": "utils.py"}]}] + inp = {"questions": questions} - async def fake_receive(): - yield ResultMessage( - subtype="result", is_error=False, duration_ms=100, duration_api_ms=90, - num_turns=1, total_cost_usd=0.01, usage=None, session_id="s-plan", - ) + agent = ClaudeCodeAgent(sample_project) + pending = agent._get_or_create_permission_future("AskUserQuestion", "toolu_a", inp) + pending["future"].set_result("main.py please") - mock_client.receive_response = fake_receive + with patch( + "agent_box.agents.claude_code._parse_user_answer", + new=AsyncMock(return_value={"answers": {"Which file?": "main.py"}}), + ): + ctx = ToolPermissionContext(tool_use_id="toolu_a") + result = await agent._can_use_tool("AskUserQuestion", inp, ctx) + + assert result.behavior == "allow" + assert result.updated_input["answers"] == {"Which file?": "main.py"} + # original question structure preserved alongside injected answers + assert result.updated_input["questions"] == questions + + +@pytest.mark.anyio +async def test_can_use_tool_ask_user_question_parse_fallback(sample_project: ProjectInfo): + """If the LLM answer-parser fails, the raw reply is used as each answer.""" + from claude_agent_sdk import ToolPermissionContext + + questions = [{"question": "Which file?"}] + inp = {"questions": questions} agent = ClaudeCodeAgent(sample_project) - agent._client = mock_client - agent._pending_ask = { - "tool_use_id": "toolu_plan", - "session_id": "s-plan", - "kind": "exit_plan_mode", - } + pending = agent._get_or_create_permission_future("AskUserQuestion", "toolu_a", inp) + pending["future"].set_result("just main.py") + + def _boom(*a, **k): + raise RuntimeError("LLM down") - await agent.run("好的").__anext__() + with patch("agent_box.agents.claude_code._parse_user_answer", side_effect=_boom): + ctx = ToolPermissionContext(tool_use_id="toolu_a") + result = await agent._can_use_tool("AskUserQuestion", inp, ctx) - mock_client._transport.write.assert_awaited_once() - written = mock_client._transport.write.call_args[0][0] - assert "approved" in written + assert result.behavior == "allow" + assert result.updated_input["answers"] == {"Which file?": "just main.py"} @pytest.mark.anyio -async def test_exit_plan_mode_resume_chinese_no_with_feedback(sample_project: ProjectInfo): - """Chinese rejection (不要...) should send feedback.""" +async def test_exit_plan_mode_resume_resolves_future(sample_project: ProjectInfo): + """Resuming an ExitPlanMode pause resolves the future (no fresh query).""" from claude_agent_sdk import ResultMessage mock_client = AsyncMock() - mock_client._transport = AsyncMock() mock_client.query = AsyncMock() async def fake_receive(): @@ -633,62 +674,16 @@ async def fake_receive(): agent = ClaudeCodeAgent(sample_project) agent._client = mock_client - agent._pending_ask = { - "tool_use_id": "toolu_plan", - "session_id": "s-plan", - "kind": "exit_plan_mode", - } - - await agent.run("不要用 Postgres,换 MySQL").__anext__() - - written = mock_client._transport.write.call_args[0][0] - assert "rejected" in written - assert "Postgres" in written and "MySQL" in written - - -# ── _build_plan_approval_result unit tests ── - - -def test_build_plan_approval_yes(): - from agent_box.agents.claude_code import _build_plan_approval_result - assert "approved" in _build_plan_approval_result("Yes") - assert "approved" in _build_plan_approval_result("yes, please") - assert "approved" in _build_plan_approval_result("Y") - - -def test_build_plan_approval_no_with_feedback(): - from agent_box.agents.claude_code import _build_plan_approval_result - r = _build_plan_approval_result("No change step 2") - assert "rejected" in r - assert "change step 2" in r - - -def test_build_plan_approval_no_without_feedback(): - from agent_box.agents.claude_code import _build_plan_approval_result - r = _build_plan_approval_result("No") - assert "rejected" in r - - -def test_build_plan_approval_chinese_approve_words(): - from agent_box.agents.claude_code import _build_plan_approval_result - for w in ("好的", "可以", "同意", "批准", "是", "行"): - assert "approved" in _build_plan_approval_result(w), w - - -def test_build_plan_approval_chinese_reject_words(): - from agent_box.agents.claude_code import _build_plan_approval_result - for w in ("否", "不", "不要", "不行", "拒绝"): - r = _build_plan_approval_result(w + " 改一下") - assert "rejected" in r, w - assert "改一下" in r, w + pending = agent._get_or_create_permission_future( + "ExitPlanMode", "toolu_plan", {"plan": "..."}, + ) + future = pending["future"] + await agent.run("Yes").__anext__() -def test_build_plan_approval_ambiguous_defaults_to_reject(): - from agent_box.agents.claude_code import _build_plan_approval_result - # Doesn't start with Yes/No — defaults to rejection, keeps the text as feedback - r = _build_plan_approval_result("看起来不错,但是 step 3 能省就省") - assert "rejected" in r - assert "step 3" in r + mock_client.query.assert_not_awaited() + assert future.done() + assert future.result() == "Yes" # ── Tool summary formatting ── From 6259e355f4fe8703ec5315d6b578f30ad158640d Mon Sep 17 00:00:00 2001 From: hilr Date: Sat, 20 Jun 2026 04:45:17 +0000 Subject: [PATCH 2/2] docs: document the can_use_tool permission handshake in claude_code.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-code writeup of the AskUserQuestion/ExitPlanMode reply bug at the callback it fixes: symptom, root cause (bypassPermissions doesn't bypass requiresUserInteraction tools), and the run()↔callback future handshake. Co-Authored-By: Claude Opus 4.7 --- src/agent_box/agents/claude_code.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/agent_box/agents/claude_code.py b/src/agent_box/agents/claude_code.py index d615af5..03202d4 100644 --- a/src/agent_box/agents/claude_code.py +++ b/src/agent_box/agents/claude_code.py @@ -515,6 +515,56 @@ def _format_question(block: ToolUseBlock) -> str: # ------------------------------------------------------------------ # Permission callback (can_use_tool) # ------------------------------------------------------------------ + # + # Why AskUserQuestion / ExitPlanMode need a callback (problem writeup) + # ─────────────────────────────────────────────────────────────────── + # + # SYMPTOM + # The user's reply to an AskUserQuestion (and the Yes/No on an + # ExitPlanMode plan) was silently ignored: the question reached the IM + # channel, the user answered, yet the LLM proceeded with the default + # option exactly as if the user had cancelled. + # + # ROOT CAUSE + # The agent runs in ``bypassPermissions``, but that mode does NOT + # bypass tools flagged ``requiresUserInteraction()``. In the CLI's + # permission check the requiresUserInteraction short-circuit (the "1e" + # step in utils/permissions/permissions.ts) runs *before* the bypass + # check, so AskUserQuestion and ExitPlanMode still emit a + # ``can_use_tool`` request. With no callback registered, the SDK + # raised "canUseTool callback is not provided" + # (claude_agent_sdk/_internal/query.py), the CLI read that error as a + # user cancellation, and handed the LLM a cancelled tool_result — + # hence "user cancelled, use defaults". + # + # FIX — a two-phase handshake bridging the CLI permission request to the + # IM channel, via an asyncio.Future keyed by tool_use_id: + # + # 1. The CLI streams the AssistantMessage carrying the tool_use block, + # then sends ``can_use_tool``. Either ``run()`` (seeing the block) + # or the callback (receiving the request) fires first; + # ``_get_or_create_permission_future`` guarantees exactly one shared + # future regardless of who wins the race. + # 2. ``run()`` surfaces the question/plan to the IM channel and + # returns, so the channel can deliver it and the user can reply. + # Meanwhile the callback awaits the future, holding the CLI's + # permission request open (the CLI blocks until we answer). + # 3. The user's reply arrives as the next ``run()`` call. ``run()`` + # resolves the future with the reply text *instead of* issuing a + # fresh query (the CLI is mid-turn). The callback wakes and returns + # the decision: + # - AskUserQuestion → allow, with the parsed answers injected into + # ``updated_input.answers``. The CLI's tool execution formats + # those into the tool_result the LLM sees — this is the + # supported injection channel (the interactive UI uses it too). + # - ExitPlanMode → allow on approval (the CLI runs call() and + # exits plan mode), or deny with feedback whose message becomes + # the is_error tool_result. + # + # The previous approach wrote a raw tool_result to stdin directly, + # bypassing the control protocol — flaky and ignored once the CLI had + # already moved past the (errored) permission request. Going through + # the callback is what makes the reply actually take effect. def _get_or_create_permission_future( self,