diff --git a/openhands/agenthub/opencode_agent/__init__.py b/openhands/agenthub/opencode_agent/__init__.py index 991d822fc974..02b21f434daa 100644 --- a/openhands/agenthub/opencode_agent/__init__.py +++ b/openhands/agenthub/opencode_agent/__init__.py @@ -1,4 +1,12 @@ from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent +from openhands.agenthub.opencode_agent.subagents.explore import ( + OpenCodeExploreSubAgent, +) +from openhands.agenthub.opencode_agent.subagents.general import ( + OpenCodeGeneralSubAgent, +) from openhands.controller.agent import Agent Agent.register('OpenCodeAgent', OpenCodeAgent) +Agent.register('OpenCodeGeneralSubAgent', OpenCodeGeneralSubAgent) +Agent.register('OpenCodeExploreSubAgent', OpenCodeExploreSubAgent) diff --git a/openhands/agenthub/opencode_agent/function_calling.py b/openhands/agenthub/opencode_agent/function_calling.py index e62fe887e165..6f5a1b8428d8 100644 --- a/openhands/agenthub/opencode_agent/function_calling.py +++ b/openhands/agenthub/opencode_agent/function_calling.py @@ -18,6 +18,7 @@ from openhands.agenthub.opencode_agent.tools.list_dir import ListDirTool from openhands.agenthub.opencode_agent.tools.question import QUESTION_TOOL_NAME from openhands.agenthub.opencode_agent.tools.read import ReadTool +from openhands.agenthub.opencode_agent.tools.task import SUBAGENT_NAME_MAP, TASK_TOOL_NAME from openhands.agenthub.opencode_agent.tools.think import ThinkTool from openhands.agenthub.opencode_agent.tools.todo import ( TODO_READ_TOOL_NAME, @@ -32,6 +33,7 @@ from openhands.core.logger import openhands_logger as logger from openhands.events.action import ( Action, + AgentDelegateAction, AgentFinishAction, AgentThinkAction, ApplyPatchAction, @@ -246,6 +248,32 @@ def response_to_actions( todos=arguments["todos"], ) + # ================================================ + # Task (Subagent Delegation) + # ================================================ + elif tool_call.function.name == TASK_TOOL_NAME: + if 
'prompt' not in arguments: + raise FunctionCallValidationError( + f'Missing required argument "prompt" in tool call {tool_call.function.name}' + ) + if 'subagent_type' not in arguments: + raise FunctionCallValidationError( + f'Missing required argument "subagent_type" in tool call {tool_call.function.name}' + ) + agent_name = SUBAGENT_NAME_MAP.get(arguments['subagent_type']) + if not agent_name: + raise FunctionCallValidationError( + f'Unknown subagent_type: {arguments["subagent_type"]}. ' + f'Valid types: {", ".join(SUBAGENT_NAME_MAP.keys())}' + ) + action = AgentDelegateAction( + agent=agent_name, + inputs={ + 'task': arguments['prompt'], + 'description': arguments.get('description', ''), + }, + ) + # ================================================ # Think # ================================================ diff --git a/openhands/agenthub/opencode_agent/opencode_agent.py b/openhands/agenthub/opencode_agent/opencode_agent.py index cbe5299d41a0..7a0b1a99ccc1 100644 --- a/openhands/agenthub/opencode_agent/opencode_agent.py +++ b/openhands/agenthub/opencode_agent/opencode_agent.py @@ -21,6 +21,7 @@ from openhands.agenthub.opencode_agent.tools.list_dir import ListDirTool from openhands.agenthub.opencode_agent.tools.question import QuestionTool from openhands.agenthub.opencode_agent.tools.read import ReadTool +from openhands.agenthub.opencode_agent.tools.task import TaskTool from openhands.agenthub.opencode_agent.tools.think import ThinkTool from openhands.agenthub.opencode_agent.tools.todo import TodoReadTool, TodoWriteTool from openhands.agenthub.opencode_agent.tools.write import WriteTool @@ -29,8 +30,9 @@ from openhands.core.config import AgentConfig from openhands.core.logger import openhands_logger as logger from openhands.core.message import Message -from openhands.events.action import AgentFinishAction, MessageAction +from openhands.events.action import AgentDelegateAction, AgentFinishAction, MessageAction from openhands.events.event import Event +from 
openhands.events.observation.delegate import AgentDelegateObservation from openhands.llm.llm_utils import check_tools from openhands.memory.condenser import Condenser from openhands.memory.condenser.condenser import Condensation, View @@ -135,6 +137,9 @@ def _get_tools(self) -> list["ChatCompletionToolParam"]: tools.append(TodoReadTool) tools.append(TodoWriteTool) + # Subagent delegation + tools.append(TaskTool) + # Structured editing # tools.append(ApplyPatchTool) @@ -161,6 +166,11 @@ def step(self, state: State) -> "Action": This includes gathering info on previous steps and prompting the model to make a command to execute. + When the LLM returns multiple task (subagent) tool calls and NO other tool + calls, they are executed in parallel via ``ParallelSubagentRunner``. The + results are injected as synthetic events into state.history and the LLM is + re-called so the agent sees all subagent outputs at once. + Parameters: - state (State): used to get updated info @@ -204,10 +214,97 @@ def step(self, state: State) -> "Action": logger.debug(f"Response from LLM: {response}") actions = self.response_to_actions(response) logger.debug(f"Actions after response_to_actions: {actions}") + + # --- Parallel subagent execution --- + # When ALL actions in the batch are AgentDelegateAction (task tool calls) + # and there are ≥2, run them concurrently instead of sequentially. 
def _handle_parallel_subagents(
    self,
    delegate_actions: list[AgentDelegateAction],
    state: State,
    original_response: "ModelResponse",
    messages: list[Message],
    params: dict,
) -> "Action":
    """Run several delegated subagent tasks concurrently, then query the LLM again.

    The delegate actions are handed to ``ParallelSubagentRunner.run_parallel``.
    For every delegate action a synthetic ``AgentDelegateAction`` /
    ``AgentDelegateObservation`` pair carrying the original tool-call metadata
    is appended to ``state.history``. The message list is then rebuilt from the
    condensed history and the LLM is called once more so that it sees all
    subagent results together.

    Args:
        delegate_actions: The task tool calls to run; each one is read for
            ``tool_call_metadata.tool_call_id``, ``agent`` and ``inputs``.
        state: Current agent state; its ``history`` is extended in place.
        original_response: The LLM response that produced the delegate
            actions. Not read by this method.
        messages: The messages sent for ``original_response``. Not read by
            this method.
        params: Completion keyword arguments; ``params["messages"]`` is
            overwritten with the rebuilt messages before the second call.

    Returns:
        The first action parsed from the second LLM response (the remaining
        ones stay queued in ``self.pending_actions``), or the condensation
        action when the condenser asks for a condensation instead of
        returning a view.
    """
    # Function-scope import -- presumably to avoid a circular import, since the
    # subagents package imports this module; TODO confirm.
    from openhands.agenthub.opencode_agent.subagents.parallel_runner import (
        ParallelSubagentRunner,
    )

    logger.info(
        f'Parallel subagent execution: {len(delegate_actions)} task calls detected'
    )

    # An empty agent_configs mapping is passed, so no per-agent configuration
    # overrides are supplied here.
    runner = ParallelSubagentRunner(
        llm_registry=self.llm_registry,
        parent_agent_config=self.config,
        agent_configs={},
    )

    # Blocks until every subagent has produced a result.
    results = runner.run_parallel(delegate_actions)

    # Inject synthetic events into state.history so the conversation memory
    # picks them up correctly on subsequent calls to _get_messages().
    # NOTE(review): the large id offset looks intended to keep these ids clear
    # of ids assigned elsewhere -- confirm it cannot collide.
    base_id = 10_000_000 + len(state.history) * 100
    for i, delegate_action in enumerate(delegate_actions):
        tc_id = delegate_action.tool_call_metadata.tool_call_id
        # Fall back to a placeholder when the runner returned nothing for this id.
        result = results.get(tc_id, {'content': 'No result', 'outputs': {}})

        # Even ids are used for the actions, the following odd id for the
        # matching observation.
        synth_action = AgentDelegateAction(
            agent=delegate_action.agent,
            inputs=delegate_action.inputs,
        )
        synth_action._id = base_id + i * 2
        synth_action.tool_call_metadata = delegate_action.tool_call_metadata

        synth_obs = AgentDelegateObservation(
            outputs=result.get('outputs', {}),
            content=result.get('content', ''),
        )
        synth_obs._id = base_id + i * 2 + 1
        synth_obs._cause = synth_action._id
        # Action and observation share the metadata of the original tool call.
        synth_obs.tool_call_metadata = delegate_action.tool_call_metadata

        state.history.append(synth_action)
        state.history.append(synth_obs)

    # Re-build messages including the synthetic events and re-call LLM
    condensed_history_new: list[Event] = []
    match self.condenser.condensed_history(state):
        case View(events=events):
            condensed_history_new = events
        case Condensation(action=condensation_action):
            # The synthetic events are already in state.history at this point.
            return condensation_action

    initial_user_message = self._get_initial_user_message(state.history)
    new_messages = self._get_messages(condensed_history_new, initial_user_message)
    params["messages"] = new_messages
    new_response = self.llm.completion(**params)
    new_actions = self.response_to_actions(new_response)

    # Queue everything, then hand back the first queued action.
    for action in new_actions:
        self.pending_actions.append(action)
    return self.pending_actions.popleft()
delegation.""" + +from openhands.agenthub.opencode_agent.subagents.explore import ( + OpenCodeExploreSubAgent, +) +from openhands.agenthub.opencode_agent.subagents.general import ( + OpenCodeGeneralSubAgent, +) +from openhands.agenthub.opencode_agent.subagents.mixin import SubagentMixin +from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, +) + +__all__ = [ + 'SubagentMixin', + 'OpenCodeGeneralSubAgent', + 'OpenCodeExploreSubAgent', + 'ParallelSubagentRunner', +] diff --git a/openhands/agenthub/opencode_agent/subagents/explore.py b/openhands/agenthub/opencode_agent/subagents/explore.py new file mode 100644 index 000000000000..0f0c64fc3e33 --- /dev/null +++ b/openhands/agenthub/opencode_agent/subagents/explore.py @@ -0,0 +1,82 @@ +"""Explore subagent for fast, read-only codebase exploration. + +Mirrors opencode's "explore" subagent: restricted to read-only tools +(grep, glob, list, bash, read). No write, edit, todo, or task tools. +""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent +from openhands.agenthub.opencode_agent.subagents.mixin import SubagentMixin +from openhands.agenthub.opencode_agent.tools.bash import create_cmd_run_tool +from openhands.agenthub.opencode_agent.tools.finish import FinishTool +from openhands.agenthub.opencode_agent.tools.glob import GlobTool +from openhands.agenthub.opencode_agent.tools.grep import GrepTool +from openhands.agenthub.opencode_agent.tools.list_dir import ListDirTool +from openhands.agenthub.opencode_agent.tools.read import ReadTool +from openhands.agenthub.opencode_agent.tools.think import ThinkTool +from openhands.controller.state.state import State +from openhands.core.logger import openhands_logger as logger +from openhands.events.action import AgentFinishAction +from openhands.utils.prompt import PromptManager + +if TYPE_CHECKING: + from litellm import 
class OpenCodeExploreSubAgent(SubagentMixin, OpenCodeAgent):
    """Exploration subagent limited to read-oriented tools.

    Mirrors the permissions of opencode's explore subagent:
    - read, glob, grep and list_dir for navigation and search
    - bash (only when ``config.enable_cmd`` is set), meant for read-only
      commands such as git log or find
    - think and finish
    - no write, edit, todo_read, todo_write or task tool
    """

    VERSION = '1.0'

    @property
    def prompt_manager(self) -> PromptManager:
        """Return the prompt manager, creating and caching it on first access.

        The manager reads templates from the ``prompts`` directory next to
        this file and uses ``explore_system_prompt.j2`` as system prompt.
        """
        if self._prompt_manager is not None:
            return self._prompt_manager

        module_dir = os.path.dirname(__file__)
        self._prompt_manager = PromptManager(
            prompt_dir=os.path.join(module_dir, 'prompts'),
            system_prompt_filename='explore_system_prompt.j2',
        )
        return self._prompt_manager

    def _get_tools(self) -> list['ChatCompletionToolParam']:
        """Return the tool definitions offered to the LLM by this subagent."""
        toolset: list['ChatCompletionToolParam'] = [
            ReadTool,
            GlobTool,
            GrepTool,
            ListDirTool,
            ThinkTool,
        ]

        if self.config.enable_cmd:
            # Models whose name contains one of these markers get the short
            # variant of the bash tool description.
            model_name = self.llm.config.model
            wants_short = any(
                marker in model_name for marker in ['gpt-4', 'o3', 'o1', 'o4']
            )
            toolset.append(create_cmd_run_tool(use_short_description=wants_short))

        # finish always comes last.
        toolset.append(FinishTool)
        return toolset

    def step(self, state: State) -> 'Action':
        """Run one parent-class step and enrich the result if it is a finish action."""
        action = super().step(state)
        if not isinstance(action, AgentFinishAction):
            return action

        action = self._enrich_finish_action(action, state)
        logger.debug(
            f'ExploreSubAgent finishing with {len(action.outputs.get("metadata", {}).get("summary", []))} tool executions'
        )
        return action
class OpenCodeGeneralSubAgent(SubagentMixin, OpenCodeAgent):
    """General-purpose subagent: every tool except todo and task.

    Mirrors the permissions of opencode's general subagent:
    - file operations: read, write, edit
    - search tools: glob, grep, list_dir
    - bash execution (only when ``config.enable_cmd`` is set)
    - think and finish
    - no todo_read, todo_write or task tool, so it cannot spawn subagents
    """

    VERSION = '1.0'

    @property
    def prompt_manager(self) -> PromptManager:
        """Return the prompt manager, creating and caching it on first access.

        The manager reads templates from the ``prompts`` directory next to
        this file and uses ``general_system_prompt.j2`` as system prompt.
        """
        if self._prompt_manager is not None:
            return self._prompt_manager

        module_dir = os.path.dirname(__file__)
        self._prompt_manager = PromptManager(
            prompt_dir=os.path.join(module_dir, 'prompts'),
            system_prompt_filename='general_system_prompt.j2',
        )
        return self._prompt_manager

    def _get_tools(self) -> list['ChatCompletionToolParam']:
        """Return the tool definitions offered to the LLM by this subagent."""
        toolset: list['ChatCompletionToolParam'] = [
            ReadTool,
            WriteTool,
            EditTool,
            GlobTool,
            GrepTool,
            ListDirTool,
            ThinkTool,
        ]

        if self.config.enable_cmd:
            # Models whose name contains one of these markers get the short
            # variant of the bash tool description.
            model_name = self.llm.config.model
            wants_short = any(
                marker in model_name for marker in ['gpt-4', 'o3', 'o1', 'o4']
            )
            toolset.append(create_cmd_run_tool(use_short_description=wants_short))

        # finish always comes last.
        toolset.append(FinishTool)
        return toolset

    def step(self, state: State) -> 'Action':
        """Run one parent-class step and enrich the result if it is a finish action."""
        action = super().step(state)
        if not isinstance(action, AgentFinishAction):
            return action

        action = self._enrich_finish_action(action, state)
        logger.debug(
            f'GeneralSubAgent finishing with {len(action.outputs.get("metadata", {}).get("summary", []))} tool executions'
        )
        return action
class SubagentMixin:
    """Mixin for OpenCode subagents that formats outputs to match the opencode task tool format.

    ``_enrich_finish_action`` replaces the ``outputs`` of an
    ``AgentFinishAction`` with a dict of this shape::

        {
            'content': <final text followed by a session_id line>,
            'title': <the 'description' input of the delegation>,
            'metadata': {
                'summary': [{'id', 'tool', 'state': {'status', 'title'?}}, ...],
                'sessionId': <session id>,
                'model': <model name>,
            },
            'output': <same text as 'content'>,
        }

    Subclasses are expected to call ``_enrich_finish_action`` from ``step()``
    when the produced action is an ``AgentFinishAction``.
    """

    # tool name -> (action attribute to show, title prefix, max characters of
    # the value or None for "no truncation").
    _TITLE_RULES = {
        'read': ('path', 'Read ', None),
        'write': ('path', 'Write ', None),
        'edit': ('path', 'Edit ', None),
        'glob': ('pattern', 'Glob ', None),
        'grep': ('pattern', 'Grep ', None),
        'list_dir': ('path', 'List ', None),
        'execute_bash': ('command', 'Bash: ', 60),
        'cmd_run': ('command', 'Bash: ', 60),
        'think': ('thought', 'Think: ', 40),
    }

    def _collect_tool_summary(self, state: 'State') -> list[dict]:
        """Build an opencode-style summary of the tool calls found in ``state.history``.

        Args:
            state: State whose ``history`` is scanned.

        Returns:
            One entry per non-observation event that carries tool-call metadata
            with a function name. Each entry has ``id``, ``tool`` and a
            ``state`` dict whose ``status`` is always ``'completed'`` and whose
            ``title`` is present only when one could be derived.
        """
        summary: list[dict] = []
        for event in state.history:
            meta = getattr(event, 'tool_call_metadata', None)
            # Only the calling event is counted; observations are skipped so a
            # tool call is not listed twice.
            if meta is None or isinstance(event, Observation):
                continue
            if meta.function_name is None:
                continue

            tool_state: dict = {'status': 'completed'}
            title = self._derive_tool_title(event, meta.function_name)
            if title:
                tool_state['title'] = title

            summary.append(
                {
                    'id': meta.tool_call_id or '',
                    'tool': meta.function_name,
                    'state': tool_state,
                }
            )
        return summary

    def _derive_tool_title(self, action: Action, tool_name: str) -> str:
        """Derive a human-readable title for a tool invocation.

        Args:
            action: The action produced for the tool call.
            tool_name: Name of the tool as reported in the tool-call metadata.

        Returns:
            A short title such as ``'Read <path>'`` or ``'Bash: <command>'``
            (command cut to 60 characters, thought to 40), or ``''`` when the
            tool is unknown or the action has no usable value for it.
        """
        rule = self._TITLE_RULES.get(tool_name)
        if rule is None:
            return ''

        attr_name, prefix, limit = rule
        value = getattr(action, attr_name, None)
        # A missing or None attribute yields no title instead of "Read None"
        # or a TypeError when slicing.
        if value is None:
            return ''
        if limit is not None:
            value = value[:limit]
        return f'{prefix}{value}'

    def _enrich_finish_action(
        self, action: AgentFinishAction, state: 'State'
    ) -> AgentFinishAction:
        """Enrich an AgentFinishAction with opencode-format metadata.

        The 'content' key is used by ConversationMemory as the primary text
        shown to the parent agent's LLM (via AgentDelegateObservation).
        The 'output' key matches the opencode task tool return format.

        Args:
            action: The finish action; its ``outputs`` are replaced in place.
            state: State providing ``inputs``, ``session_id`` and ``history``.

        Returns:
            The same ``action`` object with ``outputs`` populated.
        """
        # Tolerate a state without inputs rather than failing on `.get`.
        inputs = state.inputs or {}
        description = inputs.get('description', '')
        session_id = state.session_id or ''

        # The mixin does not define `llm`; fall back to an empty model name
        # when the host class has none or its config is unset.
        llm_config = getattr(getattr(self, 'llm', None), 'config', None)
        model_name = llm_config.model if llm_config is not None else ''

        summary = self._collect_tool_summary(state)
        final_text = action.final_thought or action.thought or ''

        output_text = final_text + '\n\n' + '\n'.join([
            '',
            f'session_id: {session_id}',
            '',
        ])

        action.outputs = {
            'content': output_text,
            'title': description,
            'metadata': {
                'summary': summary,
                'sessionId': session_id,
                'model': model_name,
            },
            'output': output_text,
        }

        logger.debug(
            f'Subagent enriched finish action: title={description}, '
            f'tools_executed={len(summary)}, model={model_name}'
        )
        return action
MAX_SUBAGENT_STEPS = 50


class ParallelSubagentRunner:
    """Runs multiple subagent loops concurrently.

    Each subagent is spawned in a separate thread with its own agent instance
    and local ``State``. Tool actions are sent to the runtime HTTP server
    directly (bypassing the event stream) so multiple subagents can execute
    independently without interfering with the parent controller's
    event-driven flow.
    """

    def __init__(
        self,
        llm_registry: LLMRegistry,
        parent_agent_config: 'AgentConfig',
        agent_configs: dict[str, 'AgentConfig'],
        runtime_url: str | None = None,
    ):
        """Store the collaborators needed to build and run subagents.

        Args:
            llm_registry: Registry passed to every subagent constructor.
            parent_agent_config: Config used for agents that have no entry in
                ``agent_configs``.
            agent_configs: Per-agent-name config overrides.
            runtime_url: Base URL of the runtime HTTP server; defaults to the
                ``OPENHANDS_RUNTIME_URL`` environment variable.
        """
        self.llm_registry = llm_registry
        self.parent_agent_config = parent_agent_config
        self.agent_configs = agent_configs
        self.runtime_url = runtime_url or os.environ.get('OPENHANDS_RUNTIME_URL')

    def run_parallel(
        self, delegate_actions: list[AgentDelegateAction]
    ) -> dict[str, dict[str, Any]]:
        """Run subagent tasks in parallel.

        Args:
            delegate_actions: List of AgentDelegateAction, each representing
                a subagent task. Must have tool_call_metadata set.

        Returns:
            Dict mapping tool_call_id -> result dict with 'content' and
            'outputs'. A subagent that raised is reported as a result whose
            content starts with 'Subagent error:'. An empty input yields an
            empty dict.

        Raises:
            RuntimeError: If no runtime URL is configured.
        """
        if not self.runtime_url:
            raise RuntimeError(
                'Cannot run parallel subagents: OPENHANDS_RUNTIME_URL not set. '
                'The runtime must be started before parallel execution.'
            )

        # ThreadPoolExecutor raises ValueError for max_workers=0, so an empty
        # batch is answered without creating a pool.
        if not delegate_actions:
            return {}

        # Resolve the ids before any thread starts: a delegate action without
        # tool_call_metadata then fails immediately instead of after its
        # subagent has already done the work.
        tool_call_ids = [
            action.tool_call_metadata.tool_call_id for action in delegate_actions
        ]

        logger.info(
            f'Starting parallel execution of {len(delegate_actions)} subagents '
            f'(runtime: {self.runtime_url})'
        )
        start_time = time.monotonic()

        results: dict[str, dict[str, Any]] = {}
        # One worker per task so that all subagents run at the same time.
        with ThreadPoolExecutor(max_workers=len(delegate_actions)) as executor:
            futures = {
                executor.submit(self._run_single, action): (tool_call_id, action)
                for tool_call_id, action in zip(tool_call_ids, delegate_actions)
            }
            for future in as_completed(futures):
                tool_call_id, action = futures[future]
                try:
                    results[tool_call_id] = future.result()
                except Exception as e:
                    # Boundary handler: one failing subagent must not discard
                    # the results of the others.
                    logger.error(
                        f'Subagent {action.agent} failed: {e}', exc_info=True
                    )
                    results[tool_call_id] = {
                        'content': f'Subagent error: {e}',
                        'outputs': {},
                    }

        elapsed = time.monotonic() - start_time
        logger.info(
            f'Parallel subagent execution completed in {elapsed:.1f}s '
            f'({len(results)} results)'
        )
        return results

    def _run_single(self, delegate_action: AgentDelegateAction) -> dict[str, Any]:
        """Run a single subagent loop to completion in the current thread.

        A fresh event loop is installed as the thread's current loop for the
        duration of the run and removed again afterwards.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return self._run_subagent_loop(delegate_action)
        finally:
            loop.close()
            # Do not leave a closed loop registered as this thread's current loop.
            asyncio.set_event_loop(None)

    def _run_subagent_loop(
        self, delegate_action: AgentDelegateAction
    ) -> dict[str, Any]:
        """Step one subagent until it finishes, fails or hits the step limit.

        Args:
            delegate_action: Names the agent class and carries its inputs.

        Returns:
            A dict with 'content' and 'outputs'. On a finish action these come
            from the action's outputs; on a step error or when
            ``MAX_SUBAGENT_STEPS`` is reached 'content' describes the problem
            and 'outputs' is empty.
        """
        agent_name = delegate_action.agent
        inputs = delegate_action.inputs

        agent_cls = Agent.get_cls(agent_name)
        agent_config = self.agent_configs.get(agent_name, self.parent_agent_config)
        agent = agent_cls(config=agent_config, llm_registry=self.llm_registry)

        state = self._create_subagent_state(agent, inputs)
        # Event ids continue after the seed events placed in the history.
        id_counter = len(state.history)

        for step_count in range(1, MAX_SUBAGENT_STEPS + 1):
            try:
                action = agent.step(state)
            except Exception as e:
                logger.error(f'Subagent {agent_name} step failed: {e}', exc_info=True)
                return {
                    'content': f'Subagent step error after {step_count} steps: {e}',
                    'outputs': {},
                }

            if action is None:
                continue

            action._id = id_counter
            id_counter += 1
            action._source = EventSource.AGENT

            if isinstance(action, AgentFinishAction):
                outputs = action.outputs or {}
                content = outputs.get('content', action.final_thought or '')
                return {'content': content, 'outputs': outputs}

            # Recorded in the history but never sent to the runtime.
            if isinstance(action, (AgentThinkAction, ValidationFailureAction)):
                state.history.append(action)
                continue

            state.history.append(action)
            if action.runnable:
                obs = self._execute_action(action)
                obs._cause = action.id
                obs._id = id_counter
                id_counter += 1
                # Copy the tool-call metadata onto the observation.
                metadata = getattr(action, 'tool_call_metadata', None)
                if metadata:
                    obs.tool_call_metadata = metadata
                state.history.append(obs)

        return {
            'content': f'Subagent {agent_name} reached max steps ({MAX_SUBAGENT_STEPS})',
            'outputs': {},
        }

    def _create_subagent_state(self, agent: Agent, inputs: dict) -> State:
        """Create a minimal local State for a subagent.

        The history is seeded with the agent's system message (when it has
        one) followed by a user message holding ``'TASK: '`` plus the 'task'
        input.
        """
        state = State(
            session_id=f'parallel-subagent-{id(agent)}',
            inputs=inputs,
        )
        state.agent_state = 'running'

        system_msg = agent.get_system_message()
        if system_msg:
            system_msg._id = 0
            state.history.append(system_msg)

        task_text = inputs.get('task', '')
        task_msg = MessageAction(content='TASK: ' + task_text)
        task_msg._source = EventSource.USER
        task_msg._id = 1 if system_msg else 0
        state.history.append(task_msg)

        return state

    def _execute_action(self, action: 'Action') -> 'Observation':
        """Execute an action via the runtime HTTP endpoint.

        Args:
            action: The action to serialize and POST to
                ``<runtime_url>/execute_action``.

        Returns:
            The observation decoded from the response, or an
            ``ErrorObservation`` when the request times out or fails in any
            other way.
        """
        action_dict = event_to_dict(action)
        timeout = getattr(action, 'timeout', None) or 120

        try:
            # A 5 s margin on top of the action's own timeout. This is the
            # value that was already in effect: the former per-request
            # timeout overrode the client default.
            with httpx.Client(timeout=timeout + 5) as client:
                response = client.post(
                    f'{self.runtime_url}/execute_action',
                    json={'action': action_dict},
                )
                response.raise_for_status()
                return observation_from_dict(response.json())
        except httpx.TimeoutException:
            return ErrorObservation(
                content=f'Action timed out after {timeout}s'
            )
        except Exception as e:
            # Best effort by design: the subagent sees the failure as an
            # observation and can react to it.
            return ErrorObservation(content=f'Action execution failed: {e}')
+{% endif %} + +{% endif %} diff --git a/openhands/agenthub/opencode_agent/subagents/prompts/explore_system_prompt.j2 b/openhands/agenthub/opencode_agent/subagents/prompts/explore_system_prompt.j2 new file mode 100644 index 000000000000..b6a64582aed7 --- /dev/null +++ b/openhands/agenthub/opencode_agent/subagents/prompts/explore_system_prompt.j2 @@ -0,0 +1,52 @@ +You are a fast exploration subagent specialized for codebase discovery and search. + + +You have been delegated an exploration task. Your job is to search, read, and analyze code to answer questions or gather information. You are READ-ONLY -- do not attempt to modify any files. + +Your results will be returned to the parent agent, not directly to the user. Be thorough and precise in your final message. + + + +You have access to read-only tools: + +**Search & Navigation:** +- read: Read file contents with line numbers +- glob: Find files matching patterns (e.g., **/*.py) +- grep: Search file contents with regex +- list_dir: List directory contents + +**Code Execution (read-only):** +- execute_bash: Run read-only shell commands (git log, find, wc, etc.) + +**Other:** +- think: Log reasoning process +- finish: Signal task completion with results + + + +* Use `glob` to find files by pattern before reading them +* Use `grep` to search for specific code patterns across the codebase +* Use `read` to examine file contents in detail +* Use `list_dir` to understand directory structure +* Use `execute_bash` only for read-only commands (git log, git diff, wc, head, etc.) +* Do NOT use bash for file modifications -- you are a read-only agent + + + +1. Start broad: use `glob` and `grep` to find relevant files +2. Narrow down: read specific files or sections that match +3. Cross-reference: follow imports, references, and call chains +4. Synthesize: use `think` to organize findings before reporting + + + +1. Read the exploration prompt carefully. +2. Use search tools systematically to find relevant code. +3. 
Read key files to understand the code in detail. +4. When finished, call `finish` with a comprehensive report including: + - Direct answers to the questions asked + - Relevant file paths with line numbers + - Key code snippets or structures found + - Relationships between components +5. Be thorough -- the parent agent relies on your findings to make decisions. + diff --git a/openhands/agenthub/opencode_agent/subagents/prompts/general_system_prompt.j2 b/openhands/agenthub/opencode_agent/subagents/prompts/general_system_prompt.j2 new file mode 100644 index 000000000000..3010630d0068 --- /dev/null +++ b/openhands/agenthub/opencode_agent/subagents/prompts/general_system_prompt.j2 @@ -0,0 +1,49 @@ +You are a general-purpose subagent executing an autonomous task on behalf of a parent agent. + + +You have been delegated a specific task. Complete it thoroughly and autonomously using the tools available to you. When finished, use the `finish` tool with a comprehensive summary of what you did and found. + +Your results will be returned to the parent agent, not directly to the user. Be thorough and precise in your final message. 
+ + + +You have access to the following tools: + +**File Operations:** +- read: Read file contents with line numbers +- write: Create or overwrite files +- edit: Make precise string replacements in files +- glob: Find files matching patterns (e.g., **/*.py) +- grep: Search file contents with regex +- list_dir: List directory contents + +**Code Execution:** +- execute_bash: Run shell commands + +**Other:** +- think: Log reasoning process +- finish: Signal task completion with results + + + +* Use the most appropriate tool for each task: + - For reading files: Use `read` instead of `cat` or `less` + - For writing files: Use `write` instead of `echo` or `cat` + - For editing files: Use `edit` instead of `sed` or inline editing + - For finding files: Use `glob` instead of `find` + - For searching content: Use `grep` instead of command-line `grep` + - For directory listing: Use `list_dir` instead of `ls` +* Only use `execute_bash` for tasks that require shell execution (git, npm, docker, etc.) + + + +1. Read and understand the task prompt carefully before starting. +2. Break down complex tasks into steps mentally using the `think` tool. +3. Execute each step using the appropriate tools. +4. When finished, call `finish` with a clear summary including: + - What actions were taken + - What was found or accomplished + - Any relevant file paths, code snippets, or data + - Any issues encountered +5. Be concise but comprehensive in your final message -- the parent agent needs enough context to continue. + diff --git a/openhands/agenthub/opencode_agent/subagents/prompts/microagent_info.j2 b/openhands/agenthub/opencode_agent/subagents/prompts/microagent_info.j2 new file mode 100644 index 000000000000..264828fbe206 --- /dev/null +++ b/openhands/agenthub/opencode_agent/subagents/prompts/microagent_info.j2 @@ -0,0 +1,8 @@ +{% for agent_info in triggered_agents %} + +The following information has been included based on a keyword match for "{{ agent_info.trigger }}". 
+It may or may not be relevant to the user's request. + +{{ agent_info.content }} + +{% endfor %} diff --git a/openhands/agenthub/opencode_agent/subagents/prompts/user_prompt.j2 b/openhands/agenthub/opencode_agent/subagents/prompts/user_prompt.j2 new file mode 100644 index 000000000000..8b137891791f --- /dev/null +++ b/openhands/agenthub/opencode_agent/subagents/prompts/user_prompt.j2 @@ -0,0 +1 @@ + diff --git a/openhands/agenthub/opencode_agent/tools/task.py b/openhands/agenthub/opencode_agent/tools/task.py new file mode 100644 index 000000000000..229f6fd0f927 --- /dev/null +++ b/openhands/agenthub/opencode_agent/tools/task.py @@ -0,0 +1,73 @@ +from litellm import ChatCompletionToolParam, ChatCompletionToolParamFunctionChunk + +TASK_TOOL_NAME = 'task' + +SUBAGENT_REGISTRY = { + 'general': 'General-purpose agent for researching complex questions and executing multi-step tasks. Use this agent to execute multiple units of work in parallel.', + 'explore': 'Fast agent specialized for exploring codebases. Use this when you need to quickly find files by patterns (eg. "src/components/**/*.tsx"), search code for keywords (eg. "API endpoints"), or answer questions about the codebase (eg. "how do API endpoints work?"). When calling this agent, specify the desired thoroughness level: "quick" for basic searches, "medium" for moderate exploration, or "very thorough" for comprehensive analysis across multiple locations and naming conventions.', +} + +SUBAGENT_NAME_MAP = { + 'general': 'OpenCodeGeneralSubAgent', + 'explore': 'OpenCodeExploreSubAgent', +} + +_TASK_DESCRIPTION_TEMPLATE = """Launch a new agent to handle complex, multistep tasks autonomously. + +Available agent types and the tools they have access to: +{agents} + +When using the Task tool, you must specify a subagent_type parameter to select which agent type to use. 
+ +When to use the Task tool: +- Complex research or multi-step implementation tasks that benefit from autonomous execution +- Codebase exploration that requires searching across many files +- Tasks that can be parallelized by launching multiple agents concurrently + +When NOT to use the Task tool: +- If you want to read a specific file path, use the read tool instead +- If you are searching for a specific class definition like "class Foo", use the glob tool instead +- If you are searching for code within a specific file or set of 2-3 files, use the read tool instead +- Simple single-step tasks that you can handle directly + +Usage notes: +1. Launch multiple agents concurrently whenever possible, to maximize performance; to do that, use a single message with multiple tool uses. +2. When the agent is done, it will return a single message back to you. The result returned by the agent is not visible to the user. To show the user the result, you should send a text message back to the user with a concise summary of the result. +3. Each agent invocation is stateless. Your prompt should contain a highly detailed task description for the agent to perform autonomously and you should specify exactly what information the agent should return back to you in its final message. +4. The agent's outputs should generally be trusted. +5. 
Clearly tell the agent whether you expect it to write code or just to do research (search, file reads, etc.), since it is not aware of the user's intent.""" + + +def _build_task_description() -> str: + agents_text = '\n'.join( + f'- {name}: {desc}' for name, desc in SUBAGENT_REGISTRY.items() + ) + return _TASK_DESCRIPTION_TEMPLATE.format(agents=agents_text) + + +TaskTool = ChatCompletionToolParam( + type='function', + function=ChatCompletionToolParamFunctionChunk( + name=TASK_TOOL_NAME, + description=_build_task_description(), + parameters={ + 'type': 'object', + 'required': ['description', 'prompt', 'subagent_type'], + 'properties': { + 'description': { + 'type': 'string', + 'description': 'A short (3-5 words) description of the task', + }, + 'prompt': { + 'type': 'string', + 'description': 'The task for the agent to perform. Include all necessary context.', + }, + 'subagent_type': { + 'type': 'string', + 'enum': list(SUBAGENT_REGISTRY.keys()), + 'description': 'The type of specialized agent to use for this task', + }, + }, + }, + ), +) diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 58e99077692e..95551d540338 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -325,6 +325,13 @@ def add_env_vars(self, env_vars: dict[str, str]) -> None: def on_event(self, event: Event) -> None: if isinstance(event, Action): + if not os.environ.get('OPENHANDS_RUNTIME_URL'): + try: + os.environ['OPENHANDS_RUNTIME_URL'] = ( + self.action_execution_server_url + ) + except (NotImplementedError, AttributeError): + pass asyncio.get_event_loop().run_until_complete(self._handle_action(event)) async def _export_latest_git_provider_tokens(self, event: Action) -> None: diff --git a/tests/unit/agenthub/test_e2e_parallel_subagents.py b/tests/unit/agenthub/test_e2e_parallel_subagents.py new file mode 100644 index 000000000000..93929920df14 --- /dev/null +++ b/tests/unit/agenthub/test_e2e_parallel_subagents.py @@ -0,0 +1,296 @@ +"""End-to-end test for 
the full parallel subagent flow. + +Only the LLM and HTTP runtime are mocked. Everything else uses real code: +action conversion, parallel detection, ParallelSubagentRunner, real subagent +classes, SubagentMixin enrichment, state management, and synthetic events. +""" + +import json +import threading +from unittest.mock import Mock, patch + +import pytest +from litellm import ModelResponse + +from openhands.agenthub.opencode_agent.tools.task import TASK_TOOL_NAME +from openhands.controller.state.state import State +from openhands.core.config import AgentConfig, LLMConfig +from openhands.core.config.openhands_config import OpenHandsConfig +from openhands.events.action import AgentDelegateAction, MessageAction +from openhands.events.event import EventSource +from openhands.events.observation.delegate import AgentDelegateObservation +from openhands.llm.llm_registry import LLMRegistry + + +def _make_llm(): + llm = Mock() + llm.config = Mock() + llm.config.model = "test-model" + llm.config.max_message_chars = 10000 + llm.config.custom_llm_provider = None + llm.vision_is_active = Mock(return_value=False) + llm.is_caching_prompt_active = Mock(return_value=False) + return llm + + +def _tc_resp(specs, rid="r"): + calls = [ + { + "function": {"name": n, "arguments": json.dumps(a)}, + "id": cid, + "type": "function", + } + for n, a, cid in specs + ] + return ModelResponse( + id=rid, + choices=[{ + "message": {"tool_calls": calls, "content": None, "role": "assistant"}, + "index": 0, + "finish_reason": "tool_calls", + }], + ) + + +def _txt_resp(text): + return ModelResponse( + id="txt", + choices=[{ + "message": {"content": text, "role": "assistant", "tool_calls": None}, + "index": 0, + "finish_reason": "stop", + }], + ) + + +def _sub_llm(path, msg): + llm = _make_llm() + llm.completion = Mock(side_effect=[ + _tc_resp([("read", {"file_path": path}, "sr")], rid="s1"), + _tc_resp([("finish", {"message": msg}, "sf")], rid="s2"), + ]) + return llm + + +def _http_mock(): + def 
post(url, **kw): + r = Mock() + r.status_code = 200 + r.raise_for_status = Mock() + p = kw.get("json", {}).get("action", {}).get("args", {}).get("path", "/x") + r.json = Mock(return_value={ + "observation": "read", + "content": f"contents of {p}", + "extras": {"path": p, "impl_source": "default"}, + }) + return r + + c = Mock() + c.post = Mock(side_effect=post) + c.__enter__ = Mock(return_value=c) + c.__exit__ = Mock(return_value=False) + return c + + +def _router(llms): + lock = threading.Lock() + i = [0] + + def get(cfg): + with lock: + n = i[0] + i[0] += 1 + return llms[min(n, len(llms) - 1)] + + return get + + +@pytest.fixture +def registry(): + cfg = OpenHandsConfig() + cfg.set_llm_config(LLMConfig(model="gpt-4o", api_key="test")) + return LLMRegistry(config=cfg) + + +class TestE2EParallelFlow: + + def test_two_subagents_happy_path(self, registry): + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=registry) + p = _make_llm() + p.completion = Mock(side_effect=[ + _tc_resp([ + (TASK_TOOL_NAME, {"description": "Explore", "prompt": "Find files", "subagent_type": "explore"}, "tc-e"), + (TASK_TOOL_NAME, {"description": "Analyze", "prompt": "Analyze main", "subagent_type": "general"}, "tc-g"), + ]), + _txt_resp("Both subagents completed."), + ]) + agent.llm = p + + sa = _sub_llm("/src/main.py", "Found 3 functions") + sb = _sub_llm("/src/utils.py", "Found 5 helpers") + orig = registry.get_router + registry.get_router = _router([sa, sb]) + + hc = _http_mock() + state = State(session_id="e2e-1") + um = MessageAction(content="Explore and analyze") + um._source = EventSource.USER + um._id = 0 + state.history = [um] + before = len(state.history) + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://t:9"}): + with patch("httpx.Client", return_value=hc): + action = agent.step(state) + + assert isinstance(action, MessageAction) + assert "completed" in action.content.lower() + 
assert p.completion.call_count == 2 + assert sa.completion.call_count == 2 + assert sb.completion.call_count == 2 + assert hc.post.call_count == 2 + + new = state.history[before:] + assert len(new) == 4 + acts = [e for e in new if isinstance(e, AgentDelegateAction)] + obs = [e for e in new if isinstance(e, AgentDelegateObservation)] + assert len(acts) == 2 + assert len(obs) == 2 + + for o in obs: + assert o.tool_call_metadata is not None + assert any(a.id == o.cause for a in acts) + + tc_ids = {o.tool_call_metadata.tool_call_id for o in obs} + assert tc_ids == {"tc-e", "tc-g"} + + registry.get_router = orig + + def test_subagent_error_isolation(self, registry): + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=registry) + p = _make_llm() + p.completion = Mock(side_effect=[ + _tc_resp([ + (TASK_TOOL_NAME, {"description": "OK", "prompt": "A", "subagent_type": "explore"}, "tc-ok"), + (TASK_TOOL_NAME, {"description": "Bad", "prompt": "B", "subagent_type": "general"}, "tc-bad"), + ]), + _txt_resp("Partial results."), + ]) + agent.llm = p + + ok = _sub_llm("/ok.py", "Success") + bad = _make_llm() + bad.completion = Mock(side_effect=RuntimeError("crash")) + orig = registry.get_router + registry.get_router = _router([ok, bad]) + + hc = _http_mock() + state = State(session_id="e2e-err") + um = MessageAction(content="Two tasks") + um._source = EventSource.USER + um._id = 0 + state.history = [um] + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://t:9"}): + with patch("httpx.Client", return_value=hc): + action = agent.step(state) + + assert isinstance(action, MessageAction) + obs = [e for e in state.history if isinstance(e, AgentDelegateObservation)] + assert len(obs) == 2 + contents = " ".join(o.content for o in obs).lower() + assert "error" in contents or "success" in contents + + registry.get_router = orig + + def test_three_subagents(self, registry): + from 
openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=registry) + p = _make_llm() + p.completion = Mock(side_effect=[ + _tc_resp([ + (TASK_TOOL_NAME, {"description": "T1", "prompt": "P1", "subagent_type": "explore"}, "tc-1"), + (TASK_TOOL_NAME, {"description": "T2", "prompt": "P2", "subagent_type": "explore"}, "tc-2"), + (TASK_TOOL_NAME, {"description": "T3", "prompt": "P3", "subagent_type": "general"}, "tc-3"), + ]), + _txt_resp("All done."), + ]) + agent.llm = p + + subs = [_sub_llm(f"/f{i}.py", f"Result {i}") for i in range(3)] + orig = registry.get_router + registry.get_router = _router(subs) + + hc = _http_mock() + state = State(session_id="e2e-3") + um = MessageAction(content="Three tasks") + um._source = EventSource.USER + um._id = 0 + state.history = [um] + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://t:9"}): + with patch("httpx.Client", return_value=hc): + action = agent.step(state) + + assert isinstance(action, MessageAction) + assert p.completion.call_count == 2 + assert hc.post.call_count == 3 + + obs = [e for e in state.history if isinstance(e, AgentDelegateObservation)] + assert len(obs) == 3 + tc_ids = {o.tool_call_metadata.tool_call_id for o in obs} + assert tc_ids == {"tc-1", "tc-2", "tc-3"} + + for s in subs: + assert s.completion.call_count == 2 + + registry.get_router = orig + + def test_enrichment_produces_task_metadata(self, registry): + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=registry) + p = _make_llm() + p.completion = Mock(side_effect=[ + _tc_resp([ + (TASK_TOOL_NAME, {"description": "Check", "prompt": "List configs", "subagent_type": "explore"}, "tc-m1"), + (TASK_TOOL_NAME, {"description": "Update", "prompt": "Add hints", "subagent_type": "general"}, "tc-m2"), + ]), + _txt_resp("Done."), + ]) + agent.llm = p + + sa = _sub_llm("/a.py", "Found 
configs") + sb = _sub_llm("/b.py", "Added hints") + orig = registry.get_router + registry.get_router = _router([sa, sb]) + + hc = _http_mock() + state = State(session_id="e2e-meta") + um = MessageAction(content="Check and update") + um._source = EventSource.USER + um._id = 0 + state.history = [um] + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://t:9"}): + with patch("httpx.Client", return_value=hc): + agent.step(state) + + obs = [e for e in state.history if isinstance(e, AgentDelegateObservation)] + assert len(obs) == 2 + + for o in obs: + assert "" in o.content + assert "session_id:" in o.content + out = o.outputs + assert "output" in out + assert "metadata" in out + assert "title" in out + assert "sessionId" in out["metadata"] + + registry.get_router = orig diff --git a/tests/unit/agenthub/test_opencode_parallel_subagents.py b/tests/unit/agenthub/test_opencode_parallel_subagents.py new file mode 100644 index 000000000000..1395fc331c6a --- /dev/null +++ b/tests/unit/agenthub/test_opencode_parallel_subagents.py @@ -0,0 +1,981 @@ +"""Tests for parallel subagent execution via ParallelSubagentRunner. + +Covers edge cases and integration scenarios beyond what +test_opencode_subagents.py provides: +1. Subagent loop lifecycle (step -> finish, step -> max iterations) +2. Concurrent timing validation +3. Error isolation across threads +4. Runtime URL resolution +5. 
Integration with OpenCodeAgent.step() parallel detection +""" + +import json +import os +import time +from collections import deque +from unittest.mock import Mock, patch + +import pytest +from litellm import ModelResponse + +from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + MAX_SUBAGENT_STEPS, + ParallelSubagentRunner, +) +from openhands.agenthub.opencode_agent.tools.task import TASK_TOOL_NAME +from openhands.controller.state.state import State +from openhands.core.config import AgentConfig, LLMConfig +from openhands.core.config.openhands_config import OpenHandsConfig +from openhands.events.action import ( + AgentDelegateAction, + AgentFinishAction, + MessageAction, + OpenCodeReadAction, +) +from openhands.events.event import EventSource +from openhands.events.observation.delegate import AgentDelegateObservation +from openhands.events.tool import ToolCallMetadata +from openhands.llm.llm_registry import LLMRegistry + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_response_multi(tool_calls): + calls = [] + for i, (name, args) in enumerate(tool_calls): + calls.append( + { + "function": {"name": name, "arguments": json.dumps(args)}, + "id": f"tc-{i}", + "type": "function", + } + ) + return ModelResponse( + id="mock", + choices=[ + { + "message": {"tool_calls": calls, "content": None, "role": "assistant"}, + "index": 0, + "finish_reason": "tool_calls", + } + ], + ) + + +def _mock_text_response(text): + return ModelResponse( + id="mock", + choices=[ + { + "message": {"content": text, "role": "assistant", "tool_calls": None}, + "index": 0, + "finish_reason": "stop", + } + ], + ) + + +def _make_delegate_action(agent_name, task, tool_call_id): + action = AgentDelegateAction( + agent=agent_name, + inputs={"task": task, "description": f"Test: {task}"}, + ) + action.tool_call_metadata = ToolCallMetadata( + 
tool_call_id=tool_call_id, + function_name="task", + model_response=ModelResponse( + id="mock-resp", + choices=[ + { + "message": { + "tool_calls": [ + { + "function": { + "name": "task", + "arguments": json.dumps( + {"prompt": task, "subagent_type": "generalPurpose"} + ), + }, + "id": tool_call_id, + "type": "function", + } + ], + "content": None, + "role": "assistant", + }, + "index": 0, + "finish_reason": "tool_calls", + } + ], + ), + total_calls_in_response=1, + ) + return action + + +@pytest.fixture +def llm_registry(): + cfg = OpenHandsConfig() + cfg.set_llm_config(LLMConfig(model="gpt-4o", api_key="test")) + return LLMRegistry(config=cfg) + + +def _make_agent(llm_registry): + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=llm_registry) + mock_llm = Mock() + mock_llm.config = Mock() + mock_llm.config.model = "gpt-4o" + mock_llm.config.max_message_chars = 10000 + mock_llm.vision_is_active = Mock(return_value=False) + mock_llm.is_caching_prompt_active = Mock(return_value=False) + agent.llm = mock_llm + return agent + + +def _make_state(): + state = Mock(spec=State) + user_msg = Mock() + user_msg.content = "do something" + state.get_last_user_message = Mock(return_value=user_msg) + msg = MessageAction(content="do something") + msg._source = EventSource.USER + state.history = [msg] + state.inputs = {} + state.extra_data = {} + state.to_llm_metadata = Mock(return_value={}) + return state + + +def _wire_agent(agent, state): + from openhands.memory.condenser.condenser import View + + agent.condenser = Mock() + agent.condenser.condensed_history = Mock(return_value=View(events=state.history)) + agent.conversation_memory = Mock() + agent.conversation_memory.process_events = Mock(return_value=[]) + agent.conversation_memory.apply_prompt_caching = Mock() + + +# =========================================================================== +# 1. 
# ParallelSubagentRunner runtime URL resolution
# ===========================================================================


class TestRuntimeURLResolution:
    """Checks which value ends up in ``ParallelSubagentRunner.runtime_url``."""

    @staticmethod
    def _runner_for(url):
        """Return a runner built from mock collaborators and ``runtime_url=url``."""
        return ParallelSubagentRunner(
            llm_registry=Mock(),
            parent_agent_config=Mock(),
            agent_configs={},
            runtime_url=url,
        )

    def test_uses_explicit_url(self):
        """A URL passed to the constructor is exposed unchanged."""
        explicit = "http://explicit:8000"
        assert self._runner_for(explicit).runtime_url == explicit

    def test_falls_back_to_env_var(self):
        """Without an explicit URL, OPENHANDS_RUNTIME_URL provides the value."""
        env = {"OPENHANDS_RUNTIME_URL": "http://env:9000"}
        # Assert while the patched environment is still active so the check
        # holds regardless of when the runner reads the variable.
        with patch.dict("os.environ", env):
            assert self._runner_for(None).runtime_url == "http://env:9000"

    def test_none_when_no_url_available(self):
        """No explicit URL and an emptied environment yield ``None``."""
        with patch.dict("os.environ", {}, clear=True):
            assert self._runner_for(None).runtime_url is None


# ===========================================================================
# 2.
# Subagent loop lifecycle
# ===========================================================================


class TestSubagentLoop:
    """Drives ``ParallelSubagentRunner._run_subagent_loop`` with a stubbed agent."""

    # Patch target through which the runner module looks up the agent class.
    _GET_CLS = (
        "openhands.agenthub.opencode_agent.subagents.parallel_runner.Agent.get_cls"
    )

    @staticmethod
    def _new_runner():
        """Return a runner with mock collaborators and a fixed runtime URL."""
        return ParallelSubagentRunner(
            llm_registry=Mock(),
            parent_agent_config=Mock(),
            agent_configs={},
            runtime_url="http://test:8000",
        )

    @staticmethod
    def _stub_agent(**step_behaviour):
        """Return a mock agent whose ``step`` is configured by *step_behaviour*.

        *step_behaviour* is forwarded to ``Mock.configure_mock`` on ``step``
        (e.g. ``return_value=...`` or ``side_effect=...``).
        """
        agent = Mock()
        agent.step.configure_mock(**step_behaviour)
        agent.get_system_message.return_value = None
        return agent

    def _drive(self, runner, agent, delegate):
        """Run the subagent loop for *delegate* with *agent* as the subagent."""
        with patch(self._GET_CLS) as get_cls:
            # The patched lookup yields a factory that ignores its arguments
            # and hands back the stub.
            get_cls.return_value = lambda config, llm_registry: agent
            return runner._run_subagent_loop(delegate)

    @patch.object(ParallelSubagentRunner, "_execute_action")
    def test_loop_returns_on_finish_action(self, mock_exec):
        """A finish action on the first step ends the loop without executing anything."""
        done = AgentFinishAction(
            final_thought="Done", outputs={"content": "All good"}
        )

        outcome = self._drive(
            self._new_runner(),
            self._stub_agent(return_value=done),
            _make_delegate_action("TestAgent", "Do stuff", "c1"),
        )

        assert outcome["content"] == "All good"
        mock_exec.assert_not_called()

    @patch.object(ParallelSubagentRunner, "_execute_action")
    def test_loop_hits_max_steps(self, mock_exec):
        """An agent that never finishes is cut off after MAX_SUBAGENT_STEPS steps."""
        from openhands.events.observation.empty import NullObservation

        mock_exec.return_value = NullObservation(content="ok")

        never_done = OpenCodeReadAction(path="/test.py")
        never_done.tool_call_metadata = ToolCallMetadata(
            tool_call_id="inner-tc",
            function_name="read",
            model_response=_mock_response_multi([("read", {"file_path": "/test.py"})]),
            total_calls_in_response=1,
        )
        agent = self._stub_agent(return_value=never_done)

        outcome = self._drive(
            self._new_runner(),
            agent,
            _make_delegate_action("TestAgent", "Loop forever", "c2"),
        )

        assert "max steps" in outcome["content"].lower()
        assert agent.step.call_count == MAX_SUBAGENT_STEPS

    def test_loop_handles_step_exception(self):
        """An exception raised by ``step`` is surfaced as error content."""
        agent = self._stub_agent(side_effect=RuntimeError("LLM API down"))

        outcome = self._drive(
            self._new_runner(),
            agent,
            _make_delegate_action("TestAgent", "Fail", "c3"),
        )

        assert "error" in outcome["content"].lower()

    def test_loop_skips_none_actions(self):
        """``None`` results from ``step`` are skipped until a finish action arrives."""
        finish = AgentFinishAction(
            final_thought="OK", outputs={"content": "Done after nones"}
        )
        # Two empty steps, then the finish action on the third call.
        agent = self._stub_agent(side_effect=[None, None, finish])

        outcome = self._drive(
            self._new_runner(),
            agent,
            _make_delegate_action("TestAgent", "Skip nones", "c4"),
        )

        assert outcome["content"] == "Done after nones"
        assert agent.step.call_count == 3


# ===========================================================================
# 3.
# Concurrent timing validation
# ===========================================================================


class TestConcurrentTiming:
    """Checks that ``run_parallel`` overlaps subagent work rather than serialising it."""

    @patch.object(ParallelSubagentRunner, "_run_subagent_loop")
    def test_parallel_faster_than_sequential(self, mock_loop):
        """Several sleeping subagents must finish faster than running them back to back."""
        delay = 0.1
        n_actions = 3

        def slow_result(action):
            time.sleep(delay)
            return {"content": f"Done {action.agent}", "outputs": {}}

        mock_loop.side_effect = slow_result

        runner = ParallelSubagentRunner(
            llm_registry=Mock(),
            parent_agent_config=Mock(),
            agent_configs={},
            runtime_url="http://test:8000",
        )

        actions = [
            _make_delegate_action("A", f"T{i}", f"c{i}") for i in range(n_actions)
        ]

        t0 = time.monotonic()
        results = runner.run_parallel(actions)
        elapsed = time.monotonic() - t0

        assert len(results) == n_actions
        # Back-to-back execution needs at least delay * n_actions seconds, so
        # only overlapping execution can come in under that floor. The former
        # bound of 0.5s was above the 0.3s serial time and would have passed
        # even if nothing ran concurrently.
        assert elapsed < delay * n_actions

    @patch.object(ParallelSubagentRunner, "_run_subagent_loop")
    def test_single_action_still_works(self, mock_loop):
        """A one-element batch returns its result keyed by the tool call id."""
        mock_loop.return_value = {"content": "Solo result", "outputs": {}}

        runner = ParallelSubagentRunner(
            llm_registry=Mock(),
            parent_agent_config=Mock(),
            agent_configs={},
            runtime_url="http://test:8000",
        )

        actions = [_make_delegate_action("A", "Solo", "c0")]
        results = runner.run_parallel(actions)

        assert len(results) == 1
        assert results["c0"]["content"] == "Solo result"


# ===========================================================================
# 4.
# Error isolation across threads
# ===========================================================================


class TestErrorIsolation:
    """A subagent that raises is reported in its own result slot only."""

    @staticmethod
    def _run_batch(count):
        """Run *count* delegate actions (ids ``c0``..) through a fresh runner."""
        runner = ParallelSubagentRunner(
            llm_registry=Mock(),
            parent_agent_config=Mock(),
            agent_configs={},
            runtime_url="http://test:8000",
        )
        batch = [_make_delegate_action("A", f"T{i}", f"c{i}") for i in range(count)]
        return runner.run_parallel(batch)

    @patch.object(ParallelSubagentRunner, "_run_subagent_loop")
    def test_one_failure_doesnt_affect_others(self, mock_loop):
        """Only the crashing call's entry carries an error; siblings keep their output."""

        def crash_only_c1(action):
            call_id = action.tool_call_metadata.tool_call_id
            if call_id != "c1":
                return {"content": f"OK from {call_id}", "outputs": {}}
            raise RuntimeError("Agent crashed")

        mock_loop.side_effect = crash_only_c1

        results = self._run_batch(3)

        assert len(results) == 3
        assert "OK from c0" in results["c0"]["content"]
        assert "error" in results["c1"]["content"].lower()
        assert "OK from c2" in results["c2"]["content"]

    @patch.object(ParallelSubagentRunner, "_run_subagent_loop")
    def test_all_failures_handled(self, mock_loop):
        """When every subagent raises, each still yields an error entry."""
        mock_loop.side_effect = RuntimeError("total failure")

        results = self._run_batch(2)

        assert len(results) == 2
        assert all("error" in entry["content"].lower() for entry in results.values())


# ===========================================================================
# 5.
# Integration: step() parallel detection
# ===========================================================================


class TestStepParallelIntegration:
    """Checks when ``OpenCodeAgent.step()`` hands work to ``run_parallel``."""

    # Patch target for the runner's parallel entry point.
    _RUN_PARALLEL = (
        "openhands.agenthub.opencode_agent.subagents.parallel_runner."
        "ParallelSubagentRunner.run_parallel"
    )
    _RUNTIME_ENV = {"OPENHANDS_RUNTIME_URL": "http://test:8000"}

    @staticmethod
    def _prepared(llm_registry):
        """Return an ``(agent, state)`` pair wired together with the module helpers."""
        agent = _make_agent(llm_registry)
        state = _make_state()
        _wire_agent(agent, state)
        return agent, state

    @staticmethod
    def _task_call(description, prompt, subagent_type):
        """Return a ``(tool name, arguments)`` tuple for one task tool call."""
        return (
            TASK_TOOL_NAME,
            {
                "description": description,
                "prompt": prompt,
                "subagent_type": subagent_type,
            },
        )

    def test_two_task_calls_trigger_parallel(self, llm_registry):
        """Two task calls in one response go through ``run_parallel`` exactly once."""
        agent, state = self._prepared(llm_registry)
        agent.llm.completion = Mock(
            side_effect=[
                _mock_response_multi(
                    [
                        self._task_call("A", "p1", "explore"),
                        self._task_call("B", "p2", "general"),
                    ]
                ),
                _mock_text_response("Combined results."),
            ]
        )

        with patch.dict("os.environ", self._RUNTIME_ENV), patch(
            self._RUN_PARALLEL
        ) as mock_run:
            mock_run.return_value = {
                "tc-0": {"content": "R0", "outputs": {"content": "R0"}},
                "tc-1": {"content": "R1", "outputs": {"content": "R1"}},
            }
            agent.step(state)

        mock_run.assert_called_once()
        assert agent.llm.completion.call_count == 2

    def test_mixed_calls_skip_parallel(self, llm_registry):
        """A response mixing a read call with a task call is not parallelised."""
        agent, state = self._prepared(llm_registry)
        agent.llm.completion = Mock(
            return_value=_mock_response_multi(
                [
                    ("read", {"file_path": "/x.py"}),
                    self._task_call("E", "p", "explore"),
                ]
            )
        )

        with patch.dict("os.environ", self._RUNTIME_ENV), patch(
            self._RUN_PARALLEL
        ) as mock_run:
            action = agent.step(state)

        mock_run.assert_not_called()
        assert isinstance(action, OpenCodeReadAction)

    def test_single_task_skips_parallel(self, llm_registry):
        """A lone task call is returned as a plain delegate action."""
        agent, state = self._prepared(llm_registry)
        agent.llm.completion = Mock(
            return_value=_mock_response_multi([self._task_call("T", "p", "explore")])
        )

        with patch.dict("os.environ", self._RUNTIME_ENV), patch(
            self._RUN_PARALLEL
        ) as mock_run:
            action = agent.step(state)

        mock_run.assert_not_called()
        assert isinstance(action, AgentDelegateAction)

    def test_no_runtime_url_skips_parallel(self, llm_registry):
        """With an emptied environment, two task calls are not parallelised."""
        agent, state = self._prepared(llm_registry)
        agent.llm.completion = Mock(
            return_value=_mock_response_multi(
                [
                    self._task_call("A", "p", "explore"),
                    self._task_call("B", "p", "general"),
                ]
            )
        )

        with patch.dict("os.environ", {}, clear=True), patch(
            self._RUN_PARALLEL
        ) as mock_run:
            action = agent.step(state)

        mock_run.assert_not_called()
        assert isinstance(action, AgentDelegateAction)

    def test_no_task_calls_untouched(self, llm_registry):
        """A response without task calls yields the ordinary read action."""
        agent, state = self._prepared(llm_registry)
        agent.llm.completion = Mock(
            return_value=_mock_response_multi([("read", {"file_path": "/f"})])
        )

        assert isinstance(agent.step(state), OpenCodeReadAction)


# ===========================================================================
# 6.
Synthetic event injection +# =========================================================================== + + +class TestSyntheticEventInjection: + """Verify that _handle_parallel_subagents correctly injects + AgentDelegateAction + AgentDelegateObservation into state.history.""" + + def test_synthetic_events_structure(self, llm_registry): + agent = _make_agent(llm_registry) + state = State(session_id="test-synth") + initial_msg = MessageAction(content="parallel tasks") + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser = Mock() + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + agent.conversation_memory = Mock() + agent.conversation_memory.process_events = Mock(return_value=[]) + agent.conversation_memory.apply_prompt_caching = Mock() + + task_resp = _mock_response_multi( + [ + (TASK_TOOL_NAME, {"description": "X", "prompt": "px", "subagent_type": "explore"}), + (TASK_TOOL_NAME, {"description": "Y", "prompt": "py", "subagent_type": "general"}), + ] + ) + finish_resp = _mock_text_response("Done") + agent.llm.completion = Mock(side_effect=[task_resp, finish_resp]) + + before_len = len(state.history) + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://test:8000"}): + with patch( + "openhands.agenthub.opencode_agent.subagents.parallel_runner." 
+ "ParallelSubagentRunner.run_parallel" + ) as mock_run: + mock_run.return_value = { + "tc-0": {"content": "RX", "outputs": {"content": "RX"}}, + "tc-1": {"content": "RY", "outputs": {"content": "RY"}}, + } + agent.step(state) + + new_events = state.history[before_len:] + assert len(new_events) == 4 + + synth_acts = [e for e in new_events if isinstance(e, AgentDelegateAction)] + synth_obs = [e for e in new_events if isinstance(e, AgentDelegateObservation)] + + assert len(synth_acts) == 2 + assert len(synth_obs) == 2 + + for obs in synth_obs: + assert obs.tool_call_metadata is not None + matching = [a for a in synth_acts if a.id == obs.cause] + assert len(matching) == 1 + + def test_observation_content_matches_result(self, llm_registry): + agent = _make_agent(llm_registry) + state = State(session_id="test-content") + initial_msg = MessageAction(content="tasks") + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser = Mock() + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + agent.conversation_memory = Mock() + agent.conversation_memory.process_events = Mock(return_value=[]) + agent.conversation_memory.apply_prompt_caching = Mock() + + task_resp = _mock_response_multi( + [ + (TASK_TOOL_NAME, {"description": "A", "prompt": "a", "subagent_type": "explore"}), + (TASK_TOOL_NAME, {"description": "B", "prompt": "b", "subagent_type": "general"}), + ] + ) + finish_resp = _mock_text_response("All done") + agent.llm.completion = Mock(side_effect=[task_resp, finish_resp]) + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://test:8000"}): + with patch( + "openhands.agenthub.opencode_agent.subagents.parallel_runner." 
+ "ParallelSubagentRunner.run_parallel" + ) as mock_run: + mock_run.return_value = { + "tc-0": {"content": "Found 5 files", "outputs": {"content": "Found 5 files"}}, + "tc-1": {"content": "Refactored OK", "outputs": {"content": "Refactored OK"}}, + } + agent.step(state) + + obs_events = [e for e in state.history if isinstance(e, AgentDelegateObservation)] + contents = [o.content for o in obs_events] + assert "Found 5 files" in contents + assert "Refactored OK" in contents + + +# =========================================================================== +# 7. End-to-end: full parallel flow with real agents +# =========================================================================== + + +class TestE2EFullParallelFlow: + """End-to-end test exercising the complete flow with minimal mocking. + + Only the LLM (completion calls) and the HTTP runtime are mocked. + Everything else -- action conversion, parallel detection, runner, + subagent instantiation, state management, synthetic event injection, + and the re-call -- uses real code. 
+ + Flow tested: + Parent LLM call #1 -> 2 task tool calls + | + v + response_to_actions -> 2 AgentDelegateAction + | + v + Parallel detection triggers (>=2 delegates, all delegates, URL set) + | + v + ParallelSubagentRunner.run_parallel() + | + +-> Thread 1: OpenCodeExploreSubAgent + | step() -> LLM returns read tool call + | _execute_action -> HTTP POST -> FileReadObservation + | step() -> LLM returns finish tool call + | _enrich_finish_action -> outputs with metadata + | + +-> Thread 2: OpenCodeGeneralSubAgent + | step() -> LLM returns read tool call + | _execute_action -> HTTP POST -> FileReadObservation + | step() -> LLM returns finish tool call + | _enrich_finish_action -> outputs with metadata + | + v + Results collected, synthetic events injected into state.history + | + v + Parent LLM call #2 -> text summary MessageAction + | + v + Returned to caller + """ + + @staticmethod + def _make_llm_mock(): + """Create a mock LLM with standard config attributes.""" + llm = Mock() + llm.config = Mock() + llm.config.model = "test-model" + llm.config.max_message_chars = 10000 + llm.config.custom_llm_provider = None + llm.vision_is_active = Mock(return_value=False) + llm.is_caching_prompt_active = Mock(return_value=False) + return llm + + @staticmethod + def _make_tool_call_response(tool_calls_spec, response_id="resp"): + """Build a ModelResponse with tool calls. 
+ + tool_calls_spec: list of (name, arguments_dict, call_id) + """ + calls = [] + for name, args, cid in tool_calls_spec: + calls.append({ + "function": {"name": name, "arguments": json.dumps(args)}, + "id": cid, + "type": "function", + }) + return ModelResponse( + id=response_id, + choices=[{ + "message": { + "tool_calls": calls, + "content": None, + "role": "assistant", + }, + "index": 0, + "finish_reason": "tool_calls", + }], + ) + + @staticmethod + def _make_http_read_response(content, path): + """Build the JSON dict that the mocked runtime returns for a read action.""" + return { + "observation": "read", + "content": content, + "extras": {"path": path, "impl_source": "default"}, + } + + def test_full_parallel_flow_two_subagents(self, llm_registry): + """Two subagents run in parallel, each reads a file and finishes.""" + import threading + + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + # ---- 1. Build parent agent with real constructor ---- + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=llm_registry) + + # Parent LLM: call 1 = two task tool calls, call 2 = summary text + parent_llm = self._make_llm_mock() + parent_task_response = self._make_tool_call_response( + [ + ( + TASK_TOOL_NAME, + {"description": "Explore src", "prompt": "Find all Python files in src/", "subagent_type": "explore"}, + "tc-explore", + ), + ( + TASK_TOOL_NAME, + {"description": "Analyze main", "prompt": "Analyze the main module structure", "subagent_type": "general"}, + "tc-general", + ), + ], + response_id="parent-resp-1", + ) + parent_summary_response = _mock_text_response( + "Both subagents completed. Found Python files and analyzed the main module." + ) + parent_llm.completion = Mock( + side_effect=[parent_task_response, parent_summary_response] + ) + agent.llm = parent_llm + + # ---- 2. 
Build subagent LLM mocks ---- + # Each subagent does: read a file -> finish + # We create two identical mocks since thread ordering is non-deterministic + def make_subagent_llm(file_path, file_content, finish_msg): + llm = self._make_llm_mock() + read_resp = self._make_tool_call_response( + [("read", {"file_path": file_path}, "sub-read")], + response_id="sub-resp-1", + ) + finish_resp = self._make_tool_call_response( + [("finish", {"message": finish_msg}, "sub-finish")], + response_id="sub-resp-2", + ) + llm.completion = Mock(side_effect=[read_resp, finish_resp]) + return llm + + sub_llm_a = make_subagent_llm("/src/main.py", "def main(): pass", "Found 3 functions") + sub_llm_b = make_subagent_llm("/src/utils.py", "def helper(): pass", "Found 5 utilities") + + # ---- 3. Wire get_router to dispatch LLMs ---- + # Call order: parent __init__ (already done), then 2 subagents in threads + call_lock = threading.Lock() + sub_llms = [sub_llm_a, sub_llm_b] + sub_idx = [0] + + original_get_router = llm_registry.get_router + + def mock_get_router(config): + with call_lock: + idx = sub_idx[0] + sub_idx[0] += 1 + if idx < len(sub_llms): + return sub_llms[idx] + return sub_llms[-1] + + llm_registry.get_router = mock_get_router + + # ---- 4. Mock the HTTP runtime ---- + def mock_http_post(url, **kwargs): + response = Mock() + response.status_code = 200 + response.raise_for_status = Mock() + action_data = kwargs.get("json", {}).get("action", {}) + path = action_data.get("args", {}).get("path", "/unknown") + response.json = Mock( + return_value=self._make_http_read_response( + f"# Contents of {path}\ndef example(): pass\n", path + ) + ) + return response + + mock_client = Mock() + mock_client.post = Mock(side_effect=mock_http_post) + mock_client.__enter__ = Mock(return_value=mock_client) + mock_client.__exit__ = Mock(return_value=False) + + # ---- 5. 
Build real State ---- + state = State(session_id="e2e-parallel-test") + user_msg = MessageAction( + content="Explore the src directory and analyze the main module" + ) + user_msg._source = EventSource.USER + user_msg._id = 0 + state.history = [user_msg] + + history_before = len(state.history) + + # ---- 6. Execute! ---- + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://localhost:9999"}): + with patch("httpx.Client", return_value=mock_client): + action = agent.step(state) + + # ---- 7. Verify results ---- + + # The final action should be a MessageAction (the parent's summary) + assert isinstance(action, MessageAction), f"Expected MessageAction, got {type(action)}" + assert "subagents completed" in action.content.lower() or "both" in action.content.lower() + + # Parent LLM was called exactly twice + assert parent_llm.completion.call_count == 2 + + # Both subagent LLMs were called exactly twice each (read + finish) + assert sub_llm_a.completion.call_count == 2 + assert sub_llm_b.completion.call_count == 2 + + # HTTP endpoint was called twice (one read per subagent) + assert mock_client.post.call_count == 2 + + # Synthetic events were injected: 2 actions + 2 observations = 4 new events + new_events = state.history[history_before:] + assert len(new_events) == 4 + + synth_actions = [e for e in new_events if isinstance(e, AgentDelegateAction)] + synth_obs = [e for e in new_events if isinstance(e, AgentDelegateObservation)] + assert len(synth_actions) == 2 + assert len(synth_obs) == 2 + + # Each observation has tool_call_metadata and is causally linked to its action + for obs in synth_obs: + assert obs.tool_call_metadata is not None + matching_actions = [a for a in synth_actions if a.id == obs.cause] + assert len(matching_actions) == 1 + + # Observation content should contain the subagent finish messages + obs_contents = sorted([o.content for o in synth_obs]) + assert any("3 functions" in c or "5 utilities" in c for c in obs_contents) + + # The 
tool_call_ids should match the original task calls + obs_tc_ids = {o.tool_call_metadata.tool_call_id for o in synth_obs} + assert obs_tc_ids == {"tc-explore", "tc-general"} + + # Restore original get_router + llm_registry.get_router = original_get_router + + def test_full_flow_subagent_error_does_not_crash_parent(self, llm_registry): + """If one subagent's LLM throws, the other still completes and the parent gets results.""" + import threading + + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + agent = OpenCodeAgent(config=AgentConfig(), llm_registry=llm_registry) + + parent_llm = self._make_llm_mock() + parent_task_response = self._make_tool_call_response( + [ + (TASK_TOOL_NAME, {"description": "A", "prompt": "Task A", "subagent_type": "explore"}, "tc-a"), + (TASK_TOOL_NAME, {"description": "B", "prompt": "Task B", "subagent_type": "general"}, "tc-b"), + ], + response_id="parent-err-1", + ) + parent_summary = _mock_text_response("Partial results received.") + parent_llm.completion = Mock(side_effect=[parent_task_response, parent_summary]) + agent.llm = parent_llm + + # Subagent A: works fine (read -> finish) + ok_llm = self._make_llm_mock() + ok_llm.completion = Mock(side_effect=[ + self._make_tool_call_response( + [("read", {"file_path": "/ok.py"}, "r1")], response_id="ok-1" + ), + self._make_tool_call_response( + [("finish", {"message": "Success from A"}, "f1")], response_id="ok-2" + ), + ]) + + # Subagent B: LLM crashes on first call + bad_llm = self._make_llm_mock() + bad_llm.completion = Mock(side_effect=RuntimeError("LLM API is down")) + + call_lock = threading.Lock() + sub_llms = [ok_llm, bad_llm] + sub_idx = [0] + + def mock_get_router(config): + with call_lock: + idx = sub_idx[0] + sub_idx[0] += 1 + if idx < len(sub_llms): + return sub_llms[idx] + return sub_llms[-1] + + original_get_router = llm_registry.get_router + llm_registry.get_router = mock_get_router + + def mock_http_post(url, **kwargs): + resp = Mock() + 
resp.status_code = 200 + resp.raise_for_status = Mock() + resp.json = Mock(return_value={ + "observation": "read", + "content": "file ok", + "extras": {"path": "/ok.py", "impl_source": "default"}, + }) + return resp + + mock_client = Mock() + mock_client.post = Mock(side_effect=mock_http_post) + mock_client.__enter__ = Mock(return_value=mock_client) + mock_client.__exit__ = Mock(return_value=False) + + state = State(session_id="e2e-error-test") + user_msg = MessageAction(content="Do two things") + user_msg._source = EventSource.USER + user_msg._id = 0 + state.history = [user_msg] + + with patch.dict("os.environ", {"OPENHANDS_RUNTIME_URL": "http://localhost:9999"}): + with patch("httpx.Client", return_value=mock_client): + action = agent.step(state) + + # Should still succeed -- parent gets partial results + assert isinstance(action, MessageAction) + + # Both synthetic observations exist (one success, one error) + synth_obs = [ + e for e in state.history if isinstance(e, AgentDelegateObservation) + ] + assert len(synth_obs) == 2 + + obs_contents = [o.content for o in synth_obs] + # One should have success content, one should have error content + has_success = any("Success from A" in c or "file ok" in c for c in obs_contents) + has_error = any("error" in c.lower() for c in obs_contents) + assert has_success or has_error # at least one completed + + llm_registry.get_router = original_get_router diff --git a/tests/unit/agenthub/test_opencode_subagents.py b/tests/unit/agenthub/test_opencode_subagents.py new file mode 100644 index 000000000000..7db50d4558f8 --- /dev/null +++ b/tests/unit/agenthub/test_opencode_subagents.py @@ -0,0 +1,1840 @@ +"""Comprehensive end-to-end tests for the OpenCode subagent architecture. + +Covers: +1. Task tool definition (schema validation) +2. Function calling: task tool -> AgentDelegateAction conversion +3. SubagentMixin: tool summary collection, title derivation, finish enrichment +4. 
Subagent classes: tool lists, step() enrichment, prompt managers +5. Agent registration and instantiation +6. E2E integration: parent delegates -> subagent runs -> outputs match opencode format +""" + +import asyncio +import json +from collections import deque +from unittest.mock import MagicMock, Mock, patch +from uuid import uuid4 + +import pytest +from litellm import ModelResponse + +from openhands.agenthub.opencode_agent.function_calling import response_to_actions +from openhands.agenthub.opencode_agent.subagents.explore import ( + OpenCodeExploreSubAgent, +) +from openhands.agenthub.opencode_agent.subagents.general import ( + OpenCodeGeneralSubAgent, +) +from openhands.agenthub.opencode_agent.subagents.mixin import SubagentMixin +from openhands.agenthub.opencode_agent.tools.task import ( + SUBAGENT_NAME_MAP, + SUBAGENT_REGISTRY, + TASK_TOOL_NAME, + TaskTool, +) +from openhands.controller.agent import Agent +from openhands.controller.state.state import State +from openhands.core.config import AgentConfig, LLMConfig +from openhands.core.config.openhands_config import OpenHandsConfig +from openhands.core.schema import ActionType +from openhands.events.action import ( + AgentDelegateAction, + AgentFinishAction, + AgentThinkAction, + CmdRunAction, + FileEditAction, + GlobAction, + GrepAction, + ListDirAction, + MessageAction, + OpenCodeReadAction, + OpenCodeWriteAction, +) +from openhands.events.action.agent import ValidationFailureAction +from openhands.events.event import EventSource, FileEditSource +from openhands.events.observation.delegate import AgentDelegateObservation +from openhands.events.tool import ToolCallMetadata +from openhands.llm.llm_registry import LLMRegistry + + +# ============================================================================== +# Helper Functions +# ============================================================================== + + +def create_mock_response(function_name: str, arguments: dict) -> ModelResponse: + """Create a mock 
LLM response with a single tool call.""" + return ModelResponse( + id='mock-id', + choices=[ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': function_name, + 'arguments': json.dumps(arguments), + }, + 'id': 'mock-tool-call-id', + 'type': 'function', + } + ], + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + + +def create_mock_response_with_thought( + function_name: str, arguments: dict, thought: str +) -> ModelResponse: + """Create a mock LLM response with thought content and a tool call.""" + return ModelResponse( + id='mock-id', + choices=[ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': function_name, + 'arguments': json.dumps(arguments), + }, + 'id': 'mock-tool-call-id', + 'type': 'function', + } + ], + 'content': thought, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + + +def create_mock_response_multi_tool( + tool_calls: list[tuple[str, dict]], +) -> ModelResponse: + """Create a mock LLM response with multiple tool calls.""" + calls = [] + for i, (name, args) in enumerate(tool_calls): + calls.append( + { + 'function': { + 'name': name, + 'arguments': json.dumps(args), + }, + 'id': f'mock-tool-call-id-{i}', + 'type': 'function', + } + ) + return ModelResponse( + id='mock-id', + choices=[ + { + 'message': { + 'tool_calls': calls, + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + + +def _make_action_with_metadata( + action_cls, function_name: str, tool_call_id: str = 'call_001', **kwargs +): + """Create an action instance with tool_call_metadata attached (simulating what function_calling produces).""" + action = action_cls(**kwargs) + mock_response = ModelResponse( + id='mock-resp', + choices=[ + { + 'message': {'content': None, 'role': 'assistant', 'tool_calls': []}, + 'index': 0, + 'finish_reason': 'stop', + } + ], + ) + action.tool_call_metadata = ToolCallMetadata( 
+ tool_call_id=tool_call_id, + function_name=function_name, + model_response=mock_response, + total_calls_in_response=1, + ) + return action + + +def _make_mock_state( + history=None, inputs=None, session_id='test-session-123' +) -> Mock: + """Create a mock State with the given history and inputs.""" + state = Mock(spec=State) + state.history = history or [] + state.inputs = inputs or {} + state.session_id = session_id + state.extra_data = {} + return state + + +@pytest.fixture +def create_llm_registry(): + def _get_registry(llm_config=None): + if llm_config is None: + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = OpenHandsConfig() + config.set_llm_config(llm_config) + return LLMRegistry(config=config) + + return _get_registry + + +# ============================================================================== +# 1. Task Tool Definition Tests +# ============================================================================== + + +class TestTaskToolDefinition: + """Tests for the task tool schema and configuration.""" + + def test_task_tool_type(self): + assert TaskTool['type'] == 'function' + + def test_task_tool_name(self): + func = TaskTool['function'] + assert func['name'] == TASK_TOOL_NAME + assert func['name'] == 'task' + + def test_task_tool_has_parameters(self): + func = TaskTool['function'] + assert 'parameters' in func + + def test_task_tool_required_params(self): + params = TaskTool['function']['parameters'] + assert set(params['required']) == {'description', 'prompt', 'subagent_type'} + + def test_task_tool_description_param(self): + props = TaskTool['function']['parameters']['properties'] + assert 'description' in props + assert props['description']['type'] == 'string' + + def test_task_tool_prompt_param(self): + props = TaskTool['function']['parameters']['properties'] + assert 'prompt' in props + assert props['prompt']['type'] == 'string' + + def test_task_tool_subagent_type_param(self): + props = 
TaskTool['function']['parameters']['properties'] + assert 'subagent_type' in props + assert props['subagent_type']['type'] == 'string' + assert 'enum' in props['subagent_type'] + + def test_task_tool_subagent_type_enum_values(self): + props = TaskTool['function']['parameters']['properties'] + enum_vals = props['subagent_type']['enum'] + assert 'general' in enum_vals + assert 'explore' in enum_vals + assert len(enum_vals) == len(SUBAGENT_REGISTRY) + + def test_task_tool_description_contains_agents(self): + desc = TaskTool['function']['description'] + assert 'general' in desc.lower() + assert 'explore' in desc.lower() + + def test_task_tool_description_contains_usage_guidelines(self): + desc = TaskTool['function']['description'] + assert 'When to use' in desc + assert 'When NOT to use' in desc + + def test_subagent_registry_keys(self): + assert 'general' in SUBAGENT_REGISTRY + assert 'explore' in SUBAGENT_REGISTRY + + def test_subagent_name_map_matches_registry(self): + assert set(SUBAGENT_NAME_MAP.keys()) == set(SUBAGENT_REGISTRY.keys()) + + def test_subagent_name_map_values(self): + assert SUBAGENT_NAME_MAP['general'] == 'OpenCodeGeneralSubAgent' + assert SUBAGENT_NAME_MAP['explore'] == 'OpenCodeExploreSubAgent' + + +# ============================================================================== +# 2. 
# Function Calling: Task Tool -> AgentDelegateAction
# ==============================================================================


class TestTaskToolFunctionCalling:
    """Check how ``response_to_actions`` handles ``task`` tool calls.

    Well-formed calls are expected to come back as ``AgentDelegateAction``
    instances; calls with missing or unknown arguments are expected to come
    back as ``ValidationFailureAction`` instances.
    """

    @staticmethod
    def _run_task_call(**arguments):
        """Convert one mocked ``task`` tool call carrying ``arguments`` into actions."""
        return response_to_actions(create_mock_response(TASK_TOOL_NAME, arguments))

    @staticmethod
    def _only(actions):
        """Assert that exactly one action was produced and return it."""
        assert len(actions) == 1
        return actions[0]

    def test_task_tool_general_creates_delegate_action(self):
        delegate = self._only(
            self._run_task_call(
                description='Search codebase',
                prompt='Find all uses of the Config class',
                subagent_type='general',
            )
        )
        assert isinstance(delegate, AgentDelegateAction)
        assert delegate.agent == 'OpenCodeGeneralSubAgent'
        assert delegate.inputs['task'] == 'Find all uses of the Config class'
        assert delegate.inputs['description'] == 'Search codebase'

    def test_task_tool_explore_creates_delegate_action(self):
        delegate = self._only(
            self._run_task_call(
                description='Explore project',
                prompt='Find all Python files in src/',
                subagent_type='explore',
            )
        )
        assert isinstance(delegate, AgentDelegateAction)
        assert delegate.agent == 'OpenCodeExploreSubAgent'
        assert delegate.inputs['task'] == 'Find all Python files in src/'
        assert delegate.inputs['description'] == 'Explore project'

    def test_task_tool_missing_prompt_raises_validation_error(self):
        failure = self._only(
            self._run_task_call(description='Test', subagent_type='general')
        )
        assert isinstance(failure, ValidationFailureAction)
        assert 'prompt' in failure.error_message.lower()

    def test_task_tool_missing_subagent_type_raises_validation_error(self):
        failure = self._only(
            self._run_task_call(description='Test', prompt='Do something')
        )
        assert isinstance(failure, ValidationFailureAction)
        assert 'subagent_type' in failure.error_message.lower()

    def test_task_tool_invalid_subagent_type_raises_validation_error(self):
        failure = self._only(
            self._run_task_call(
                description='Test',
                prompt='Do something',
                subagent_type='nonexistent_agent',
            )
        )
        assert isinstance(failure, ValidationFailureAction)
        # The offending value itself should be echoed back in the message.
        assert 'nonexistent_agent' in failure.error_message

    def test_task_tool_with_thought(self):
        reasoning = 'Let me delegate this to the explore agent.'
        llm_reply = create_mock_response_with_thought(
            TASK_TOOL_NAME,
            {
                'description': 'Explore structure',
                'prompt': 'Map the directory layout',
                'subagent_type': 'explore',
            },
            thought=reasoning,
        )
        delegate = self._only(response_to_actions(llm_reply))
        assert isinstance(delegate, AgentDelegateAction)
        assert delegate.thought == reasoning

    def test_task_tool_description_defaults_to_empty(self):
        delegate = self._only(
            self._run_task_call(prompt='Find all tests', subagent_type='explore')
        )
        assert isinstance(delegate, AgentDelegateAction)
        assert delegate.inputs['description'] == ''

    def test_task_tool_has_tool_call_metadata(self):
        delegate = self._only(
            self._run_task_call(
                description='Test task',
                prompt='Do something',
                subagent_type='general',
            )
        )
        metadata = delegate.tool_call_metadata
        assert metadata is not None
        assert metadata.function_name == TASK_TOOL_NAME
        assert metadata.tool_call_id == 'mock-tool-call-id'

    def test_task_tool_action_type_is_delegate(self):
        produced = self._run_task_call(
            description='Test',
            prompt='Do it',
            subagent_type='general',
        )
        assert produced[0].action == ActionType.DELEGATE

    def test_task_tool_among_multiple_tool_calls(self):
        """Two task tool calls in one response yield two delegate actions, in order."""
        explore_args = {
            'description': 'Explore code',
            'prompt': 'Find all classes',
            'subagent_type': 'explore',
        }
        general_args = {
            'description': 'Implement feature',
            'prompt': 'Add logging to main.py',
            'subagent_type': 'general',
        }
        llm_reply = create_mock_response_multi_tool(
            [(TASK_TOOL_NAME, explore_args), (TASK_TOOL_NAME, general_args)]
        )
        produced = response_to_actions(llm_reply)
        assert len(produced) == 2
        expected_agents = ('OpenCodeExploreSubAgent', 'OpenCodeGeneralSubAgent')
        for delegate, agent_name in zip(produced, expected_agents):
            assert isinstance(delegate, AgentDelegateAction)
            assert delegate.agent == agent_name


# ==============================================================================
# 3.
# SubagentMixin Tests
# ==============================================================================


class TestSubagentMixin:
    """Exercise SubagentMixin: title derivation, tool summaries, finish enrichment."""

    # -- shared helpers --

    def _make_mixin_instance(self, model_name='gpt-4o'):
        """Return a SubagentMixin whose mocked ``llm.config.model`` is ``model_name``."""
        mixin = SubagentMixin()
        mixin.llm = Mock(config=Mock(model=model_name))
        return mixin

    def _title(self, tool_name, action):
        """Derive the display title for ``action`` as produced by ``tool_name``."""
        return self._make_mixin_instance()._derive_tool_title(action, tool_name)

    def _summary(self, events):
        """Collect the tool summary for a mock state whose history is ``events``."""
        state = _make_mock_state(history=events)
        return self._make_mixin_instance()._collect_tool_summary(state)

    @staticmethod
    def _read_action(tool_call_id, path):
        """Build an OpenCodeReadAction for ``path`` tagged with 'read' tool metadata."""
        return _make_action_with_metadata(
            OpenCodeReadAction,
            'read',
            tool_call_id=tool_call_id,
            path=path,
            offset=0,
            limit=2000,
        )

    @staticmethod
    def _grep_action(tool_call_id, pattern):
        """Build a GrepAction for ``pattern`` tagged with 'grep' tool metadata."""
        return _make_action_with_metadata(
            GrepAction,
            'grep',
            tool_call_id=tool_call_id,
            pattern=pattern,
            path='.',
            include='',
        )

    # -- _derive_tool_title --

    def test_derive_title_read(self):
        assert self._title('read', Mock(path='/src/main.py')) == 'Read /src/main.py'

    def test_derive_title_write(self):
        assert self._title('write', Mock(path='/src/new.py')) == 'Write /src/new.py'

    def test_derive_title_edit(self):
        expected = 'Edit /src/config.py'
        assert self._title('edit', Mock(path='/src/config.py')) == expected

    def test_derive_title_glob(self):
        assert self._title('glob', Mock(pattern='**/*.py')) == 'Glob **/*.py'

    def test_derive_title_grep(self):
        assert self._title('grep', Mock(pattern='import os')) == 'Grep import os'

    def test_derive_title_list_dir(self):
        assert self._title('list_dir', Mock(path='/src')) == 'List /src'

    def test_derive_title_bash(self):
        shell_action = Mock(command='git log --oneline -10')
        title = self._title('execute_bash', shell_action)
        assert title == 'Bash: git log --oneline -10'

    def test_derive_title_bash_truncates(self):
        title = self._title('cmd_run', Mock(command='x' * 100))
        assert len(title) <= 66  # "Bash: " + 60 chars

    def test_derive_title_think(self):
        think_action = Mock(
            thought='Let me reason about the bug in the parser module carefully'
        )
        title = self._title('think', think_action)
        assert title.startswith('Think: ')
        assert len(title) <= 48  # "Think: " + 40 chars

    def test_derive_title_unknown_tool(self):
        assert self._title('unknown_tool', Mock()) == ''

    def test_derive_title_missing_attribute(self):
        # spec=[] yields a mock that exposes no attributes at all.
        assert self._title('read', Mock(spec=[])) == ''

    # -- _collect_tool_summary --

    def test_collect_summary_empty_history(self):
        assert self._summary([]) == []

    def test_collect_summary_skips_events_without_metadata(self):
        bare_event = Mock()
        del bare_event.tool_call_metadata  # hasattr() now reports False
        assert self._summary([bare_event]) == []

    def test_collect_summary_skips_none_metadata(self):
        assert self._summary([Mock(tool_call_metadata=None)]) == []

    def test_collect_summary_skips_observations(self):
        """An observation carrying tool_call_metadata must not appear in the summary."""
        from openhands.events.observation.observation import Observation

        observation = Mock(spec=Observation)
        observation.tool_call_metadata = ToolCallMetadata(
            tool_call_id='call_1',
            function_name='read',
            model_response=ModelResponse(id='r', choices=[]),
            total_calls_in_response=1,
        )
        assert self._summary([observation]) == []

    def test_collect_summary_single_action(self):
        summary = self._summary([self._read_action('call_001', '/test.py')])
        assert len(summary) == 1
        entry = summary[0]
        assert entry['id'] == 'call_001'
        assert entry['tool'] == 'read'
        assert entry['state']['status'] == 'completed'
        assert entry['state']['title'] == 'Read /test.py'

    def test_collect_summary_multiple_actions(self):
        glob_action = _make_action_with_metadata(
            GlobAction,
            'glob',
            tool_call_id='call_003',
            pattern='**/*.py',
            path='.',
        )
        history = [
            self._read_action('call_001', '/a.py'),
            self._grep_action('call_002', 'import'),
            glob_action,
        ]
        summary = self._summary(history)
        assert len(summary) == 3
        # Entries are expected in the same order as the history.
        for entry, tool_name in zip(summary, ('read', 'grep', 'glob')):
            assert entry['tool'] == tool_name

    def test_collect_summary_skips_none_function_name(self):
        nameless = Mock()
        nameless.tool_call_metadata = ToolCallMetadata(
            tool_call_id='call_x',
            function_name=None,
            model_response=ModelResponse(id='r', choices=[]),
            total_calls_in_response=1,
        )
        assert self._summary([nameless]) == []

    # -- _enrich_finish_action --

    def test_enrich_finish_basic(self):
        state = _make_mock_state(
            history=[],
            inputs={'description': 'Search codebase', 'task': 'Find files'},
            session_id='session-abc',
        )
        finish = AgentFinishAction(final_thought='Found 3 files matching the pattern.')
        mixin = self._make_mixin_instance(model_name='claude-3-opus')

        outputs = mixin._enrich_finish_action(finish, state).outputs
        metadata = outputs['metadata']

        assert outputs['title'] == 'Search codebase'
        assert metadata['sessionId'] == 'session-abc'
        assert metadata['model'] == 'claude-3-opus'
        assert metadata['summary'] == []
        # NOTE(review): `'' in text` holds for every string, so the two empty-string
        # checks below only prove the value supports `in`. The intended literal may
        # have been lost -- confirm against SubagentMixin's output format.
        assert '' in outputs['output']
        assert 'session_id: session-abc' in outputs['output']
        assert '' in outputs['output']
        assert 'Found 3 files matching the pattern.' in outputs['output']

    def test_enrich_finish_content_key_for_conversation_memory(self):
        """'content' must be present and identical to 'output'.

        Per the original test, ConversationMemory reads this key for
        AgentDelegateObservation.
        """
        state = _make_mock_state(
            inputs={'description': 'Test'},
            session_id='sess-1',
        )
        finish = AgentFinishAction(final_thought='Done.')

        enriched = self._make_mixin_instance()._enrich_finish_action(finish, state)
        outputs = enriched.outputs

        assert 'content' in outputs
        assert outputs['content'] == outputs['output']
        assert 'Done.' in outputs['content']
        # NOTE(review): always true for a string; intended literal may be missing.
        assert '' in outputs['content']

    def test_enrich_finish_with_tool_history(self):
        history = [
            self._read_action('call_01', '/config.py'),
            self._grep_action('call_02', 'API_KEY'),
        ]
        state = _make_mock_state(
            history=history,
            inputs={'description': 'Find API key'},
            session_id='sess-2',
        )
        finish = AgentFinishAction(final_thought='API_KEY found in config.py line 42.')

        enriched = self._make_mixin_instance()._enrich_finish_action(finish, state)
        summary = enriched.outputs['metadata']['summary']

        assert len(summary) == 2
        first, second = summary[0], summary[1]
        assert first['tool'] == 'read'
        assert first['state']['title'] == 'Read /config.py'
        assert second['tool'] == 'grep'
        assert second['state']['title'] == 'Grep API_KEY'

    def test_enrich_finish_fallback_to_thought(self):
        """With no final_thought given, the text of ``thought`` is used instead."""
        state = _make_mock_state(inputs={}, session_id='s')
        finish = AgentFinishAction(thought='Completed via thought field.')
        enriched = self._make_mixin_instance()._enrich_finish_action(finish, state)
        assert 'Completed via thought field.' in enriched.outputs['output']

    def test_enrich_finish_empty_inputs(self):
        state = _make_mock_state(inputs={}, session_id='s')
        finish = AgentFinishAction(final_thought='Done.')
        enriched = self._make_mixin_instance()._enrich_finish_action(finish, state)
        assert enriched.outputs['title'] == ''

    def test_enrich_finish_no_llm(self):
        """A mixin that was never given an ``llm`` reports an empty model name."""
        llm_less = SubagentMixin()
        state = _make_mock_state(inputs={}, session_id='s')
        finish = AgentFinishAction(final_thought='Result.')
        enriched = llm_less._enrich_finish_action(finish, state)
        assert enriched.outputs['metadata']['model'] == ''

    def test_output_format_matches_opencode(self):
        """Check the shape of the enriched outputs: keys, metadata, summary, text."""
        state = _make_mock_state(
            history=[self._read_action('call_abc', '/main.py')],
            inputs={'description': 'Analyze main', 'task': 'Read main.py'},
            session_id='session-xyz',
        )
        finish = AgentFinishAction(final_thought='Analysis complete.')
        mixin = self._make_mixin_instance(model_name='gpt-4o')

        outputs = mixin._enrich_finish_action(finish, state).outputs

        # Top-level keys.
        for key in ('title', 'metadata', 'output', 'content'):
            assert key in outputs

        # Metadata keys.
        metadata = outputs['metadata']
        for key in ('summary', 'sessionId', 'model'):
            assert key in metadata

        # One summary entry for the single read action in the history.
        assert len(metadata['summary']) == 1
        entry = metadata['summary'][0]
        for key in ('id', 'tool', 'state'):
            assert key in entry
        assert 'status' in entry['state']

        # Text starts with the final thought and contains the session id line.
        text = outputs['output']
        assert text.startswith('Analysis complete.')
        assert '\nsession_id: session-xyz\n' in text


#
============================================================================== +# 4. Subagent Classes: Tool Lists and Step Enrichment +# ============================================================================== + + +class TestOpenCodeGeneralSubAgent: + """Tests for OpenCodeGeneralSubAgent tool list and behavior.""" + + @pytest.fixture + def general_agent(self, create_llm_registry): + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + agent = OpenCodeGeneralSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + agent.llm = Mock() + agent.llm.config = Mock() + agent.llm.config.model = 'gpt-4o' + agent.llm.config.max_message_chars = 10000 + return agent + + def test_general_tools_include_file_ops(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'read' in tool_names + assert 'write' in tool_names + assert 'edit' in tool_names + + def test_general_tools_include_search(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'glob' in tool_names + assert 'grep' in tool_names + assert 'list_dir' in tool_names + + def test_general_tools_include_think(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'think' in tool_names + + def test_general_tools_include_finish(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'finish' in tool_names + + def test_general_tools_exclude_todo(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'todo_read' not in tool_names + assert 'todo_write' not in tool_names + + def test_general_tools_exclude_task(self, general_agent): + tool_names = [t['function']['name'] for t in general_agent.tools] + assert 'task' not in tool_names + + def test_general_prompt_manager(self, general_agent): + pm = general_agent.prompt_manager + assert pm is not None + + def 
test_general_is_subagent_mixin(self, general_agent): + assert isinstance(general_agent, SubagentMixin) + + def test_general_is_opencode_agent(self, general_agent): + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + assert isinstance(general_agent, OpenCodeAgent) + + +class TestOpenCodeExploreSubAgent: + """Tests for OpenCodeExploreSubAgent tool list and behavior.""" + + @pytest.fixture + def explore_agent(self, create_llm_registry): + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + agent = OpenCodeExploreSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + agent.llm = Mock() + agent.llm.config = Mock() + agent.llm.config.model = 'gpt-4o' + agent.llm.config.max_message_chars = 10000 + return agent + + def test_explore_tools_include_read_only(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'read' in tool_names + assert 'glob' in tool_names + assert 'grep' in tool_names + assert 'list_dir' in tool_names + + def test_explore_tools_include_think(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'think' in tool_names + + def test_explore_tools_include_finish(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'finish' in tool_names + + def test_explore_tools_exclude_write_ops(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'write' not in tool_names + assert 'edit' not in tool_names + + def test_explore_tools_exclude_todo(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'todo_read' not in tool_names + assert 'todo_write' not in tool_names + + def test_explore_tools_exclude_task(self, explore_agent): + tool_names = [t['function']['name'] for t in explore_agent.tools] + assert 'task' not in tool_names + + def 
test_explore_prompt_manager(self, explore_agent): + pm = explore_agent.prompt_manager + assert pm is not None + + def test_explore_is_subagent_mixin(self, explore_agent): + assert isinstance(explore_agent, SubagentMixin) + + +# ============================================================================== +# 5. Agent Registration Tests +# ============================================================================== + + +class TestAgentRegistration: + """Tests that subagent classes are properly registered.""" + + def test_general_subagent_registered(self): + import openhands.agenthub.opencode_agent # noqa: F401 - triggers registration + + cls = Agent.get_cls('OpenCodeGeneralSubAgent') + assert cls is not None + + def test_explore_subagent_registered(self): + import openhands.agenthub.opencode_agent # noqa: F401 - triggers registration + + cls = Agent.get_cls('OpenCodeExploreSubAgent') + assert cls is not None + + def test_primary_agent_still_registered(self): + import openhands.agenthub.opencode_agent # noqa: F401 - triggers registration + + cls = Agent.get_cls('OpenCodeAgent') + assert cls is not None + + def test_general_subagent_class_identity(self): + import openhands.agenthub.opencode_agent # noqa: F401 + + cls = Agent.get_cls('OpenCodeGeneralSubAgent') + assert cls is OpenCodeGeneralSubAgent or ( + callable(cls) and 'General' in str(cls) + ) + + def test_explore_subagent_class_identity(self): + import openhands.agenthub.opencode_agent # noqa: F401 + + cls = Agent.get_cls('OpenCodeExploreSubAgent') + assert cls is OpenCodeExploreSubAgent or ( + callable(cls) and 'Explore' in str(cls) + ) + + def test_general_subagent_can_instantiate(self, create_llm_registry): + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + agent = OpenCodeGeneralSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + assert agent is not None + assert agent.tools is not None + + def test_explore_subagent_can_instantiate(self, 
create_llm_registry): + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + agent = OpenCodeExploreSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + assert agent is not None + assert agent.tools is not None + + +# ============================================================================== +# 6. E2E Integration: Full Delegation Flow with Output Format Verification +# ============================================================================== + + +class TestE2ESubagentDelegation: + """End-to-end tests for the subagent delegation flow. + + Verifies: parent calls task tool -> AgentDelegateAction created -> + subagent runs -> AgentFinishAction enriched -> outputs match opencode format. + """ + + def test_full_flow_explore_subagent(self, create_llm_registry): + """Simulate a full explore subagent flow: task tool call -> delegate -> finish with enriched outputs.""" + # Step 1: Parent LLM returns a task tool call + parent_response = create_mock_response( + TASK_TOOL_NAME, + { + 'description': 'Explore test structure', + 'prompt': 'Find all test files and describe the test organization', + 'subagent_type': 'explore', + }, + ) + parent_actions = response_to_actions(parent_response) + assert len(parent_actions) == 1 + delegate_action = parent_actions[0] + assert isinstance(delegate_action, AgentDelegateAction) + assert delegate_action.agent == 'OpenCodeExploreSubAgent' + + # Step 2: The controller would create a child State with the inputs + child_state = _make_mock_state( + inputs=delegate_action.inputs, + session_id='delegate-session-001', + ) + + # Step 3: Simulate the explore subagent executing some tools + read_action = _make_action_with_metadata( + OpenCodeReadAction, + 'read', + tool_call_id='call_e1', + path='/tests/conftest.py', + offset=0, + limit=2000, + ) + glob_action = _make_action_with_metadata( + GlobAction, + 'glob', + tool_call_id='call_e2', + pattern='tests/**/*.py', + path='.', + ) + 
grep_action = _make_action_with_metadata( + GrepAction, + 'grep', + tool_call_id='call_e3', + pattern='def test_', + path='.', + include='', + ) + child_state.history = [read_action, glob_action, grep_action] + + # Step 4: The explore subagent calls finish + finish_action = AgentFinishAction( + final_thought='Found 45 test files organized in tests/unit/ and tests/e2e/. ' + 'Key test categories: agenthub, controller, memory, runtime.' + ) + + # Step 5: SubagentMixin enriches the finish action + llm_config = LLMConfig(model='claude-3-opus', api_key='test_key') + config = AgentConfig() + explore_agent = OpenCodeExploreSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + explore_agent.llm = Mock() + explore_agent.llm.config = Mock() + explore_agent.llm.config.model = 'claude-3-opus' + + enriched = explore_agent._enrich_finish_action(finish_action, child_state) + + # Step 6: Verify the outputs match the opencode format exactly + outputs = enriched.outputs + + # Title from inputs + assert outputs['title'] == 'Explore test structure' + + # Metadata + metadata = outputs['metadata'] + assert metadata['sessionId'] == 'delegate-session-001' + assert metadata['model'] == 'claude-3-opus' + + # Summary of tool executions + summary = metadata['summary'] + assert len(summary) == 3 + assert summary[0] == { + 'id': 'call_e1', + 'tool': 'read', + 'state': {'status': 'completed', 'title': 'Read /tests/conftest.py'}, + } + assert summary[1] == { + 'id': 'call_e2', + 'tool': 'glob', + 'state': {'status': 'completed', 'title': 'Glob tests/**/*.py'}, + } + assert summary[2] == { + 'id': 'call_e3', + 'tool': 'grep', + 'state': {'status': 'completed', 'title': 'Grep def test_'}, + } + + # Output text with task_metadata block + output = outputs['output'] + assert output.startswith('Found 45 test files') + assert '\nsession_id: delegate-session-001\n' in output + + # Content key (used by ConversationMemory) + assert outputs['content'] == outputs['output'] + + def 
test_full_flow_general_subagent(self, create_llm_registry): + """Simulate a full general subagent flow with write operations.""" + # Step 1: Task tool call + parent_response = create_mock_response( + TASK_TOOL_NAME, + { + 'description': 'Add logging', + 'prompt': 'Add debug logging to the main module', + 'subagent_type': 'general', + }, + ) + parent_actions = response_to_actions(parent_response) + delegate_action = parent_actions[0] + assert delegate_action.agent == 'OpenCodeGeneralSubAgent' + + # Step 2: Child state + child_state = _make_mock_state( + inputs=delegate_action.inputs, + session_id='delegate-session-002', + ) + + # Step 3: General subagent executes read, then edit + read_action = _make_action_with_metadata( + OpenCodeReadAction, + 'read', + tool_call_id='call_g1', + path='/src/main.py', + offset=0, + limit=2000, + ) + edit_action = _make_action_with_metadata( + FileEditAction, + 'edit', + tool_call_id='call_g2', + path='/src/main.py', + command='str_replace', + old_str='def main():', + new_str='def main():\n logger.debug("Starting main")', + impl_source=FileEditSource.OH_ACI, + ) + child_state.history = [read_action, edit_action] + + # Step 4: Finish + finish_action = AgentFinishAction( + final_thought='Added debug logging to main() function in /src/main.py.' 
+ ) + + # Step 5: Enrich + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + general_agent = OpenCodeGeneralSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + general_agent.llm = Mock() + general_agent.llm.config = Mock() + general_agent.llm.config.model = 'gpt-4o' + + enriched = general_agent._enrich_finish_action(finish_action, child_state) + outputs = enriched.outputs + + assert outputs['title'] == 'Add logging' + assert len(outputs['metadata']['summary']) == 2 + assert outputs['metadata']['summary'][0]['tool'] == 'read' + assert outputs['metadata']['summary'][1]['tool'] == 'edit' + assert outputs['metadata']['summary'][1]['state']['title'] == 'Edit /src/main.py' + assert 'Added debug logging' in outputs['output'] + assert '' in outputs['output'] + + def test_delegate_observation_content_uses_output(self): + """Verify that AgentDelegateObservation.outputs['content'] contains the + opencode-formatted text, which ConversationMemory will use.""" + outputs = { + 'content': 'Analysis done.\n\n\nsession_id: s1\n', + 'title': 'Test', + 'metadata': { + 'summary': [{'id': 'c1', 'tool': 'read', 'state': {'status': 'completed'}}], + 'sessionId': 's1', + 'model': 'gpt-4o', + }, + 'output': 'Analysis done.\n\n\nsession_id: s1\n', + } + obs = AgentDelegateObservation( + outputs=outputs, + content='Delegated agent finished with result:\n\nTest agent done', + ) + + # ConversationMemory reads obs.outputs.get('content', obs.content) + effective_content = obs.outputs.get('content', obs.content) + assert effective_content == outputs['content'] + assert '' in effective_content + assert 'session_id: s1' in effective_content + + def test_parallel_task_tool_calls(self): + """Test that multiple concurrent task tool calls produce independent delegate actions.""" + response = create_mock_response_multi_tool( + [ + ( + TASK_TOOL_NAME, + { + 'description': 'Explore backend', + 'prompt': 'Find all API routes in the backend', + 
'subagent_type': 'explore', + }, + ), + ( + TASK_TOOL_NAME, + { + 'description': 'Explore frontend', + 'prompt': 'Find all React components', + 'subagent_type': 'explore', + }, + ), + ] + ) + actions = response_to_actions(response) + assert len(actions) == 2 + + assert isinstance(actions[0], AgentDelegateAction) + assert actions[0].agent == 'OpenCodeExploreSubAgent' + assert actions[0].inputs['description'] == 'Explore backend' + + assert isinstance(actions[1], AgentDelegateAction) + assert actions[1].agent == 'OpenCodeExploreSubAgent' + assert actions[1].inputs['description'] == 'Explore frontend' + + # Each has its own tool_call_id + assert actions[0].tool_call_metadata.tool_call_id != actions[1].tool_call_metadata.tool_call_id + + def test_mixed_task_and_regular_tool_calls(self): + """Test task tool mixed with regular tools in a single response.""" + response = create_mock_response_multi_tool( + [ + ( + 'read', + {'file_path': '/README.md'}, + ), + ( + TASK_TOOL_NAME, + { + 'description': 'Deep analysis', + 'prompt': 'Analyze the codebase architecture', + 'subagent_type': 'general', + }, + ), + ] + ) + actions = response_to_actions(response) + assert len(actions) == 2 + assert isinstance(actions[0], OpenCodeReadAction) + assert isinstance(actions[1], AgentDelegateAction) + assert actions[1].agent == 'OpenCodeGeneralSubAgent' + + def test_subagent_step_enriches_finish_action(self, create_llm_registry): + """Test that calling step() on a subagent with a pending AgentFinishAction + enriches the outputs correctly.""" + llm_config = LLMConfig(model='gpt-4o', api_key='test_key') + config = AgentConfig() + agent = OpenCodeExploreSubAgent( + config=config, llm_registry=create_llm_registry(llm_config) + ) + + # Mock the LLM to return a finish tool call + mock_llm = Mock() + mock_llm.config = Mock() + mock_llm.config.model = 'gpt-4o' + mock_llm.config.max_message_chars = 10000 + mock_llm.vision_is_active = Mock(return_value=False) + mock_llm.is_caching_prompt_active = 
Mock(return_value=False) + mock_llm.completion = Mock( + return_value=create_mock_response('finish', {'message': 'Done exploring.'}) + ) + agent.llm = mock_llm + + # Create a state with history containing a read action + state = Mock(spec=State) + state.history = [ + _make_action_with_metadata( + OpenCodeReadAction, + 'read', + tool_call_id='call_r1', + path='/src/app.py', + offset=0, + limit=2000, + ), + ] + state.inputs = {'description': 'Explore app', 'task': 'Read app.py'} + state.session_id = 'sess-step-test' + state.extra_data = {} + + # Simulate get_last_user_message + user_msg = Mock() + user_msg.content = 'TASK: Read app.py' + state.get_last_user_message = Mock(return_value=user_msg) + + # Mock condenser to return the history as-is + from openhands.memory.condenser.condenser import View + + agent.condenser = Mock() + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + # Mock _get_initial_user_message + initial_msg = MessageAction(content='TASK: Read app.py') + initial_msg._source = EventSource.USER + state.history.insert(0, initial_msg) + + # Mock conversation_memory + agent.conversation_memory = Mock() + agent.conversation_memory.process_events = Mock(return_value=[]) + agent.conversation_memory.apply_prompt_caching = Mock() + + # Step should call LLM and get finish action, which gets enriched + action = agent.step(state) + + assert isinstance(action, AgentFinishAction) + assert 'content' in action.outputs + assert 'metadata' in action.outputs + assert 'output' in action.outputs + assert '' in action.outputs['output'] + assert 'session_id: sess-step-test' in action.outputs['output'] + # The summary should include the read action from history + summary = action.outputs['metadata']['summary'] + assert len(summary) == 1 + assert summary[0]['tool'] == 'read' + assert summary[0]['state']['title'] == 'Read /src/app.py' + + +# ============================================================================== +# Parallel 
Subagent Runner Tests +# ============================================================================== + + +class TestParallelSubagentRunner: + """Tests for the ParallelSubagentRunner.""" + + def _make_delegate_action( + self, agent_name: str, task: str, tool_call_id: str + ) -> AgentDelegateAction: + """Create an AgentDelegateAction with tool_call_metadata.""" + action = AgentDelegateAction( + agent=agent_name, + inputs={'task': task, 'description': f'Test: {task}'}, + ) + action.tool_call_metadata = ToolCallMetadata( + tool_call_id=tool_call_id, + function_name='task', + model_response=ModelResponse( + id='mock-resp', + choices=[ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': 'task', + 'arguments': json.dumps( + { + 'prompt': task, + 'subagent_type': 'general', + } + ), + }, + 'id': tool_call_id, + 'type': 'function', + } + ], + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ), + total_calls_in_response=2, + ) + return action + + def test_runner_requires_runtime_url(self): + """Runner raises when OPENHANDS_RUNTIME_URL is not set.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url=None, + ) + # Clear env in case it's set + with patch.dict('os.environ', {}, clear=True): + runner.runtime_url = None + with pytest.raises(RuntimeError, match='OPENHANDS_RUNTIME_URL'): + runner.run_parallel([self._make_delegate_action('a', 'b', 'c1')]) + + def test_runner_creates_subagent_state(self): + """Verify _create_subagent_state builds proper state for a subagent.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://test:8000', + ) + + 
mock_agent = Mock() + mock_agent.get_system_message.return_value = None + + state = runner._create_subagent_state( + mock_agent, {'task': 'Read the file', 'description': 'test'} + ) + + assert state.inputs == {'task': 'Read the file', 'description': 'test'} + assert len(state.history) == 1 + task_msg = state.history[0] + assert isinstance(task_msg, MessageAction) + assert 'TASK: Read the file' in task_msg.content + + def test_runner_creates_state_with_system_message(self): + """State includes system message when agent provides one.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + from openhands.events.action.message import SystemMessageAction + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://test:8000', + ) + + sys_msg = SystemMessageAction(content='You are a helpful agent.') + mock_agent = Mock() + mock_agent.get_system_message.return_value = sys_msg + + state = runner._create_subagent_state(mock_agent, {'task': 'Do X'}) + + assert len(state.history) == 2 + assert isinstance(state.history[0], SystemMessageAction) + assert isinstance(state.history[1], MessageAction) + + @patch('httpx.Client') + def test_execute_action_success(self, mock_client_cls): + """_execute_action makes HTTP POST and returns observation.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'observation': 'read', + 'content': 'file contents here', + 'extras': {}, + } + mock_response.raise_for_status = Mock() + + mock_client = Mock() + mock_client.post.return_value = mock_response + mock_client.__enter__ = Mock(return_value=mock_client) + mock_client.__exit__ = Mock(return_value=False) + mock_client_cls.return_value = mock_client + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + 
parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://localhost:9999', + ) + + action = OpenCodeReadAction(path='/test.py') + obs = runner._execute_action(action) + + mock_client.post.assert_called_once() + call_args = mock_client.post.call_args + assert '/execute_action' in call_args[0][0] + + @patch('httpx.Client') + def test_execute_action_timeout(self, mock_client_cls): + """_execute_action handles timeout gracefully.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + mock_client = Mock() + mock_client.post.side_effect = Exception('Timeout') + mock_client.__enter__ = Mock(return_value=mock_client) + mock_client.__exit__ = Mock(return_value=False) + mock_client_cls.return_value = mock_client + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://localhost:9999', + ) + + action = OpenCodeReadAction(path='/test.py') + obs = runner._execute_action(action) + + from openhands.events.observation.error import ErrorObservation + + assert isinstance(obs, ErrorObservation) + assert 'failed' in obs.content.lower() or 'Timeout' in obs.content + + @patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner._run_subagent_loop' + ) + def test_run_parallel_concurrent_execution(self, mock_loop): + """run_parallel executes multiple subagents and collects results.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + mock_loop.side_effect = [ + {'content': 'Result A', 'outputs': {'content': 'Result A'}}, + {'content': 'Result B', 'outputs': {'content': 'Result B'}}, + ] + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://localhost:9999', + ) + + actions = [ + self._make_delegate_action( + 'OpenCodeGeneralSubAgent', 'Task A', 'call_001' + ), + 
self._make_delegate_action( + 'OpenCodeExploreSubAgent', 'Task B', 'call_002' + ), + ] + + results = runner.run_parallel(actions) + + assert len(results) == 2 + assert 'call_001' in results + assert 'call_002' in results + result_contents = {results['call_001']['content'], results['call_002']['content']} + assert result_contents == {'Result A', 'Result B'} + + @patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner._run_subagent_loop' + ) + def test_run_parallel_handles_errors(self, mock_loop): + """run_parallel handles individual subagent failures gracefully.""" + from openhands.agenthub.opencode_agent.subagents.parallel_runner import ( + ParallelSubagentRunner, + ) + + mock_loop.side_effect = [ + {'content': 'Success', 'outputs': {}}, + RuntimeError('LLM unavailable'), + ] + + runner = ParallelSubagentRunner( + llm_registry=Mock(), + parent_agent_config=Mock(), + agent_configs={}, + runtime_url='http://localhost:9999', + ) + + actions = [ + self._make_delegate_action('AgentA', 'Task 1', 'call_x'), + self._make_delegate_action('AgentB', 'Task 2', 'call_y'), + ] + + results = runner.run_parallel(actions) + + assert len(results) == 2 + assert results['call_x']['content'] == 'Success' + assert 'error' in results['call_y']['content'].lower() + + +class TestOpenCodeAgentParallelDetection: + """Tests that OpenCodeAgent.step() correctly detects and dispatches parallel task calls.""" + + def _build_agent(self): + """Create a mock OpenCodeAgent for testing.""" + from openhands.agenthub.opencode_agent.opencode_agent import OpenCodeAgent + + mock_llm_config = Mock() + mock_llm_config.model = 'test-model' + mock_llm_config.max_message_chars = 10000 + mock_llm_config.custom_llm_provider = None + + mock_llm = Mock() + mock_llm.config = mock_llm_config + mock_llm.vision_is_active.return_value = False + mock_llm.is_caching_prompt_active.return_value = False + + mock_registry = Mock() + mock_registry.get.return_value = mock_llm + 
mock_registry.get_router.return_value = mock_llm + + agent_config = AgentConfig() + agent_config.enable_cmd = False + agent_config.enable_finish = True + + with patch.object( + OpenCodeAgent, '__init__', lambda self, *a, **k: None + ): + agent = OpenCodeAgent.__new__(OpenCodeAgent) + agent.config = agent_config + agent.llm_registry = mock_registry + agent.llm = mock_llm + agent.pending_actions = deque() + agent.tools = [] + agent.mcp_tools = {} + agent.condenser = Mock() + agent.conversation_memory = Mock() + agent.conversation_memory.process_events = Mock(return_value=[]) + agent.conversation_memory.apply_prompt_caching = Mock() + agent._prompt_manager = Mock() + return agent + + def _make_multi_task_response(self, num_tasks: int) -> ModelResponse: + """Create a response with multiple task tool calls.""" + calls = [] + for i in range(num_tasks): + calls.append( + { + 'function': { + 'name': 'task', + 'arguments': json.dumps( + { + 'prompt': f'Do task {i}', + 'subagent_type': 'general', + } + ), + }, + 'id': f'call_{i}', + 'type': 'function', + } + ) + return ModelResponse( + id='multi-task-resp', + choices=[ + { + 'message': { + 'tool_calls': calls, + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + + def test_parallel_detection_with_multiple_task_calls(self): + """When ALL actions are AgentDelegateAction and count ≥ 2, parallel path triggers.""" + agent = self._build_agent() + + state = State(session_id='test-parallel') + initial_msg = MessageAction(content='Do multiple things') + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + multi_resp = self._make_multi_task_response(3) + finish_resp = ModelResponse( + id='finish-resp', + choices=[ + { + 'message': { + 'tool_calls': 
[ + { + 'function': { + 'name': 'finish', + 'arguments': json.dumps( + {'message': 'All done'} + ), + }, + 'id': 'call_finish', + 'type': 'function', + } + ], + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + + agent.llm.completion = Mock(side_effect=[multi_resp, finish_resp]) + + with patch.dict('os.environ', {'OPENHANDS_RUNTIME_URL': 'http://test:8000'}): + with patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner.run_parallel' + ) as mock_runner: + mock_runner.return_value = { + 'call_0': {'content': 'Result 0', 'outputs': {'content': 'Result 0'}}, + 'call_1': {'content': 'Result 1', 'outputs': {'content': 'Result 1'}}, + 'call_2': {'content': 'Result 2', 'outputs': {'content': 'Result 2'}}, + } + + action = agent.step(state) + + mock_runner.assert_called_once() + assert agent.llm.completion.call_count == 2 + + def test_sequential_fallback_for_single_task(self): + """A single task tool call does NOT trigger parallel execution.""" + agent = self._build_agent() + + state = State(session_id='test-seq') + initial_msg = MessageAction(content='Do one thing') + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + single_resp = self._make_multi_task_response(1) + agent.llm.completion = Mock(return_value=single_resp) + + with patch.dict('os.environ', {'OPENHANDS_RUNTIME_URL': 'http://test:8000'}): + with patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner.run_parallel' + ) as mock_runner: + action = agent.step(state) + mock_runner.assert_not_called() + assert isinstance(action, AgentDelegateAction) + + def test_sequential_fallback_for_mixed_calls(self): + """Mixed task + non-task calls fall back to 
sequential.""" + agent = self._build_agent() + + state = State(session_id='test-mixed') + initial_msg = MessageAction(content='Do things') + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + mixed_resp = create_mock_response_multi_tool( + [ + ('task', {'prompt': 'Explore codebase', 'subagent_type': 'explore'}), + ('read', {'file_path': '/test.py'}), + ] + ) + agent.llm.completion = Mock(return_value=mixed_resp) + + with patch.dict('os.environ', {'OPENHANDS_RUNTIME_URL': 'http://test:8000'}): + with patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner.run_parallel' + ) as mock_runner: + action = agent.step(state) + mock_runner.assert_not_called() + + def test_sequential_fallback_without_runtime_url(self): + """Without OPENHANDS_RUNTIME_URL, falls back to sequential even with multiple tasks.""" + agent = self._build_agent() + + state = State(session_id='test-no-url') + initial_msg = MessageAction(content='Do things') + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + multi_resp = self._make_multi_task_response(2) + agent.llm.completion = Mock(return_value=multi_resp) + + with patch.dict('os.environ', {}, clear=True): + with patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner.run_parallel' + ) as mock_runner: + action = agent.step(state) + mock_runner.assert_not_called() + assert isinstance(action, AgentDelegateAction) + + def test_parallel_injects_synthetic_events(self): + """After parallel execution, synthetic events 
are injected into state.history.""" + agent = self._build_agent() + + state = State(session_id='test-inject') + initial_msg = MessageAction(content='Do two things') + initial_msg._source = EventSource.USER + state.history = [initial_msg] + state.get_last_user_message = Mock(return_value=initial_msg) + + from openhands.memory.condenser.condenser import View + + agent.condenser.condensed_history = Mock( + return_value=View(events=state.history) + ) + + multi_resp = self._make_multi_task_response(2) + finish_resp = ModelResponse( + id='finish-resp-2', + choices=[ + { + 'message': { + 'tool_calls': [ + { + 'function': { + 'name': 'finish', + 'arguments': json.dumps( + {'message': 'Done'} + ), + }, + 'id': 'call_f', + 'type': 'function', + } + ], + 'content': None, + 'role': 'assistant', + }, + 'index': 0, + 'finish_reason': 'tool_calls', + } + ], + ) + agent.llm.completion = Mock(side_effect=[multi_resp, finish_resp]) + + history_len_before = len(state.history) + + with patch.dict('os.environ', {'OPENHANDS_RUNTIME_URL': 'http://test:8000'}): + with patch( + 'openhands.agenthub.opencode_agent.subagents.parallel_runner.ParallelSubagentRunner.run_parallel' + ) as mock_runner: + mock_runner.return_value = { + 'call_0': {'content': 'R0', 'outputs': {'content': 'R0'}}, + 'call_1': {'content': 'R1', 'outputs': {'content': 'R1'}}, + } + action = agent.step(state) + + # 2 tasks -> 2 synthetic actions + 2 synthetic observations = 4 new events + assert len(state.history) == history_len_before + 4 + + synthetic_actions = [ + e + for e in state.history[history_len_before:] + if isinstance(e, AgentDelegateAction) + ] + synthetic_obs = [ + e + for e in state.history[history_len_before:] + if isinstance(e, AgentDelegateObservation) + ] + + assert len(synthetic_actions) == 2 + assert len(synthetic_obs) == 2 + + for obs in synthetic_obs: + assert obs.tool_call_metadata is not None