diff --git a/evaluation/benchmarks/swe_bench/run_infer.py b/evaluation/benchmarks/swe_bench/run_infer.py
index dfb71464159c..9b456f5730aa 100644
--- a/evaluation/benchmarks/swe_bench/run_infer.py
+++ b/evaluation/benchmarks/swe_bench/run_infer.py
@@ -32,6 +32,7 @@
codeact_user_response,
codex_user_response,
opencode_user_response,
+ terminus_2_user_response,
get_default_sandbox_config_for_eval,
get_metrics,
get_openhands_config_for_eval,
@@ -108,6 +109,7 @@ def set_dataset_type(dataset_name: str) -> str:
'CodeActAgent': codeact_user_response,
'OpenCodeAgent': opencode_user_response,
'CodexAgent': codex_user_response,
+ 'Terminus2Agent': terminus_2_user_response,
}
diff --git a/evaluation/utils/shared.py b/evaluation/utils/shared.py
index 9faebf20089c..7972cb6850bb 100644
--- a/evaluation/utils/shared.py
+++ b/evaluation/utils/shared.py
@@ -249,6 +249,54 @@ def codex_user_response(
return msg
def terminus_2_user_response(
    state: State,
    encapsulate_solution: bool = False,
    try_parse: Callable[[Action], str] | None = None,
) -> str:
    """Produce the simulated-user reply for a Terminus2Agent evaluation run.

    Args:
        state: Controller state; only ``state.history`` is read.
        encapsulate_solution: When true, the reply also instructs the agent to
            wrap its final answer in ``<solution>`` tags.
        try_parse: Optional callback applied to the most recent ``Action`` in
            the history. A non-``None`` result means an answer was already
            produced.

    Returns:
        ``'/exit'`` when ``try_parse`` extracts an answer from the last action;
        otherwise a "keep working" prompt, extended with a give-up hint once
        the history contains at least two user messages.
    """
    encaps_str = (
        (
            'Your final answer MUST be encapsulated within <solution> and </solution>.\n'
            'For example: The answer to the question is <solution> 42 </solution>.\n'
        )
        if encapsulate_solution
        else ''
    )
    msg = (
        'Please continue working on the task. '
        'Analyze the terminal output and issue the next batch of commands.\n'
        'When the task is fully complete, set "task_complete": true in your JSON response.\n'
        f'{encaps_str}'
        'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n'
    )

    if not state.history:
        return msg

    if try_parse is not None:
        last_action = next(
            (
                event
                for event in reversed(state.history)
                if isinstance(event, Action)
            ),
            None,
        )
        # The history may hold no Action at all; try_parse is typed to take an
        # Action, so never hand it None.
        if last_action is not None and try_parse(last_action) is not None:
            return '/exit'

    user_msgs = [
        event
        for event in state.history
        if isinstance(event, MessageAction) and event.source == 'user'
    ]
    if len(user_msgs) >= 2:
        # The agent has already been nudged at least once: offer a way out.
        return (
            msg
            + 'If you want to give up, set "task_complete": true in your JSON response.\n'
        )
    return msg
+
+
def cleanup():
print('Cleaning up child processes...')
for process in mp.active_children():
diff --git a/openhands/agenthub/__init__.py b/openhands/agenthub/__init__.py
index 73bb54f2e6f9..f652ee6d466e 100644
--- a/openhands/agenthub/__init__.py
+++ b/openhands/agenthub/__init__.py
@@ -11,6 +11,7 @@
loc_agent,
opencode_agent,
readonly_agent,
+ terminus_2_agent,
visualbrowsing_agent,
)
from openhands.controller.agent import Agent # noqa: E402
@@ -25,4 +26,5 @@
'loc_agent',
'opencode_agent',
'codex_agent',
+ 'terminus_2_agent',
]
diff --git a/openhands/agenthub/terminus_2_agent/README.md b/openhands/agenthub/terminus_2_agent/README.md
new file mode 100644
index 000000000000..db52b05ebf49
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/README.md
@@ -0,0 +1,248 @@
+# Terminus-2 Agent
+
+The Terminus-2 Agent is a keystroke-based terminal agent ported from the [terminal-bench](https://github.com/laude-institute/terminal-bench) project. Unlike function-calling agents (CodeAct, OpenCode, Codex), it communicates with the LLM using structured JSON responses and interacts with the terminal by sending raw keystrokes and receiving screen capture output.
+
+## Overview
+
+Terminus-2 is designed around a fundamentally different interaction model than other OpenHands agents:
+
+- **No function calling** -- The LLM outputs a raw JSON object instead of tool calls. A dedicated parser extracts structured commands from the response.
+- **Keystroke-based terminal interaction** -- Instead of running shell commands and collecting stdout/stderr, the agent sends raw keystrokes (including tmux-style special keys like `C-c`, `C-d`) and receives the full terminal screen state back.
+- **Batch command execution** -- Each LLM response can contain multiple commands that are executed sequentially before the next LLM call.
+
+## Architecture
+
+```
+ +-----------+
+ | LLM |
+ +-----+-----+
+ |
+ JSON response (text)
+ |
+ +-----v-----+
+ | JSON |
+ | Parser |
+ +-----+-----+
+ |
+ List[ParsedCommand]
+ |
+ +-------------v--------------+
+ | Terminus2Agent |
+ | (step loop, confirmation) |
+ +-------------+--------------+
+ |
+ Terminus2CmdRunAction (per command)
+ |
+ +-------------v--------------+
+ | ActionExecutionServer |
+ | (keystroke execution) |
+ +-------------+--------------+
+ |
+ Terminus2CmdOutputObservation
+ |
+ (terminal screen state)
+```
+
+### Agent Step Cycle
+
+Each call to `step()`:
+
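+1. If there are pending actions queued from a previous LLM call, return the next one.
+2. If the latest user message is `/exit`, finish immediately.
+3. If the (condensed) event history contains no terminal observation yet, send a no-op command (empty keystrokes, 0.5s) to capture the initial terminal screen.
+4. Otherwise, build a conversation message list from the event history.
+5. Call the LLM (up to 3 attempts in total when the response cannot be parsed).
+6. Parse the JSON response to extract commands.
+7. Queue a `Terminus2CmdRunAction` for each command.
+8. Return the first action from the queue.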
+
+### JSON Response Format
+
+The LLM is expected to respond with a JSON object:
+
+```json
+{
+ "analysis": "What I observe in the terminal output and what has been done so far.",
+ "plan": "My plan for the next steps and what each command will accomplish.",
+ "commands": [
+ {
+ "keystrokes": "ls -la\n",
+ "duration": 0.1
+ },
+ {
+ "keystrokes": "cd project\n",
+ "duration": 0.1
+ }
+ ],
+ "task_complete": false
+}
+```
+
+| Field | Required | Type | Description |
+|-------|----------|------|-------------|
+| `analysis` | Yes | string | Analysis of the current terminal state |
+| `plan` | Yes | string | Plan for the next steps |
+| `commands` | Yes | array | Array of command objects to execute |
+| `task_complete` | No | boolean | Whether the task is finished (default: `false`) |
+
+Each command object:
+
+| Field | Required | Type | Default | Description |
+|-------|----------|------|---------|-------------|
+| `keystrokes` | Yes | string | -- | Exact keystrokes to send to the terminal |
+| `duration` | No | float | 1.0 | Seconds to wait before capturing output (capped at 60) |
+
+### Keystrokes
+
+The `keystrokes` field is sent **verbatim** to the terminal:
+
+- Commands should end with `\n` to execute (e.g., `"ls -la\n"`)
+- Special key sequences use tmux-style escapes:
+ - `C-c` -- Ctrl+C (send SIGINT)
+ - `C-d` -- Ctrl+D (send EOF)
+- Empty keystrokes (`""`) with a duration can be used to poll/wait for output
+- Multiple commands in the same batch are executed sequentially
+
+### Duration Guidelines
+
+| Command type | Recommended duration |
+|-------------|---------------------|
+| Immediate (`cd`, `ls`, `echo`, `cat`) | 0.1s |
+| Standard (`gcc`, `find`, `rustc`) | 1.0s |
+| Slow (`make`, `pip install`, `wget`) | 5.0-30.0s |
+| Polling (wait for output) | 10.0s |
+
+It is better to set a shorter duration and poll again than to set a long one. The maximum allowed duration is 60 seconds.
+
+## Key Features
+
+### JSON Parser with Auto-Correction
+
+The `TerminusJSONPlainParser` handles common LLM formatting mistakes:
+
+- **Incomplete JSON** -- Adds missing closing braces when the response is truncated
+- **Mixed content** -- Extracts JSON from responses that contain extra text before/after
+- **Markdown code fences** -- Handles JSON wrapped in `` ```json ``` `` blocks
+- **Field validation** -- Checks required fields, types, and correct field order
+- **Warnings** -- Non-fatal issues (missing duration, unknown fields, wrong order) are reported as warnings rather than errors
+
+### Double Confirmation for Task Completion
+
+To prevent premature task completion:
+
+1. First `"task_complete": true` -- Triggers a confirmation prompt: *"Are you sure you want to mark the task as complete?"*
+2. Second consecutive `"task_complete": true` -- Actually completes the task via `AgentFinishAction`
+3. If the LLM does *not* set `task_complete` after a confirmation prompt, the pending completion is reset.
+
+### Output Truncation
+
+Terminal output is truncated to 10,000 bytes (~10 KB) to prevent context window overflow. When truncation occurs, the first and last 5,000 bytes are preserved with a marker indicating how many bytes were omitted from the middle.
+
+### Timeout Handling
+
+When a command exceeds its duration, the agent sends a timeout message to the LLM explaining that the command may still be running and showing the current terminal state. The LLM can then decide to wait longer (empty keystrokes with a duration), cancel the command (`C-c`), or proceed.
+
+## File Structure
+
+```
+openhands/agenthub/terminus_2_agent/
+ __init__.py # Agent registration
+ terminus_2_agent.py # Main agent class
+ terminus_json_plain_parser.py # JSON response parser
+ README.md # This file
+ prompts/
+ system_prompt.j2 # System prompt with JSON format spec
+ system_prompt_long_horizon.j2 # Extended prompt for long tasks
+ additional_info.j2 # Repository/runtime info template
+ microagent_info.j2 # Microagent trigger info template
+ user_prompt.j2 # Initial user message template
+```
+
+### Supporting files in other directories
+
+```
+openhands/events/action/terminus_2.py # Terminus2CmdRunAction
+openhands/events/observation/terminus_2.py # Terminus2CmdOutputObservation
+openhands/core/schema/action.py # TERMINUS_2_CMD_RUN enum
+openhands/core/schema/observation.py # TERMINUS_2_CMD_OUTPUT enum
+tests/unit/agenthub/test_terminus_2_parser.py # Parser tests (36)
+tests/unit/agenthub/test_terminus_2_agent.py # Agent tests (22)
+tests/unit/agenthub/test_terminus_2_action_observation.py # Serialization tests (27)
+```
+
+## Usage
+
+### Quick Start
+
+To use the Terminus-2 agent in code:
+
+```python
+from openhands.agenthub.terminus_2_agent import Terminus2Agent
+from openhands.core.config import AgentConfig
+from openhands.llm.llm_registry import LLMRegistry
+
+config = AgentConfig(agent_name='Terminus2Agent')
+llm_registry = LLMRegistry()
+agent = Terminus2Agent(config, llm_registry)
+```
+
+Or via configuration:
+
+```yaml
+agent:
+ name: Terminus2Agent
+```
+
+### Evaluation and Benchmarking
+
+To run SWE-bench evaluations with Terminus2Agent:
+
+```bash
+poetry run python evaluation/benchmarks/swe_bench/run_infer.py \
+ --agent-cls Terminus2Agent \
+ --llm-config your_model_config \
+ --max-iterations 50 \
+ --dataset princeton-nlp/SWE-bench_Lite \
+ --split test
+```
+
+### Custom Prompts
+
+Override the system prompt via config:
+
+```yaml
+agent:
+ name: Terminus2Agent
+ system_prompt_path: /path/to/custom/system_prompt.j2
+```
+
+Or override the entire prompt directory:
+
+```yaml
+agent:
+ name: Terminus2Agent
+ custom_prompt_dir: /path/to/custom/prompts/
+```
+
+## Comparison to Other Agents
+
+| Feature | CodeActAgent | OpenCodeAgent | Terminus2Agent |
+|---------|--------------|---------------|----------------|
+| LLM interface | Function calling | Function calling | Raw JSON parsing |
+| Terminal interaction | Command + stdout | Command + stdout | Keystrokes + screen capture |
+| Batch commands | Single per turn | Single per turn | Multiple per turn |
+| Special keys (Ctrl+C) | Via bash | Via bash | Native (`C-c`) |
+| File operations | Tools | Tools | Via terminal commands |
+| Task completion | `finish` tool | `finish` tool | `task_complete` field + double confirmation |
+| Parse error recovery | N/A (function calling) | N/A (function calling) | Auto-fix + retry (up to 3 attempts) |
+| Output format | Structured (exit code, stdout, stderr) | Structured | Full terminal screen state |
+
+### When to Use Terminus-2
+
+Terminus-2 is best suited for:
+
+- **Terminal-centric tasks** where seeing the full screen state matters (interactive programs, TUI applications, vim, etc.)
+- **Models without function calling support** that can reliably produce JSON
+- **Benchmarks** that measure terminal interaction fidelity (e.g., terminal-bench)
+- **Tasks requiring special key sequences** (Ctrl+C to cancel, Ctrl+D for EOF, interactive prompts)
+
+### Provenance
+
+This agent was ported from the standalone Terminus-2 implementation in `terminal-bench`. The original agent used tmux sessions for terminal interaction; this OpenHands port adapts the same logic to work with OpenHands' `BashSession` runtime while preserving the JSON-based LLM interaction model, the parser with auto-correction, and the double-confirmation completion flow.
diff --git a/openhands/agenthub/terminus_2_agent/__init__.py b/openhands/agenthub/terminus_2_agent/__init__.py
new file mode 100644
index 000000000000..5c625358c53b
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/__init__.py
@@ -0,0 +1,4 @@
+from openhands.agenthub.terminus_2_agent.terminus_2_agent import Terminus2Agent
+from openhands.controller.agent import Agent
+
+Agent.register('Terminus2Agent', Terminus2Agent)
diff --git a/openhands/agenthub/terminus_2_agent/prompts/additional_info.j2 b/openhands/agenthub/terminus_2_agent/prompts/additional_info.j2
new file mode 100644
index 000000000000..d80a36338ca4
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/prompts/additional_info.j2
@@ -0,0 +1,52 @@
+{% if repository_info %}
+
+At the user's request, repository {{ repository_info.repo_name }} has been cloned to {{ repository_info.repo_directory }} in the current working directory.
+{% if repository_info.branch_name %}The repository has been checked out to branch "{{ repository_info.branch_name }}".
+
+IMPORTANT: You should work within the current branch "{{ repository_info.branch_name }}" unless:
+ 1. the user explicitly instructs otherwise
+ 2. the current branch is "main", "master", or another default branch where direct pushes may be unsafe
+{% endif %}
+
+{% endif %}
+{% if repository_instructions -%}
+
+{{ repository_instructions }}
+
+{% endif %}
+{% if runtime_info -%}
+
+{% if runtime_info.working_dir %}
+The current working directory is {{ runtime_info.working_dir }}
+{% endif %}
+{% if runtime_info.available_hosts %}
+The user has access to the following hosts for accessing a web application,
+each of which has a corresponding port:
+{% for host, port in runtime_info.available_hosts.items() -%}
+* {{ host }} (port {{ port }})
+{% endfor %}
+When starting a web server, use the corresponding ports. You should also
+set any options to allow iframes and CORS requests, and allow the server to
+be accessed from any host (e.g. 0.0.0.0).
+{% endif %}
+{% if runtime_info.additional_agent_instructions %}
+{{ runtime_info.additional_agent_instructions }}
+{% endif %}
+{% if runtime_info.custom_secrets_descriptions %}
+
+You have access to the following environment variables
+{% for secret_name, secret_description in runtime_info.custom_secrets_descriptions.items() %}
+* **${{ secret_name }}**: {{ secret_description }}
+{% endfor %}
+
+{% endif %}
+{% if runtime_info.date %}
+Today's date is {{ runtime_info.date }} (UTC).
+{% endif %}
+
+{% if conversation_instructions and conversation_instructions.content -%}
+
+{{ conversation_instructions.content }}
+
+{% endif %}
+{% endif %}
diff --git a/openhands/agenthub/terminus_2_agent/prompts/microagent_info.j2 b/openhands/agenthub/terminus_2_agent/prompts/microagent_info.j2
new file mode 100644
index 000000000000..264828fbe206
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/prompts/microagent_info.j2
@@ -0,0 +1,8 @@
+{% for agent_info in triggered_agents %}
+
+The following information has been included based on a keyword match for "{{ agent_info.trigger }}".
+It may or may not be relevant to the user's request.
+
+{{ agent_info.content }}
+
+{% endfor %}
diff --git a/openhands/agenthub/terminus_2_agent/prompts/system_prompt.j2 b/openhands/agenthub/terminus_2_agent/prompts/system_prompt.j2
new file mode 100644
index 000000000000..8743f5078e25
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/prompts/system_prompt.j2
@@ -0,0 +1,51 @@
+You are an AI assistant tasked with solving command-line tasks in a Linux environment. You will be given a task description and the output from previously executed commands. Your goal is to solve the task by providing batches of shell commands.
+
+Format your response as JSON with the following structure:
+
+{
+ "analysis": "Analyze the current state based on the terminal output provided. What do you see? What has been accomplished? What still needs to be done?",
+ "plan": "Describe your plan for the next steps. What commands will you run and why? Be specific about what you expect each command to accomplish.",
+ "commands": [
+ {
+ "keystrokes": "ls -la\n",
+ "duration": 0.1
+ },
+ {
+ "keystrokes": "cd project\n",
+ "duration": 0.1
+ }
+ ],
+ "task_complete": true
+}
+
+Required fields:
+- "analysis": Your analysis of the current situation
+- "plan": Your plan for the next steps
+- "commands": Array of command objects to execute
+
+Optional fields:
+- "task_complete": Boolean indicating if the task is complete (defaults to false if not present)
+
+Command object structure:
+- "keystrokes": String containing the exact keystrokes to send to the terminal (required)
+- "duration": Number of seconds to wait for the command to complete before the next command will be executed (defaults to 1.0 if not present)
+
+IMPORTANT: The text inside "keystrokes" will be used completely verbatim as keystrokes. Write commands exactly as you want them sent to the terminal:
+- Most bash commands should end with a newline (\n) to cause them to execute
+- For special key sequences, use tmux-style escape sequences:
+ - C-c for Ctrl+C
+ - C-d for Ctrl+D
+
+The "duration" attribute specifies the number of seconds to wait for the command to complete (default: 1.0) before the next command will be executed. On immediate tasks (e.g., cd, ls, echo, cat) set a duration of 0.1 seconds. On commands (e.g., gcc, find, rustc) set a duration of 1.0 seconds. On slow commands (e.g., make, python3 [long running script], wget [file]) set an appropriate duration as you determine necessary.
+
+It is better to set a smaller duration than a longer duration. It is always possible to wait again if the prior output has not finished, by running {"keystrokes": "", "duration": 10.0} on subsequent requests to wait longer. Never wait longer than 60 seconds; prefer to poll to see intermediate result status.
+
+Important notes:
+- Each command's keystrokes are sent exactly as written to the terminal
+- Do not include extra whitespace before or after the keystrokes unless it's part of the intended command
+- Extra text before or after the JSON will generate warnings but be tolerated
+- The JSON must be valid - use proper escaping for quotes and special characters within strings
+- Commands array can be empty if you want to wait without taking action
+
+{% include 'additional_info.j2' %}
+{% include 'microagent_info.j2' %}
diff --git a/openhands/agenthub/terminus_2_agent/prompts/system_prompt_long_horizon.j2 b/openhands/agenthub/terminus_2_agent/prompts/system_prompt_long_horizon.j2
new file mode 100644
index 000000000000..4c61f6d104dd
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/prompts/system_prompt_long_horizon.j2
@@ -0,0 +1,12 @@
+{% include 'system_prompt.j2' %}
+
+
+This task may require many steps to complete. Keep these guidelines in mind:
+
+1. **Be methodical**: Break the problem into smaller sub-tasks and solve them one at a time.
+2. **Verify each step**: After running a command, check the output to confirm it succeeded before moving on.
+3. **Keep track of progress**: In your analysis, maintain a clear picture of what has been done and what remains.
+4. **Handle errors gracefully**: If a command fails, analyze why and adjust your approach.
+5. **Avoid redundant work**: If you've already verified something works, don't re-verify unnecessarily.
+6. **Use efficient commands**: Prefer commands that give you the most information in one step.
+
diff --git a/openhands/agenthub/terminus_2_agent/prompts/user_prompt.j2 b/openhands/agenthub/terminus_2_agent/prompts/user_prompt.j2
new file mode 100644
index 000000000000..8b137891791f
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/prompts/user_prompt.j2
@@ -0,0 +1 @@
+
diff --git a/openhands/agenthub/terminus_2_agent/terminus_2_agent.py b/openhands/agenthub/terminus_2_agent/terminus_2_agent.py
new file mode 100644
index 000000000000..eef91a744a62
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/terminus_2_agent.py
@@ -0,0 +1,437 @@
+"""Terminus-2 Agent for OpenHands.
+
+A keystroke-based terminal agent that sends raw keystrokes to a terminal session
+and receives screen capture output. Uses JSON-formatted LLM responses with
+analysis, plan, and commands fields.
+"""
+
+import os
+from collections import deque
+from typing import TYPE_CHECKING
+
+from openhands.llm.llm_registry import LLMRegistry
+
+if TYPE_CHECKING:
+ from openhands.events.action import Action
+
+from openhands.agenthub.terminus_2_agent.terminus_json_plain_parser import (
+ ParsedCommand,
+ TerminusJSONPlainParser,
+)
+from openhands.controller.agent import Agent
+from openhands.controller.state.state import State
+from openhands.core.config import AgentConfig
+from openhands.core.logger import openhands_logger as logger
+from openhands.core.message import Message, TextContent
+from openhands.events.action import AgentFinishAction, AgentThinkAction, MessageAction
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
+from openhands.events.event import Event, EventSource
+from openhands.events.observation.error import ErrorObservation
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
+from openhands.memory.condenser import Condenser
+from openhands.memory.condenser.condenser import Condensation, View
+from openhands.runtime.plugins import PluginRequirement
+from openhands.utils.prompt import PromptManager
+
+
# Default byte budget applied to terminal output before it is shown to the LLM.
MAX_OUTPUT_BYTES = 10_000
# Total number of LLM calls made for one step when responses fail to parse.
MAX_LLM_RETRY = 3

# User-turn text used when the last observation in a batch reports a timeout.
# Placeholders: {command}, {timeout_sec}, {terminal_state}.
TIMEOUT_TEMPLATE = '\n\n'.join(
    [
        'Previous command:\n{command}',
        'The previous command timed out after {timeout_sec} seconds',
        (
            'It is possible that the command is not yet finished executing. '
            'If that is the case, then do nothing. It is also possible that you '
            'have entered an interactive shell and should continue sending '
            'keystrokes as normal.'
        ),
        'Here is the current state of the terminal:',
        '{terminal_state}',
    ]
)

# User-turn text asking the LLM to confirm a first "task_complete": true.
# Placeholder: {terminal_state}.
COMPLETION_CONFIRMATION = '\n\n'.join(
    [
        'Current terminal state:\n{terminal_state}',
        (
            'Are you sure you want to mark the task as complete? '
            "This will trigger your solution to be graded and you won't be able to "
            'make any further corrections. If so, include "task_complete": true '
            'in your JSON response again.'
        ),
    ]
)
+
+
class Terminus2Agent(Agent):
    """Keystroke-based terminal agent driven by JSON-formatted LLM responses.

    The Terminus-2 Agent sends raw keystrokes to a terminal session and receives
    screen capture output, using JSON-formatted LLM responses.

    Unlike function-calling agents (CodeAct, OpenCode), Terminus-2 parses the
    LLM's raw text response as JSON with fields: analysis, plan, commands, and
    optionally task_complete.

    Key features:
    - Keystroke-based terminal interaction (tmux-style)
    - JSON response parsing with auto-correction
    - Double confirmation for task completion
    - Output truncation to 10KB
    - Duration-based command timeouts
    """

    # The docstring above must be the first statement of the class body;
    # placed after VERSION it would be a no-op string expression.
    VERSION = '1.0'

    # This agent declares no sandbox plugin requirements.
    sandbox_plugins: list[PluginRequirement] = []
+
+ def __init__(self, config: AgentConfig, llm_registry: LLMRegistry) -> None:
+ super().__init__(config, llm_registry)
+ self.pending_actions: deque['Action'] = deque()
+ self.parser = TerminusJSONPlainParser()
+ self._pending_completion = False
+ self._conversation_messages: list[dict[str, str]] = []
+ self._needs_llm_call = True
+
+ self.condenser = Condenser.from_config(self.config.condenser, llm_registry)
+ self.llm = self.llm_registry.get_router(self.config)
+
+ @property
+ def prompt_manager(self) -> PromptManager:
+ if self._prompt_manager is None:
+ prompt_dir = (
+ self.config.custom_prompt_dir
+ if self.config.custom_prompt_dir
+ else os.path.join(os.path.dirname(__file__), 'prompts')
+ )
+
+ template_overrides = {}
+ if self.config.system_prompt_path:
+ template_overrides['system_prompt.j2'] = self.config.system_prompt_path
+ if self.config.system_prompt_long_horizon_path:
+ template_overrides['system_prompt_long_horizon.j2'] = (
+ self.config.system_prompt_long_horizon_path
+ )
+
+ self._prompt_manager = PromptManager(
+ prompt_dir=prompt_dir,
+ system_prompt_filename=self.config.resolved_system_prompt_filename,
+ template_overrides=template_overrides if template_overrides else None,
+ )
+
+ return self._prompt_manager
+
+ def reset(self) -> None:
+ super().reset()
+ self.pending_actions.clear()
+ self._pending_completion = False
+ self._conversation_messages = []
+ self._needs_llm_call = True
+
+ def _has_terminal_observation(self, events: list[Event]) -> bool:
+ """Check if any Terminus2CmdOutputObservation exists in the event history."""
+ return any(isinstance(e, Terminus2CmdOutputObservation) for e in events)
+
+ def step(self, state: State) -> 'Action':
+ """Performs one step of the Terminus-2 agent.
+
+ On the very first step (before any terminal observations exist), sends a
+ no-op action to capture the initial terminal screen. This mirrors the
+ original Terminus-2 behavior of capturing tmux state before the first
+ LLM call.
+
+ Returns pending actions from the queue, or calls the LLM to get
+ new commands when the queue is empty.
+ """
+ if self.pending_actions:
+ return self.pending_actions.popleft()
+
+ latest_user_message = state.get_last_user_message()
+ if latest_user_message and latest_user_message.content.strip() == '/exit':
+ return AgentFinishAction()
+
+ condensed_history: list[Event] = []
+ match self.condenser.condensed_history(state):
+ case View(events=events):
+ condensed_history = events
+ case Condensation(action=condensation_action):
+ return condensation_action
+
+ if not self._has_terminal_observation(condensed_history):
+ return Terminus2CmdRunAction(keystrokes='', duration=0.5)
+
+ messages = self._build_messages(condensed_history, state)
+
+ commands, is_task_complete, response_text = self._call_llm_and_parse(messages)
+
+ if is_task_complete:
+ if self._pending_completion:
+ return AgentFinishAction(thought='Task completed (confirmed)')
+ else:
+ self._pending_completion = True
+ else:
+ self._pending_completion = False
+
+ for i, cmd in enumerate(commands):
+ action = Terminus2CmdRunAction(
+ keystrokes=cmd.keystrokes,
+ duration=min(cmd.duration, 60),
+ thought=response_text if i == 0 else '',
+ )
+ self.pending_actions.append(action)
+
+ if not self.pending_actions:
+ if self._pending_completion:
+ return Terminus2CmdRunAction(
+ keystrokes='', duration=0.5, thought=response_text
+ )
+ return AgentThinkAction(thought='No commands to execute, waiting for next input')
+
+ return self.pending_actions.popleft()
+
+ def _build_messages(
+ self, condensed_history: list[Event], state: State
+ ) -> list[Message]:
+ """Build the conversation messages from event history.
+
+ Converts the event stream into a user/assistant message sequence:
+ - System message: JSON format instructions from the prompt template
+ - First user message: the instruction content (from get_instruction /
+ INSTRUCTION_TEMPLATE_PATH) used directly, with initial terminal state
+ appended if available. This keeps the agent consistent with CodeAct/
+ Codex/OpenCode which pass the MessageAction.content through unchanged,
+ so that INSTRUCTION_TEMPLATE_PATH overrides work without double-wrapping.
+ - Subsequent turns: assistant = LLM JSON response, user = terminal output
+ """
+ messages: list[Message] = []
+
+ system_prompt = self.prompt_manager.get_system_message()
+ messages.append(Message(role='system', content=[TextContent(text=system_prompt)]))
+
+ initial_user_msg = self._find_initial_user_message(condensed_history)
+ initial_terminal_event = self._find_initial_terminal_event(condensed_history)
+
+ if initial_user_msg:
+ if initial_terminal_event is not None:
+ terminal_text = initial_terminal_event.terminal_state
+ first_user_text = f'{initial_user_msg}\n\n{terminal_text}'
+ else:
+ first_user_text = initial_user_msg
+ messages.append(
+ Message(role='user', content=[TextContent(text=first_user_text)])
+ )
+
+ batch_observations: list[str] = []
+ last_timed_out = False
+ last_keystrokes = ''
+
+ for event in condensed_history:
+ if isinstance(event, MessageAction):
+ if event.source == EventSource.USER:
+ continue
+ elif event.source == EventSource.AGENT:
+ if batch_observations:
+ terminal_output = self._combine_observations(
+ batch_observations
+ )
+ user_text = self._format_terminal_output(
+ terminal_output, last_timed_out, last_keystrokes
+ )
+ messages.append(
+ Message(role='user', content=[TextContent(text=user_text)])
+ )
+ batch_observations = []
+ last_timed_out = False
+
+ messages.append(
+ Message(
+ role='assistant',
+ content=[TextContent(text=event.content)],
+ )
+ )
+
+ elif isinstance(event, Terminus2CmdRunAction):
+ if event.thought:
+ if batch_observations:
+ terminal_output = self._combine_observations(
+ batch_observations
+ )
+ user_text = self._format_terminal_output(
+ terminal_output, last_timed_out, last_keystrokes
+ )
+ messages.append(
+ Message(role='user', content=[TextContent(text=user_text)])
+ )
+ batch_observations = []
+ last_timed_out = False
+
+ messages.append(
+ Message(
+ role='assistant',
+ content=[TextContent(text=event.thought)],
+ )
+ )
+ last_keystrokes = event.keystrokes
+
+ elif isinstance(event, Terminus2CmdOutputObservation):
+ if event is initial_terminal_event:
+ continue
+ batch_observations.append(event.terminal_state)
+ last_timed_out = event.timed_out
+
+ elif isinstance(event, ErrorObservation):
+ batch_observations.append(f'ERROR: {event.content}')
+
+ elif isinstance(event, AgentThinkAction):
+ pass
+
+ if batch_observations:
+ terminal_output = self._combine_observations(batch_observations)
+ if self._pending_completion:
+ confirmation = COMPLETION_CONFIRMATION.format(
+ terminal_state=terminal_output
+ )
+ messages.append(
+ Message(role='user', content=[TextContent(text=confirmation)])
+ )
+ else:
+ user_text = self._format_terminal_output(
+ terminal_output, last_timed_out, last_keystrokes
+ )
+ messages.append(
+ Message(role='user', content=[TextContent(text=user_text)])
+ )
+ elif self._pending_completion:
+ confirmation = COMPLETION_CONFIRMATION.format(terminal_state='')
+ messages.append(
+ Message(role='user', content=[TextContent(text=confirmation)])
+ )
+
+ return messages
+
+ def _find_initial_user_message(self, events: list[Event]) -> str | None:
+ """Find the initial user message (task instruction) from the event history."""
+ for event in events:
+ if isinstance(event, MessageAction) and event.source == EventSource.USER:
+ return event.content
+ return None
+
+ def _find_initial_terminal_event(
+ self, events: list[Event]
+ ) -> Terminus2CmdOutputObservation | None:
+ """Find the first Terminus2CmdOutputObservation in the event history.
+
+ Returns the event object itself (not just the string) so that
+ _build_messages can skip it in the loop via identity comparison,
+ avoiding duplication with the first user message.
+ """
+ for event in events:
+ if isinstance(event, Terminus2CmdOutputObservation):
+ return event
+ return None
+
+ _NEW_OUTPUT_PREFIX = 'New Terminal Output:\n'
+ _SCREEN_PREFIX = 'Current Terminal Screen:\n'
+
+ @staticmethod
+ def _combine_observations(observations: list[str]) -> str:
+ """Combine multiple terminal observations into a single output.
+
+ In the original Terminus-2, all commands in a batch execute in tmux and
+ then a single get_incremental_output() captures the cumulative output.
+ This method replicates that by stripping the per-observation prefix,
+ joining the raw screen content, and re-adding a single prefix.
+ """
+ if not observations:
+ return ''
+ if len(observations) == 1:
+ return observations[0]
+
+ new_pfx = Terminus2Agent._NEW_OUTPUT_PREFIX
+ scr_pfx = Terminus2Agent._SCREEN_PREFIX
+
+ screens: list[str] = []
+ last_prefix = new_pfx
+ for obs in observations:
+ if obs.startswith(new_pfx):
+ screens.append(obs[len(new_pfx):])
+ last_prefix = new_pfx
+ elif obs.startswith(scr_pfx):
+ screens.append(obs[len(scr_pfx):])
+ last_prefix = scr_pfx
+ else:
+ screens.append(obs)
+
+ return last_prefix + '\n'.join(screens)
+
+ def _format_terminal_output(
+ self, terminal_output: str, timed_out: bool, keystrokes: str
+ ) -> str:
+ """Format terminal output for the next user message."""
+ if timed_out:
+ return TIMEOUT_TEMPLATE.format(
+ command=keystrokes,
+ timeout_sec=60,
+ terminal_state=self._limit_output_length(terminal_output),
+ )
+ return self._limit_output_length(terminal_output)
+
+ def _call_llm_and_parse(
+ self, messages: list[Message]
+ ) -> tuple[list[ParsedCommand], bool, str]:
+ """Call the LLM and parse the JSON response, with retry on parse errors.
+
+ Returns (commands, is_task_complete, response_text) where response_text
+ is the raw LLM output that must be stored on the first action's thought
+ field so _build_messages can reconstruct the assistant turn later.
+ """
+ for attempt in range(MAX_LLM_RETRY):
+ params: dict = {
+ 'messages': messages,
+ }
+ response = self.llm.completion(**params)
+
+ response_text = response.choices[0].message.content or ''
+ logger.debug(f'Terminus-2 LLM response (attempt {attempt + 1}): {response_text[:200]}...')
+
+ messages.append(
+ Message(role='assistant', content=[TextContent(text=response_text)])
+ )
+
+ result = self.parser.parse_response(response_text)
+
+ if result.error:
+ feedback = f'Previous response had parsing errors:\nERROR: {result.error}'
+ if result.warning:
+ feedback += f'\nWARNINGS: {result.warning}'
+ feedback += '\n\nPlease fix these issues and provide a proper JSON response.'
+ logger.warning(f'Terminus-2 parse error (attempt {attempt + 1}): {result.error}')
+
+ messages.append(
+ Message(role='user', content=[TextContent(text=feedback)])
+ )
+ continue
+
+ if result.warning:
+ logger.info(f'Terminus-2 parse warnings: {result.warning}')
+
+ commands = [
+ ParsedCommand(keystrokes=cmd.keystrokes, duration=min(cmd.duration, 60))
+ for cmd in result.commands
+ ]
+ return commands, result.is_task_complete, response_text
+
+ logger.error('Terminus-2: exhausted LLM retries due to parse errors')
+ return [], False, ''
+
+ @staticmethod
+ def _limit_output_length(output: str, max_bytes: int = MAX_OUTPUT_BYTES) -> str:
+ """Limit output to specified byte length, keeping first and last portions."""
+ if len(output.encode('utf-8')) <= max_bytes:
+ return output
+
+ portion_size = max_bytes // 2
+ output_bytes = output.encode('utf-8')
+ first_portion = output_bytes[:portion_size].decode('utf-8', errors='ignore')
+ last_portion = output_bytes[-portion_size:].decode('utf-8', errors='ignore')
+ omitted_bytes = (
+ len(output_bytes)
+ - len(first_portion.encode('utf-8'))
+ - len(last_portion.encode('utf-8'))
+ )
+
+ return (
+ f'{first_portion}\n[... output limited to {max_bytes} bytes; '
+ f'{omitted_bytes} interior bytes omitted ...]\n{last_portion}'
+ )
diff --git a/openhands/agenthub/terminus_2_agent/terminus_json_plain_parser.py b/openhands/agenthub/terminus_2_agent/terminus_json_plain_parser.py
new file mode 100644
index 000000000000..354d205ae8bb
--- /dev/null
+++ b/openhands/agenthub/terminus_2_agent/terminus_json_plain_parser.py
@@ -0,0 +1,329 @@
+"""Parser for Terminus-2 JSON plain response format.
+
+Ported from terminal-bench's terminus_json_plain_parser.py for use within
+the OpenHands Terminus-2 agent.
+"""
+
+import json
+import re
+from dataclasses import dataclass
+from typing import List
+
+
+@dataclass
+class ParsedCommand:
+    """One entry of the response's ``commands`` array after validation."""
+
+    # Text to send to the terminal.
+    keystrokes: str
+    # Wait time attached to the keystrokes; the parser defaults it to 1.0.
+    duration: float
+
+
+@dataclass
+class ParseResult:
+    """Outcome of parsing one LLM response."""
+
+    # Commands extracted from the response; empty when parsing failed.
+    commands: List[ParsedCommand]
+    # Value of the response's 'task_complete' field (False when absent).
+    is_task_complete: bool
+    # Empty string on success, otherwise a description of the parse failure.
+    error: str
+    # Empty string, or a '- '-prefixed list of warnings separated by newlines.
+    warning: str
+
+
+class TerminusJSONPlainParser:
+ """Parser for terminus JSON plain response format."""
+
+    def __init__(self):
+        """Record the top-level keys that every response object must contain."""
+        # Consulted by _validate_json_structure; a missing key is a parse error.
+        self.required_fields = ['analysis', 'plan', 'commands']
+
+    def parse_response(self, response: str) -> ParseResult:
+        """Parse a terminus JSON plain response, retrying with auto-fixes on failure.
+
+        Args:
+            response: The full LLM response string.
+
+        Returns:
+            The result of the plain parse when it succeeds. Otherwise the result
+            of the first auto-fix whose output parses cleanly, with an
+            ``AUTO-CORRECTED`` warning prepended. If no fix helps, the original
+            failing result is returned.
+        """
+        first_attempt = self._try_parse_response(response)
+        if not first_attempt.error:
+            return first_attempt
+
+        for fix_name, repair in self._get_auto_fixes():
+            candidate, changed = repair(response, first_attempt.error)
+            if not changed:
+                continue
+
+            reparsed = self._try_parse_response(candidate)
+            if reparsed.error != '':
+                # This fix did not produce a clean parse; try the next one.
+                continue
+
+            auto_warning = (
+                f'AUTO-CORRECTED: {fix_name} - '
+                'please fix this in future responses'
+            )
+            reparsed.warning = self._combine_warnings(auto_warning, reparsed.warning)
+            return reparsed
+
+        return first_attempt
+
+    def _try_parse_response(self, response: str) -> ParseResult:
+        """Parse ``response`` once, without applying any auto-fixes.
+
+        Steps: locate the first JSON object, decode it, validate its structure,
+        read ``task_complete`` and convert the ``commands`` array.
+
+        Args:
+            response: The full LLM response string.
+
+        Returns:
+            A ParseResult. ``error`` is empty on success. A command-parsing error
+            is downgraded to a warning when the response marks the task as
+            complete, in which case no commands are returned.
+        """
+        warnings: List[str] = []
+
+        def build(
+            commands: List[ParsedCommand], is_complete: bool, error: str
+        ) -> ParseResult:
+            # Rendered at return time: helpers keep appending to ``warnings``.
+            rendered = '- ' + '\n- '.join(warnings) if warnings else ''
+            return ParseResult(commands, is_complete, error, rendered)
+
+        json_content, extra_text_warnings = self._extract_json_content(response)
+        warnings.extend(extra_text_warnings)
+
+        if not json_content:
+            return build([], False, 'No valid JSON found in response')
+
+        try:
+            parsed_data = json.loads(json_content)
+        except json.JSONDecodeError as e:
+            error_msg = f'Invalid JSON: {str(e)}'
+            # Show short payloads in full and long ones as a preview.
+            if len(json_content) < 200:
+                error_msg += f' | Content: {repr(json_content)}'
+            else:
+                error_msg += f' | Content preview: {repr(json_content[:100])}...'
+            return build([], False, error_msg)
+
+        validation_error = self._validate_json_structure(
+            parsed_data, json_content, warnings
+        )
+        if validation_error:
+            return build([], False, validation_error)
+
+        is_complete = parsed_data.get('task_complete', False)
+        if isinstance(is_complete, str):
+            is_complete = is_complete.lower() in ('true', '1', 'yes')
+        else:
+            # JSON null or numbers would otherwise leak through as None/int even
+            # though ParseResult.is_task_complete is declared as bool.
+            is_complete = bool(is_complete)
+
+        commands_data = parsed_data.get('commands', [])
+        commands, parse_error = self._parse_commands(commands_data, warnings)
+        if parse_error:
+            if is_complete:
+                # A finished task does not need runnable commands, so only warn.
+                warnings.append(parse_error)
+                return build([], True, '')
+            return build([], False, parse_error)
+
+        return build(commands, is_complete, '')
+
+    def _extract_json_content(self, response: str) -> tuple[str, List[str]]:
+        """Extract the first balanced ``{...}`` object from ``response``.
+
+        Braces inside double-quoted strings are ignored, and a backslash skips
+        the character that follows it.
+
+        Args:
+            response: The full LLM response string.
+
+        Returns:
+            ``(json_text, warnings)``. ``json_text`` is empty when no balanced
+            object is found. ``warnings`` notes any non-whitespace text before
+            or after the object.
+        """
+        warnings: List[str] = []
+
+        json_start = -1
+        json_end = -1
+        brace_count = 0
+        in_string = False
+        escape_next = False
+
+        for i, char in enumerate(response):
+            if escape_next:
+                escape_next = False
+                continue
+
+            if char == '\\':
+                escape_next = True
+                continue
+
+            if char == '"':
+                in_string = not in_string
+                continue
+
+            if in_string:
+                continue
+
+            if char == '{':
+                if brace_count == 0:
+                    json_start = i
+                brace_count += 1
+            elif char == '}' and brace_count > 0:
+                # A stray '}' in leading prose is ignored; letting the depth go
+                # negative would stop the real object from ever being found.
+                brace_count -= 1
+                if brace_count == 0:
+                    json_end = i + 1
+                    break
+
+        if json_start == -1 or json_end == -1:
+            return '', ['No valid JSON object found']
+
+        before_text = response[:json_start].strip()
+        after_text = response[json_end:].strip()
+
+        if before_text:
+            warnings.append('Extra text detected before JSON object')
+        if after_text:
+            warnings.append('Extra text detected after JSON object')
+
+        return response[json_start:json_end], warnings
+
+    def _validate_json_structure(
+        self, data: dict, json_content: str, warnings: List[str]
+    ) -> str:
+        """Check the decoded response for required keys and expected types.
+
+        Args:
+            data: The decoded JSON value.
+            json_content: The raw JSON text, used for the field-order check.
+            warnings: List that non-fatal findings are appended to.
+
+        Returns:
+            An error message for a fatal problem (not an object, missing required
+            fields, ``commands`` not a list), or an empty string otherwise.
+        """
+        if not isinstance(data, dict):
+            return 'Response must be a JSON object'
+
+        missing_fields = [name for name in self.required_fields if name not in data]
+        if missing_fields:
+            return f"Missing required fields: {', '.join(missing_fields)}"
+
+        # Wrong types for the free-text fields are tolerated with a warning.
+        text_field_messages = (
+            ('analysis', "Field 'analysis' should be a string"),
+            ('plan', "Field 'plan' should be a string"),
+        )
+        for key, message in text_field_messages:
+            if not isinstance(data.get(key, ''), str):
+                warnings.append(message)
+
+        if not isinstance(data.get('commands', []), list):
+            return "Field 'commands' must be an array"
+
+        self._check_field_order(data, json_content, warnings)
+
+        task_complete = data.get('task_complete')
+        if not (task_complete is None or isinstance(task_complete, (bool, str))):
+            warnings.append("Field 'task_complete' should be a boolean or string")
+
+        return ''
+
+    def _parse_commands(
+        self, commands_data: List[dict], warnings: List[str]
+    ) -> tuple[List[ParsedCommand], str]:
+        """Convert the ``commands`` array into ParsedCommand objects.
+
+        Args:
+            commands_data: The decoded ``commands`` list from the response.
+            warnings: List that non-fatal findings are appended to.
+
+        Returns:
+            ``(commands, error)``. On the first malformed entry (not an object,
+            no ``keystrokes``, non-string ``keystrokes``) the commands list is
+            empty and ``error`` describes the problem; otherwise ``error`` is ''.
+        """
+        commands: List[ParsedCommand] = []
+        known_fields = {'keystrokes', 'duration'}
+        last_index = len(commands_data) - 1
+
+        for i, cmd_data in enumerate(commands_data):
+            if not isinstance(cmd_data, dict):
+                return [], f'Command {i + 1} must be an object'
+
+            if 'keystrokes' not in cmd_data:
+                return [], f"Command {i + 1} missing required 'keystrokes' field"
+
+            keystrokes = cmd_data['keystrokes']
+            if not isinstance(keystrokes, str):
+                return [], f"Command {i + 1} 'keystrokes' must be a string"
+
+            if 'duration' in cmd_data:
+                duration = cmd_data['duration']
+                # bool is a subclass of int, so JSON true/false would otherwise
+                # be accepted silently as a duration of 1.0 or 0.0.
+                if isinstance(duration, bool) or not isinstance(
+                    duration, (int, float)
+                ):
+                    warnings.append(
+                        f'Command {i + 1}: Invalid duration value, using default 1.0'
+                    )
+                    duration = 1.0
+            else:
+                warnings.append(
+                    f'Command {i + 1}: Missing duration field, using default 1.0'
+                )
+                duration = 1.0
+
+            # Sorted so the warning text is stable across runs; set iteration
+            # order for strings varies with hash randomization.
+            unknown_fields = sorted(set(cmd_data.keys()) - known_fields)
+            if unknown_fields:
+                warnings.append(
+                    f"Command {i + 1}: Unknown fields: {', '.join(unknown_fields)}"
+                )
+
+            if i < last_index and not keystrokes.endswith('\n'):
+                warnings.append(
+                    f'Command {i + 1} should end with newline when followed '
+                    'by another command. Otherwise the two commands will be '
+                    'concatenated together on the same line.'
+                )
+
+            commands.append(
+                ParsedCommand(keystrokes=keystrokes, duration=float(duration))
+            )
+
+        return commands, ''
+
+    def _get_auto_fixes(self):
+        """Return ``(description, fixer)`` pairs in the order they should be tried.
+
+        Each fixer takes ``(response, error)`` and returns ``(text, was_fixed)``.
+        """
+        close_braces = (
+            'Fixed incomplete JSON by adding missing closing brace',
+            self._fix_incomplete_json,
+        )
+        pull_out_json = ('Extracted JSON from mixed content', self._fix_mixed_content)
+        return [close_braces, pull_out_json]
+
+    def _fix_incomplete_json(self, response: str, error: str) -> tuple[str, bool]:
+        """Append closing braces when the response has more '{' than '}'.
+
+        Only attempted for errors that look like malformed or missing JSON.
+        Braces are counted over the whole text, including any inside strings.
+
+        Returns:
+            ``(fixed_text, True)`` when braces were appended, otherwise
+            ``(response, False)``.
+        """
+        json_error_markers = (
+            'Invalid JSON',
+            'Expecting',
+            'Unterminated',
+            'No valid JSON found',
+        )
+        if not any(marker in error for marker in json_error_markers):
+            return response, False
+
+        missing = response.count('{') - response.count('}')
+        if missing <= 0:
+            return response, False
+
+        return response + '}' * missing, True
+
+    def _fix_mixed_content(self, response: str, error: str) -> tuple[str, bool]:
+        """Pull the first decodable JSON object out of surrounding text.
+
+        The pattern matches objects with at most one level of nested braces.
+        ``error`` is accepted for signature parity with the other fixers and
+        is not consulted.
+
+        Returns:
+            ``(json_text, True)`` for the first match that ``json.loads``
+            accepts, otherwise ``(response, False)``.
+        """
+        json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
+
+        for candidate in re.findall(json_pattern, response, re.DOTALL):
+            try:
+                json.loads(candidate)
+            except json.JSONDecodeError:
+                continue
+            return candidate, True
+
+        return response, False
+
+    def _combine_warnings(self, auto_warning: str, existing_warning: str) -> str:
+        """Put the auto-correction notice first, followed by any prior warnings.
+
+        ``existing_warning`` is expected to be already formatted (or empty);
+        ``auto_warning`` gets the '- ' bullet prefix added here.
+        """
+        if not existing_warning:
+            return f'- {auto_warning}'
+        return f'- {auto_warning}\n{existing_warning}'
+
+    def _check_field_order(
+        self, data: dict, response: str, warnings: List[str]
+    ) -> None:
+        """Warn when analysis, plan and commands appear out of order in the text.
+
+        Positions come from the first ``"<field>":`` occurrence in ``response``.
+        Nothing is reported when fewer than two of the fields are found.
+        ``data`` is not consulted.
+        """
+        expected_order = ['analysis', 'plan', 'commands']
+
+        positions = {}
+        for field in expected_order:
+            found = re.search(f'"({field})"\\s*:', response)
+            if found:
+                positions[field] = found.start()
+
+        if len(positions) < 2:
+            return
+
+        expected_present = [name for name in expected_order if name in positions]
+        actual_order = sorted(expected_present, key=lambda name: positions[name])
+
+        if actual_order == expected_present:
+            return
+
+        actual_str = ' → '.join(actual_order)
+        expected_str = ' → '.join(expected_present)
+        warnings.append(
+            f'Fields appear in wrong order. Found: {actual_str}, '
+            f'expected: {expected_str}'
+        )
diff --git a/openhands/core/config/agent_config.py b/openhands/core/config/agent_config.py
index 713e1ad842e6..e87349a80161 100644
--- a/openhands/core/config/agent_config.py
+++ b/openhands/core/config/agent_config.py
@@ -19,7 +19,7 @@ class AgentConfig(BaseModel):
"""The name of the llm config to use. If specified, this will override global llm config."""
classpath: str | None = Field(default=None)
"""The classpath of the agent to use. To be used for custom agents that are not defined in the openhands.agenthub package."""
- system_prompt_filename: str = Field(default='system_prompt.j2')
+ system_prompt_filename: str = Field(default="system_prompt.j2")
"""Filename of the system prompt template file within the agent's prompt directory. Defaults to 'system_prompt.j2'."""
custom_prompt_dir: str | None = Field(default=None)
"""Optional custom directory containing prompt templates. If set, overrides the agent's default prompt directory."""
@@ -71,7 +71,7 @@ class AgentConfig(BaseModel):
runtime: str | None = Field(default=None)
"""Runtime type (e.g., 'docker', 'local', 'cli') used for runtime-specific tool behavior."""
- model_config = ConfigDict(extra='forbid')
+ model_config = ConfigDict(extra="forbid")
@property
def resolved_system_prompt_filename(self) -> str:
@@ -80,8 +80,8 @@ def resolved_system_prompt_filename(self) -> str:
When enable_plan_mode is True, automatically uses the long horizon system prompt
unless a custom system_prompt_filename was explicitly set (not the default).
"""
- if self.enable_plan_mode and self.system_prompt_filename == 'system_prompt.j2':
- return 'system_prompt_long_horizon.j2'
+ if self.enable_plan_mode and self.system_prompt_filename == "system_prompt.j2":
+ return "system_prompt_long_horizon.j2"
return self.system_prompt_filename
@classmethod
@@ -119,29 +119,29 @@ def from_toml_section(cls, data: dict) -> dict[str, AgentConfig]:
# Try to create the base config
try:
base_config = cls.model_validate(base_data)
- agent_mapping['agent'] = base_config
+ agent_mapping["agent"] = base_config
except ValidationError as e:
- logger.warning(f'Invalid base agent configuration: {e}. Using defaults.')
+ logger.warning(f"Invalid base agent configuration: {e}. Using defaults.")
# If base config fails, create a default one
base_config = cls()
# Still add it to the mapping
- agent_mapping['agent'] = base_config
+ agent_mapping["agent"] = base_config
# Process each custom section independently
for name, overrides in custom_sections.items():
try:
# Merge base config with overrides
merged = {**base_config.model_dump(), **overrides}
- if merged.get('classpath'):
+ if merged.get("classpath"):
# if an explicit classpath is given, try to load it and look up its config model class
from openhands.controller.agent import Agent
try:
- agent_cls = get_impl(Agent, merged.get('classpath'))
+ agent_cls = get_impl(Agent, merged.get("classpath"))
custom_config = agent_cls.config_model.model_validate(merged)
except Exception as e:
logger.warning(
- f'Failed to load custom agent class [{merged.get("classpath")}]: {e}. Using default config model.'
+ f"Failed to load custom agent class [{merged.get('classpath')}]: {e}. Using default config model."
)
custom_config = cls.model_validate(merged)
else:
@@ -156,7 +156,7 @@ def from_toml_section(cls, data: dict) -> dict[str, AgentConfig]:
agent_mapping[name] = custom_config
except ValidationError as e:
logger.warning(
- f'Invalid agent configuration for [{name}]: {e}. This section will be skipped.'
+ f"Invalid agent configuration for [{name}]: {e}. This section will be skipped."
)
# Skip this custom section but continue with others
continue
diff --git a/openhands/core/schema/action.py b/openhands/core/schema/action.py
index 331bd7e47398..9379f22c9fec 100644
--- a/openhands/core/schema/action.py
+++ b/openhands/core/schema/action.py
@@ -147,3 +147,7 @@ class ActionType(str, Enum):
CODEX_UPDATE_PLAN = 'codex_update_plan'
"""Updates the task plan with steps and statuses."""
+
+ # Terminus-2-style actions
+ TERMINUS_2_CMD_RUN = 'terminus_2_cmd_run'
+ """Sends raw keystrokes to a terminal session and captures the resulting screen state."""
diff --git a/openhands/core/schema/observation.py b/openhands/core/schema/observation.py
index 51626358a045..1dc4cfd8c641 100644
--- a/openhands/core/schema/observation.py
+++ b/openhands/core/schema/observation.py
@@ -99,3 +99,7 @@ class ObservationType(str, Enum):
CODEX_UPDATE_PLAN = 'codex_update_plan'
"""Result of updating the task plan."""
+
+ # Terminus-2-style observations
+ TERMINUS_2_CMD_OUTPUT = 'terminus_2_cmd_output'
+ """Result of sending keystrokes to a terminal session, containing captured screen state."""
diff --git a/openhands/events/action/__init__.py b/openhands/events/action/__init__.py
index 2936a9315a2c..799fd2000f25 100644
--- a/openhands/events/action/__init__.py
+++ b/openhands/events/action/__init__.py
@@ -42,6 +42,7 @@
CodexReadFileAction,
CodexUpdatePlanAction,
)
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
__all__ = [
'Action',
@@ -83,4 +84,6 @@
'CodexGrepFilesAction',
'CodexApplyPatchAction',
'CodexUpdatePlanAction',
+ # Terminus-2-style actions
+ 'Terminus2CmdRunAction',
]
diff --git a/openhands/events/action/terminus_2.py b/openhands/events/action/terminus_2.py
new file mode 100644
index 000000000000..1973527b3885
--- /dev/null
+++ b/openhands/events/action/terminus_2.py
@@ -0,0 +1,41 @@
+"""Terminus-2 action classes for keystroke-based terminal interaction.
+
+Terminus-2 sends raw keystrokes to a terminal session (tmux-style) and captures
+the resulting screen state, rather than running commands and collecting stdout.
+"""
+
+from dataclasses import dataclass
+from typing import ClassVar
+
+from openhands.core.schema import ActionType
+from openhands.events.action.action import Action, ActionSecurityRisk
+
+
+@dataclass
+class Terminus2CmdRunAction(Action):
+    """Action that sends raw keystrokes to a terminal session.
+
+    Keystrokes are passed through verbatim; a command should end with a
+    newline so that it executes. Special keys use tmux-style names:
+        - C-c for Ctrl+C
+        - C-d for Ctrl+D
+
+    Attributes:
+        keystrokes: The exact keystrokes to send to the terminal.
+        duration: Seconds to wait for the command to complete before
+            capturing output (default 1.0). Capped at 60s.
+    """
+
+    keystrokes: str
+    duration: float = 1.0
+    thought: str = ''
+    action: str = ActionType.TERMINUS_2_CMD_RUN
+    runnable: ClassVar[bool] = True
+    security_risk: ActionSecurityRisk = ActionSecurityRisk.UNKNOWN
+
+    @property
+    def message(self) -> str:
+        """One-line summary with newlines escaped and long input shortened to 60 chars."""
+        preview = self.keystrokes.replace('\n', '\\n')
+        if len(preview) > 60:
+            preview = preview[:57] + '...'
+        return f'Sending keystrokes: {preview} (wait {self.duration}s)'
diff --git a/openhands/events/observation/__init__.py b/openhands/events/observation/__init__.py
index c5ba04b1dbf0..7b2bc0220dff 100644
--- a/openhands/events/observation/__init__.py
+++ b/openhands/events/observation/__init__.py
@@ -36,6 +36,7 @@
CodexApplyPatchObservation,
CodexUpdatePlanObservation,
)
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
from openhands.events.observation.reject import UserRejectObservation
from openhands.events.observation.success import SuccessObservation
from openhands.events.observation.task_tracking import TaskTrackingObservation
@@ -71,4 +72,6 @@
# Codex-style observations
'CodexApplyPatchObservation',
'CodexUpdatePlanObservation',
+ # Terminus-2-style observations
+ 'Terminus2CmdOutputObservation',
]
diff --git a/openhands/events/observation/terminus_2.py b/openhands/events/observation/terminus_2.py
new file mode 100644
index 000000000000..243d724e0724
--- /dev/null
+++ b/openhands/events/observation/terminus_2.py
@@ -0,0 +1,33 @@
+"""Terminus-2 observation classes for terminal screen capture output."""
+
+from dataclasses import dataclass
+
+from openhands.core.schema import ObservationType
+from openhands.events.observation.observation import Observation
+
+
+@dataclass
+class Terminus2CmdOutputObservation(Observation):
+    """Terminal screen state captured after a keystroke action.
+
+    Where CmdOutputObservation carries stdout/stderr, this observation holds
+    the full screen capture so tmux-style semantics are preserved.
+
+    Attributes:
+        terminal_state: The captured terminal screen content after execution.
+        timed_out: Whether the command timed out before completing.
+        command_keystrokes: The keystrokes that were sent (for reference).
+    """
+
+    terminal_state: str = ''
+    timed_out: bool = False
+    command_keystrokes: str = ''
+    observation: str = ObservationType.TERMINUS_2_CMD_OUTPUT
+
+    @property
+    def message(self) -> str:
+        """One-line summary naming the (shortened) keystrokes and any timeout."""
+        preview = self.command_keystrokes.replace('\n', '\\n')
+        if len(preview) > 60:
+            preview = preview[:57] + '...'
+        if self.timed_out:
+            return f'Terminal output after: {preview} (timed out)'
+        return f'Terminal output after: {preview}'
diff --git a/openhands/events/serialization/action.py b/openhands/events/serialization/action.py
index b0df3d7601ce..b5a8c4f22566 100644
--- a/openhands/events/serialization/action.py
+++ b/openhands/events/serialization/action.py
@@ -45,6 +45,7 @@
CodexReadFileAction,
CodexUpdatePlanAction,
)
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
actions = (
NullAction,
@@ -84,6 +85,8 @@
CodexGrepFilesAction,
CodexApplyPatchAction,
CodexUpdatePlanAction,
+ # Terminus-2-style actions
+ Terminus2CmdRunAction,
)
ACTION_TYPE_TO_CLASS = {action_class.action: action_class for action_class in actions} # type: ignore[attr-defined]
diff --git a/openhands/events/serialization/observation.py b/openhands/events/serialization/observation.py
index f97bde682c14..62d354d2895c 100644
--- a/openhands/events/serialization/observation.py
+++ b/openhands/events/serialization/observation.py
@@ -39,6 +39,7 @@
CodexApplyPatchObservation,
CodexUpdatePlanObservation,
)
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
from openhands.events.observation.reject import UserRejectObservation
from openhands.events.observation.success import SuccessObservation
from openhands.events.observation.task_tracking import TaskTrackingObservation
@@ -70,6 +71,8 @@
# Codex-style observations
CodexApplyPatchObservation,
CodexUpdatePlanObservation,
+ # Terminus-2-style observations
+ Terminus2CmdOutputObservation,
)
OBSERVATION_TYPE_TO_CLASS = {
diff --git a/openhands/memory/conversation_memory.py b/openhands/memory/conversation_memory.py
index 6031840cee01..85f69ae58b12 100644
--- a/openhands/memory/conversation_memory.py
+++ b/openhands/memory/conversation_memory.py
@@ -39,6 +39,7 @@
CodexReadFileAction,
CodexUpdatePlanAction,
)
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
from openhands.events.action.message import SystemMessageAction
from openhands.events.event import Event, RecallType
from openhands.events.observation import (
@@ -64,6 +65,7 @@
CodexApplyPatchObservation,
CodexUpdatePlanObservation,
)
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
from openhands.events.observation.agent import (
MicroagentKnowledge,
RecallObservation,
@@ -275,6 +277,8 @@ def _process_action(
CodexGrepFilesAction,
CodexApplyPatchAction,
CodexUpdatePlanAction,
+ # Terminus-2-style actions
+ Terminus2CmdRunAction,
),
) or (isinstance(action, CmdRunAction) and action.source == 'agent'):
tool_metadata = action.tool_call_metadata
@@ -621,6 +625,11 @@ def _process_observation(
elif isinstance(obs, CodexUpdatePlanObservation):
text = truncate_content(obs.content, max_message_chars)
message = Message(role='user', content=[TextContent(text=text)])
+ elif isinstance(obs, Terminus2CmdOutputObservation):
+ text = truncate_content(
+ obs.terminal_state or obs.content, max_message_chars
+ )
+ message = Message(role='user', content=[TextContent(text=text)])
elif isinstance(obs, LoopDetectionObservation):
# LoopRecovery should not be observed by llm, handled internally.
return []
diff --git a/openhands/runtime/action_execution_server.py b/openhands/runtime/action_execution_server.py
index b3e42b7f11ae..97eb2eb9f458 100644
--- a/openhands/runtime/action_execution_server.py
+++ b/openhands/runtime/action_execution_server.py
@@ -70,6 +70,7 @@
CodexReadFileAction,
CodexUpdatePlanAction,
)
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
from openhands.events.event import FileEditSource, FileReadSource
from openhands.events.observation import (
CmdOutputObservation,
@@ -91,6 +92,7 @@
CodexApplyPatchObservation,
CodexUpdatePlanObservation,
)
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
from openhands.events.serialization import event_from_dict, event_to_dict
from openhands.runtime.browser import browse
from openhands.runtime.browser.browser_env import BrowserEnv
@@ -2219,6 +2221,107 @@ async def codex_update_plan(self, action: CodexUpdatePlanAction) -> Observation:
logger.exception(f'Error updating plan: {e}')
return ErrorObservation(f'Failed to update plan: {str(e)}')
+    def _format_terminal_screen(
+        self, obs: CmdOutputObservation, command: str, pre_cwd: str | None = None
+    ) -> str:
+        """Render a command result as a shell transcript resembling a screen capture.
+
+        The first prompt shows ``pre_cwd`` (the directory before execution) and
+        the closing prompt shows the working directory reported in the
+        observation metadata, so ``cd /app/src`` displays the old directory
+        before the command and the new one after. When ``pre_cwd`` is empty the
+        post-execution directory is used for both prompts.
+
+        Example output:
+            root@hostname:/app# cd /app/src
+            root@hostname:/app/src#
+
+        Args:
+            obs: Observation whose metadata and content are rendered.
+            command: Text shown after the first prompt.
+            pre_cwd: Working directory before the command ran, if known.
+
+        Returns:
+            Prompt plus command, the output (omitted when blank), then a fresh
+            prompt, joined with newlines.
+        """
+        meta = obs.metadata
+        username = meta.username or 'root'
+        hostname = meta.hostname or 'sandbox'
+        post_cwd = meta.working_dir or '/'
+        # Conventional prompt terminator: '#' for root, '$' for everyone else.
+        suffix = '#' if username == 'root' else '$'
+
+        def prompt(cwd: str) -> str:
+            return f'{username}@{hostname}:{cwd}{suffix} '
+
+        screen = [f'{prompt(pre_cwd or post_cwd)}{command}']
+        if obs.content.strip():
+            screen.append(obs.content)
+        screen.append(prompt(post_cwd))
+        return '\n'.join(screen)
+
+    async def terminus_2_cmd_run(
+        self, action: Terminus2CmdRunAction
+    ) -> Terminus2CmdOutputObservation | ErrorObservation:
+        """Execute a Terminus-2 keystroke action through the bash session.
+
+        The keystrokes are turned into a CmdRunAction and the result is
+        formatted like a terminal capture with one of two headers:
+        - "Current Terminal Screen:" for blank keystrokes (runs ``pwd``) and
+          for commands that timed out
+        - "New Terminal Output:" for C-c / C-d and for normal command output
+
+        Args:
+            action: The keystrokes to send and how long to wait (capped at 60s).
+
+        Returns:
+            A Terminus2CmdOutputObservation holding the rendered screen, or an
+            ErrorObservation when the bash session is missing or execution raises.
+        """
+        try:
+            bash_session = self.bash_session
+            if bash_session is None:
+                # Explicit check instead of ``assert``: asserts vanish under
+                # ``python -O`` and produce an empty error message when they fire.
+                logger.error('Terminus-2 keystrokes received without a bash session')
+                return ErrorObservation(
+                    'Bash session is not initialized; cannot send Terminus-2 keystrokes.'
+                )
+
+            keystrokes = action.keystrokes
+            duration = min(action.duration, 60)
+            pre_cwd = bash_session.cwd
+            stripped = keystrokes.strip()
+
+            async def execute(command: str, slack: int):
+                # ``slack`` is the margin added on top of the requested wait.
+                cmd_action = CmdRunAction(command=command)
+                cmd_action.set_hard_timeout(duration + slack, blocking=False)
+                return await call_sync_from_async(bash_session.execute, cmd_action)
+
+            def observation(
+                header: str, screen: str, timed_out: bool = False
+            ) -> Terminus2CmdOutputObservation:
+                terminal_state = f'{header}\n{screen}'
+                return Terminus2CmdOutputObservation(
+                    content=terminal_state,
+                    terminal_state=terminal_state,
+                    timed_out=timed_out,
+                    command_keystrokes=keystrokes,
+                )
+
+            if not stripped:
+                # Blank keystrokes mean "show me the screen"; ``pwd`` gives a
+                # harmless command to render.
+                obs = await execute('pwd', 5)
+                screen = self._format_terminal_screen(obs, 'pwd', pre_cwd)
+                return observation('Current Terminal Screen:', screen)
+
+            if stripped in ('C-c', 'C-d'):
+                # NOTE(review): the key is sent as an ordinary command; confirm
+                # that the bash session delivers it to a still-running process
+                # without extra flags on CmdRunAction.
+                obs = await execute(stripped, 5)
+                shown = '^C' if stripped == 'C-c' else '^D'
+                screen = self._format_terminal_screen(obs, shown, pre_cwd)
+                return observation('New Terminal Output:', screen)
+
+            command = keystrokes.rstrip('\n')
+            obs = await execute(command, 10)
+
+            # The code treats an exit code of -1 as "did not finish in time".
+            timed_out = False
+            if hasattr(obs, 'metadata') and obs.metadata:
+                timed_out = getattr(obs.metadata, 'exit_code', 0) == -1
+
+            screen = self._format_terminal_screen(obs, command, pre_cwd)
+            header = 'Current Terminal Screen:' if timed_out else 'New Terminal Output:'
+            return observation(header, screen, timed_out)
+        except Exception as e:
+            logger.exception(f'Error executing Terminus-2 keystrokes: {e}')
+            return ErrorObservation(str(e))
+
async def browse(self, action: BrowseURLAction) -> Observation:
if self.browser is None:
return ErrorObservation(
diff --git a/openhands/runtime/impl/action_execution/action_execution_client.py b/openhands/runtime/impl/action_execution/action_execution_client.py
index 4b7936289ba7..ecde9ac404a0 100644
--- a/openhands/runtime/impl/action_execution/action_execution_client.py
+++ b/openhands/runtime/impl/action_execution/action_execution_client.py
@@ -52,6 +52,7 @@
from openhands.events.action.action import Action
from openhands.events.action.files import FileEditSource
from openhands.events.action.mcp import MCPAction
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
from openhands.events.observation import (
AgentThinkObservation,
ErrorObservation,
@@ -438,6 +439,9 @@ def codex_apply_patch(self, action: CodexApplyPatchAction) -> Observation:
def codex_update_plan(self, action: CodexUpdatePlanAction) -> Observation:
return self.send_action_for_execution(action)
+    def terminus_2_cmd_run(self, action: Terminus2CmdRunAction) -> Observation:
+        """Forward a Terminus-2 keystroke action via send_action_for_execution."""
+        return self.send_action_for_execution(action)
+
def get_mcp_config(
self, extra_stdio_servers: list[MCPStdioServerConfig] | None = None
) -> MCPConfig:
diff --git a/tests/unit/agenthub/test_terminus_2_action_observation.py b/tests/unit/agenthub/test_terminus_2_action_observation.py
new file mode 100644
index 000000000000..599cfb64e74f
--- /dev/null
+++ b/tests/unit/agenthub/test_terminus_2_action_observation.py
@@ -0,0 +1,282 @@
+"""Unit tests for Terminus-2 action and observation serialization/deserialization.
+
+Tests that Terminus-2 actions and observations can be properly serialized to dict
+and deserialized back to action/observation objects.
+"""
+
+import pytest
+
+from openhands.core.schema import ActionType, ObservationType
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
+from openhands.events.serialization import event_from_dict, event_to_dict
+
+
+# ==============================================================================
+# Terminus2CmdRunAction Serialization Tests
+# ==============================================================================
+
+
+class TestTerminus2CmdRunActionSerialization:
+    """Serialization, deserialization and display checks for Terminus2CmdRunAction."""
+
+    def test_serialize_basic(self):
+        """Keystrokes and the default duration appear under 'args'."""
+        payload = event_to_dict(Terminus2CmdRunAction(keystrokes='ls -la\n'))
+
+        assert payload['action'] == ActionType.TERMINUS_2_CMD_RUN
+        args = payload['args']
+        assert args['keystrokes'] == 'ls -la\n'
+        assert args['duration'] == 1.0
+
+    def test_serialize_with_duration(self):
+        """An explicit duration is serialized unchanged."""
+        payload = event_to_dict(
+            Terminus2CmdRunAction(keystrokes='make\n', duration=30.0)
+        )
+
+        args = payload['args']
+        assert args['keystrokes'] == 'make\n'
+        assert args['duration'] == 30.0
+
+    def test_serialize_with_thought(self):
+        """The thought text is carried in the serialized args."""
+        payload = event_to_dict(
+            Terminus2CmdRunAction(
+                keystrokes='ls\n',
+                duration=0.1,
+                thought='Listing directory contents',
+            )
+        )
+
+        assert payload['args']['thought'] == 'Listing directory contents'
+
+    def test_serialize_special_keys(self):
+        """tmux-style key names are serialized verbatim."""
+        payload = event_to_dict(Terminus2CmdRunAction(keystrokes='C-c', duration=0.1))
+
+        assert payload['args']['keystrokes'] == 'C-c'
+
+    def test_serialize_empty_keystrokes(self):
+        """Empty keystrokes are allowed and keep their duration."""
+        payload = event_to_dict(Terminus2CmdRunAction(keystrokes='', duration=10.0))
+
+        args = payload['args']
+        assert args['keystrokes'] == ''
+        assert args['duration'] == 10.0
+
+    def test_deserialize_basic(self):
+        """A minimal dict is restored to a Terminus2CmdRunAction."""
+        restored = event_from_dict(
+            {
+                'id': 1,
+                'action': ActionType.TERMINUS_2_CMD_RUN,
+                'args': {
+                    'keystrokes': 'ls -la\n',
+                    'duration': 0.1,
+                    'thought': '',
+                },
+            }
+        )
+
+        assert isinstance(restored, Terminus2CmdRunAction)
+        assert restored.keystrokes == 'ls -la\n'
+        assert restored.duration == 0.1
+
+    def test_deserialize_with_all_params(self):
+        """Keystrokes, duration and thought all survive deserialization."""
+        restored = event_from_dict(
+            {
+                'id': 2,
+                'action': ActionType.TERMINUS_2_CMD_RUN,
+                'args': {
+                    'keystrokes': 'make build\n',
+                    'duration': 30.0,
+                    'thought': 'Building project',
+                },
+            }
+        )
+
+        assert isinstance(restored, Terminus2CmdRunAction)
+        assert restored.keystrokes == 'make build\n'
+        assert restored.duration == 30.0
+        assert restored.thought == 'Building project'
+
+    def test_roundtrip_serialization(self):
+        """Serializing then deserializing yields equal field values."""
+        original = Terminus2CmdRunAction(
+            keystrokes='cd /tmp && ls\n',
+            duration=2.5,
+            thought='Navigate and list',
+        )
+        restored = event_from_dict(event_to_dict(original))
+
+        assert isinstance(restored, Terminus2CmdRunAction)
+        assert restored.keystrokes == original.keystrokes
+        assert restored.duration == original.duration
+        assert restored.thought == original.thought
+
+    def test_action_type_is_correct(self):
+        """The action tag matches both the enum member and its string value."""
+        tag = Terminus2CmdRunAction(keystrokes='test\n').action
+        assert tag == ActionType.TERMINUS_2_CMD_RUN
+        assert tag == 'terminus_2_cmd_run'
+
+    def test_message_property(self):
+        """The summary mentions the command and the wait time."""
+        summary = Terminus2CmdRunAction(keystrokes='ls -la\n', duration=0.1).message
+        assert 'ls -la' in summary
+        assert '0.1s' in summary
+
+    def test_message_truncation(self):
+        """Long keystrokes are shortened with an ellipsis."""
+        long_cmd = 'a' * 100 + '\n'
+        summary = Terminus2CmdRunAction(keystrokes=long_cmd, duration=1.0).message
+        assert '...' in summary
+
+
+# ==============================================================================
+# Terminus2CmdOutputObservation Serialization Tests
+# ==============================================================================
+
+
+class TestTerminus2CmdOutputObservationSerialization:
+    """Serialization checks for Terminus2CmdOutputObservation."""
+
+    def test_serialize_basic(self):
+        """content is serialized at the top level, terminal_state under extras."""
+        screen = '$ ls\nfile1.txt\nfile2.txt\n$'
+        payload = event_to_dict(
+            Terminus2CmdOutputObservation(content='output text', terminal_state=screen)
+        )
+
+        assert payload['observation'] == ObservationType.TERMINUS_2_CMD_OUTPUT
+        assert payload['content'] == 'output text'
+        assert payload['extras']['terminal_state'] == screen
+
+    def test_serialize_with_timeout(self):
+        """timed_out and command_keystrokes are serialized under extras."""
+        extras = event_to_dict(
+            Terminus2CmdOutputObservation(
+                content='timed out',
+                terminal_state='$ make\ncompiling...',
+                timed_out=True,
+                command_keystrokes='make\n',
+            )
+        )['extras']
+
+        assert extras['timed_out'] is True
+        assert extras['command_keystrokes'] == 'make\n'
+
+    def test_deserialize_basic(self):
+        """A dict tagged with the Terminus-2 observation type rebuilds the class."""
+        restored = event_from_dict(
+            {
+                'id': 1,
+                'observation': ObservationType.TERMINUS_2_CMD_OUTPUT,
+                'content': 'terminal output',
+                'extras': {
+                    'terminal_state': '$ ls\nfiles...',
+                    'timed_out': False,
+                    'command_keystrokes': 'ls\n',
+                },
+            }
+        )
+
+        assert isinstance(restored, Terminus2CmdOutputObservation)
+        assert restored.terminal_state == '$ ls\nfiles...'
+        assert restored.timed_out is False
+        assert restored.command_keystrokes == 'ls\n'
+
+    def test_deserialize_timed_out(self):
+        """timed_out=True in extras is restored on the observation."""
+        restored = event_from_dict(
+            {
+                'id': 2,
+                'observation': ObservationType.TERMINUS_2_CMD_OUTPUT,
+                'content': 'partial output',
+                'extras': {
+                    'terminal_state': 'compiling...',
+                    'timed_out': True,
+                    'command_keystrokes': 'make\n',
+                },
+            }
+        )
+
+        assert isinstance(restored, Terminus2CmdOutputObservation)
+        assert restored.timed_out is True
+
+    def test_roundtrip_serialization(self):
+        """Serializing then deserializing preserves every checked field."""
+        source_obs = Terminus2CmdOutputObservation(
+            content='full output',
+            terminal_state='$ ls -la\ntotal 8\nfile1.txt\n$',
+            timed_out=False,
+            command_keystrokes='ls -la\n',
+        )
+        restored = event_from_dict(event_to_dict(source_obs))
+
+        assert isinstance(restored, Terminus2CmdOutputObservation)
+        for field_name in (
+            'terminal_state',
+            'timed_out',
+            'command_keystrokes',
+            'content',
+        ):
+            assert getattr(restored, field_name) == getattr(source_obs, field_name)
+
+    def test_observation_type_is_correct(self):
+        """The observation tag equals both the enum member and its literal string."""
+        tag = Terminus2CmdOutputObservation(content='test').observation
+        assert tag == ObservationType.TERMINUS_2_CMD_OUTPUT
+        assert tag == 'terminus_2_cmd_output'
+
+    def test_message_property(self):
+        """The message mentions the command keystrokes."""
+        rendered = Terminus2CmdOutputObservation(
+            content='output', command_keystrokes='ls\n'
+        ).message
+        assert 'ls' in rendered
+
+    def test_message_with_timeout(self):
+        """A timed-out observation says so in its message."""
+        rendered = Terminus2CmdOutputObservation(
+            content='output', command_keystrokes='make\n', timed_out=True
+        ).message
+        assert 'timed out' in rendered
+
+    def test_default_values(self):
+        """Fields not passed to the constructor take their empty/False defaults."""
+        bare = Terminus2CmdOutputObservation(content='test')
+        assert bare.terminal_state == ''
+        assert bare.timed_out is False
+        assert bare.command_keystrokes == ''
+
+
+# ==============================================================================
+# Schema Type Tests
+# ==============================================================================
+
+
+class TestSchemaTypes:
+    """Checks that the Terminus-2 enum members and serialization maps exist."""
+
+    def test_action_type_exists(self):
+        """ActionType has TERMINUS_2_CMD_RUN with the expected string value."""
+        assert hasattr(ActionType, 'TERMINUS_2_CMD_RUN')
+        assert ActionType.TERMINUS_2_CMD_RUN == 'terminus_2_cmd_run'
+
+    def test_observation_type_exists(self):
+        """ObservationType has TERMINUS_2_CMD_OUTPUT with the expected string value."""
+        assert hasattr(ObservationType, 'TERMINUS_2_CMD_OUTPUT')
+        assert ObservationType.TERMINUS_2_CMD_OUTPUT == 'terminus_2_cmd_output'
+
+    def test_action_type_in_serialization_map(self):
+        """The action map resolves the tag to Terminus2CmdRunAction."""
+        from openhands.events.serialization.action import (
+            ACTION_TYPE_TO_CLASS as action_classes,
+        )
+
+        tag = 'terminus_2_cmd_run'
+        assert tag in action_classes
+        assert action_classes[tag] == Terminus2CmdRunAction
+
+    def test_observation_type_in_serialization_map(self):
+        """The observation map resolves the tag to Terminus2CmdOutputObservation."""
+        from openhands.events.serialization.observation import (
+            OBSERVATION_TYPE_TO_CLASS as observation_classes,
+        )
+
+        tag = 'terminus_2_cmd_output'
+        assert tag in observation_classes
+        assert observation_classes[tag] == Terminus2CmdOutputObservation
+
+
+# ==============================================================================
+# Import Tests
+# ==============================================================================
+
+
+class TestImports:
+    """Checks that the new types can be imported from their public packages."""
+
+    def test_import_action_from_events(self):
+        """Terminus2CmdRunAction is importable from openhands.events.action."""
+        from openhands.events.action import Terminus2CmdRunAction as imported_action
+
+        assert imported_action is not None
+
+    def test_import_observation_from_events(self):
+        """Terminus2CmdOutputObservation is importable from openhands.events.observation."""
+        from openhands.events.observation import (
+            Terminus2CmdOutputObservation as imported_observation,
+        )
+
+        assert imported_observation is not None
+
+    def test_agent_registration(self):
+        """After importing the agent package, 'Terminus2Agent' is in Agent._registry."""
+        import openhands.agenthub.terminus_2_agent  # noqa: F401
+        from openhands.controller.agent import Agent
+
+        registered_names = Agent._registry
+        assert 'Terminus2Agent' in registered_names
diff --git a/tests/unit/agenthub/test_terminus_2_agent.py b/tests/unit/agenthub/test_terminus_2_agent.py
new file mode 100644
index 000000000000..9b4fb3f0a1b6
--- /dev/null
+++ b/tests/unit/agenthub/test_terminus_2_agent.py
@@ -0,0 +1,1206 @@
+"""Unit tests for the Terminus-2 Agent.
+
+Tests the Terminus2Agent's message building, output truncation,
+conversation history reconstruction, initial terminal capture,
+and core agent logic.
+"""
+
+import pytest
+
+from openhands.agenthub.terminus_2_agent.terminus_2_agent import (
+ COMPLETION_CONFIRMATION,
+ TIMEOUT_TEMPLATE,
+ Terminus2Agent,
+)
+from openhands.agenthub.terminus_2_agent.terminus_json_plain_parser import (
+ ParsedCommand,
+ TerminusJSONPlainParser,
+)
+from openhands.events.action import AgentFinishAction, AgentThinkAction, MessageAction
+from openhands.events.action.terminus_2 import Terminus2CmdRunAction
+from openhands.events.event import EventSource
+from openhands.events.observation.error import ErrorObservation
+from openhands.events.observation.terminus_2 import Terminus2CmdOutputObservation
+
+
+# ==============================================================================
+# Output Truncation Tests
+# ==============================================================================
+
+
+class TestOutputTruncation:
+    """Tests for the _limit_output_length static method."""
+
+    def test_short_output_not_truncated(self):
+        """Output below the byte limit is returned unchanged."""
+        text = 'short output'
+        assert Terminus2Agent._limit_output_length(text, max_bytes=10000) == text
+
+    def test_long_output_truncated(self):
+        """Output above the limit shrinks and carries both truncation notices."""
+        text = 'x' * 20000
+        limited = Terminus2Agent._limit_output_length(text, max_bytes=10000)
+        assert len(limited.encode('utf-8')) < len(text.encode('utf-8'))
+        for notice in ('output limited to 10000 bytes', 'interior bytes omitted'):
+            assert notice in limited
+
+    def test_exact_limit_not_truncated(self):
+        """Output exactly at the byte limit is returned unchanged."""
+        text = 'x' * 10000
+        assert Terminus2Agent._limit_output_length(text, max_bytes=10000) == text
+
+    def test_truncation_preserves_start_and_end(self):
+        """Truncation keeps the head and the tail of the output."""
+        text = ''.join(['START', 'x' * 20000, 'END'])
+        limited = Terminus2Agent._limit_output_length(text, max_bytes=1000)
+        assert limited.startswith('START')
+        assert limited.endswith('END')
+
+    def test_unicode_truncation(self):
+        """Multi-byte characters over the limit still produce the notice."""
+        text = '\u00e9' * 10000  # Each char is 2 bytes in UTF-8
+        limited = Terminus2Agent._limit_output_length(text, max_bytes=5000)
+        assert 'output limited to 5000 bytes' in limited
+
+    def test_custom_max_bytes(self):
+        """The notice reports the max_bytes value that was passed in."""
+        limited = Terminus2Agent._limit_output_length('a' * 500, max_bytes=200)
+        assert 'output limited to 200 bytes' in limited
+
+    def test_empty_output(self):
+        """An empty string stays empty."""
+        assert Terminus2Agent._limit_output_length('', max_bytes=10000) == ''
+
+
+# ==============================================================================
+# Template Tests
+# ==============================================================================
+
+
+class TestTemplates:
+    """Tests for the message templates used by the agent."""
+
+    def test_timeout_template_formatting(self):
+        """TIMEOUT_TEMPLATE renders the command, the timeout and the terminal state."""
+        rendered = TIMEOUT_TEMPLATE.format(
+            command='make build\n',
+            timeout_sec=30,
+            terminal_state='$ make build\ncompiling...',
+        )
+        for fragment in ('make build', '30 seconds', 'compiling...', 'timed out'):
+            assert fragment in rendered
+
+    def test_completion_confirmation_formatting(self):
+        """COMPLETION_CONFIRMATION renders the terminal state and the grading warning."""
+        rendered = COMPLETION_CONFIRMATION.format(terminal_state='$ echo done\ndone\n$')
+        for fragment in ('echo done', 'task_complete', 'graded'):
+            assert fragment in rendered
+
+
+# ==============================================================================
+# Parser Integration Tests
+# ==============================================================================
+
+
+class TestParserIntegration:
+ """Tests that the parser integrates correctly with the agent's expected flow."""
+
+ # Each test that requests ``parser`` receives a newly constructed
+ # TerminusJSONPlainParser.
+ @pytest.fixture
+ def parser(self):
+ return TerminusJSONPlainParser()
+
+ def test_typical_response_flow(self, parser):
+ """Simulate a typical multi-command response."""
+ # ``\\n`` in this Python literal is a JSON ``\n`` escape; the keystroke
+ # assertions below expect the parser to decode it to a real newline.
+ response = '''{
+ "analysis": "I need to set up the project. The directory is empty.",
+ "plan": "1. Create a project directory. 2. Initialize it.",
+ "commands": [
+ {"keystrokes": "mkdir myproject\\n", "duration": 0.1},
+ {"keystrokes": "cd myproject\\n", "duration": 0.1},
+ {"keystrokes": "git init\\n", "duration": 1.0}
+ ]
+ }'''
+ result = parser.parse_response(response)
+
+ # Commands must come back in the order they appear in the JSON, and a
+ # response without "task_complete" must report is_task_complete False.
+ assert result.error == ''
+ assert len(result.commands) == 3
+ assert result.commands[0].keystrokes == 'mkdir myproject\n'
+ assert result.commands[1].keystrokes == 'cd myproject\n'
+ assert result.commands[2].keystrokes == 'git init\n'
+ assert result.commands[2].duration == 1.0
+ assert result.is_task_complete is False
+
+ def test_completion_response(self, parser):
+ """Simulate a task completion response."""
+ response = '''{
+ "analysis": "All tests pass. The implementation is complete.",
+ "plan": "Mark the task as complete.",
+ "commands": [],
+ "task_complete": true
+ }'''
+ result = parser.parse_response(response)
+
+ # An empty "commands" list is accepted without error when the task is
+ # marked complete.
+ assert result.error == ''
+ assert result.is_task_complete is True
+ assert len(result.commands) == 0
+
+ def test_wait_response(self, parser):
+ """Simulate a wait-for-output response."""
+ response = '''{
+ "analysis": "The build is still running.",
+ "plan": "Wait for the build to finish.",
+ "commands": [
+ {"keystrokes": "", "duration": 10.0}
+ ]
+ }'''
+ result = parser.parse_response(response)
+
+ # A command with empty keystrokes is kept (not dropped) and retains its
+ # duration.
+ assert result.error == ''
+ assert len(result.commands) == 1
+ assert result.commands[0].keystrokes == ''
+ assert result.commands[0].duration == 10.0
+
+ def test_ctrl_c_response(self, parser):
+ """Simulate sending Ctrl+C to cancel a running process."""
+ response = '''{
+ "analysis": "The process appears to be stuck.",
+ "plan": "Send Ctrl+C to cancel it.",
+ "commands": [
+ {"keystrokes": "C-c", "duration": 0.1}
+ ]
+ }'''
+ result = parser.parse_response(response)
+
+ # The "C-c" key name is passed through verbatim by the parser.
+ assert result.error == ''
+ assert len(result.commands) == 1
+ assert result.commands[0].keystrokes == 'C-c'
+
+
+# ==============================================================================
+# Message Building Tests
+# ==============================================================================
+
+
+class TestMessageBuilding:
+    """Tests for the building blocks used when assembling agent messages.
+
+    These tests construct events and actions directly; they do not call
+    Terminus2Agent.
+    """
+
+    def test_find_initial_user_message(self):
+        """The first user-sourced MessageAction in the events is found.
+
+        The lookup uses ``next`` with a ``None`` default and asserts on the
+        result, so the test fails if nothing matches instead of passing
+        without ever running an assertion.
+        """
+        msg = MessageAction(content='Fix the bug in module X')
+        msg._source = EventSource.USER
+
+        events = [msg]
+        first_user_msg = next(
+            (
+                event
+                for event in events
+                if isinstance(event, MessageAction)
+                and event.source == EventSource.USER
+            ),
+            None,
+        )
+
+        assert first_user_msg is not None
+        assert first_user_msg.content == 'Fix the bug in module X'
+
+    def test_terminus_2_action_batch_creation(self):
+        """Parsed commands convert one-to-one into Terminus2CmdRunAction objects."""
+        commands = [
+            ParsedCommand(keystrokes='ls -la\n', duration=0.1),
+            ParsedCommand(keystrokes='cat file.txt\n', duration=0.5),
+        ]
+
+        actions = [
+            Terminus2CmdRunAction(
+                keystrokes=cmd.keystrokes,
+                duration=min(cmd.duration, 60),
+            )
+            for cmd in commands
+        ]
+
+        assert len(actions) == 2
+        assert actions[0].keystrokes == 'ls -la\n'
+        assert actions[0].duration == 0.1
+        assert actions[1].keystrokes == 'cat file.txt\n'
+        assert actions[1].duration == 0.5
+
+    def test_duration_capped_at_60(self):
+        """A duration above 60 seconds is clamped to 60 by the ``min`` cap."""
+        cmd = ParsedCommand(keystrokes='sleep 100\n', duration=100.0)
+        action = Terminus2CmdRunAction(
+            keystrokes=cmd.keystrokes,
+            duration=min(cmd.duration, 60),
+        )
+        assert action.duration == 60
+
+
+# ==============================================================================
+# Double Confirmation Tests
+# ==============================================================================
+
+
+class TestDoubleConfirmation:
+    """Tests for the double-confirmation task completion logic.
+
+    The tests run a local model of the decision logic (the ``_``-prefixed
+    helpers below); they do not call Terminus2Agent. The model is written once
+    and shared by every test.
+    """
+
+    # Question appended after the terminal state in the confirmation message.
+    _CONFIRMATION_PROMPT = (
+        'Are you sure you want to mark the task as complete? '
+        "This will trigger your solution to be graded and you won't be able to "
+        'make any further corrections. If so, include "task_complete": true '
+        'in your JSON response again.'
+    )
+
+    @staticmethod
+    def _completion_step(is_task_complete, pending):
+        """Return ``(pending, action)`` after one modelled LLM response.
+
+        ``action`` is ``'finish'`` when completion was already pending and is
+        claimed again, ``'confirm'`` on the first claim (which sets the pending
+        flag), and ``'continue'`` otherwise (which clears the flag).
+        """
+        if not is_task_complete:
+            return False, 'continue'
+        if pending:
+            return True, 'finish'
+        return True, 'confirm'
+
+    @classmethod
+    def _dispatch(cls, is_task_complete, pending, commands):
+        """Return ``(pending, queue, result)`` for one modelled response.
+
+        ``queue`` is a deque holding ``commands``. ``result`` is ``'finish'``
+        on a confirmed completion, ``'pop_pending'`` when commands are queued,
+        ``'noop_for_confirmation'`` when nothing is queued but completion is
+        pending, and ``'think'`` when nothing is queued and nothing is pending.
+        """
+        from collections import deque
+
+        pending, action = cls._completion_step(is_task_complete, pending)
+        queue = deque(commands)
+        if action == 'finish':
+            result = 'finish'
+        elif queue:
+            result = 'pop_pending'
+        elif pending:
+            result = 'noop_for_confirmation'
+        else:
+            result = 'think'
+        return pending, queue, result
+
+    def test_pending_completion_flag_initial(self):
+        """The first task_complete sets the pending flag and asks to confirm."""
+        pending, action = self._completion_step(is_task_complete=True, pending=False)
+
+        assert pending is True
+        assert action == 'confirm'
+
+    def test_pending_completion_second_time(self):
+        """Verify second task_complete triggers finish."""
+        _, action = self._completion_step(is_task_complete=True, pending=True)
+
+        assert action == 'finish'
+
+    def test_pending_completion_reset_on_not_complete(self):
+        """Verify pending is reset when task_complete is False."""
+        pending, action = self._completion_step(is_task_complete=False, pending=True)
+
+        assert pending is False
+        assert action == 'continue'
+
+    def test_first_task_complete_with_no_commands_returns_noop(self):
+        """First task_complete=True with empty commands should return a no-op
+        action (to capture terminal state) rather than finishing immediately."""
+        pending_completion, _, result = self._dispatch(
+            is_task_complete=True, pending=False, commands=[]
+        )
+
+        assert pending_completion is True
+        assert result == 'noop_for_confirmation'
+
+    def test_first_task_complete_with_commands_queues_normally(self):
+        """First task_complete=True with commands should queue them normally."""
+        commands = [
+            ParsedCommand(keystrokes='ls\n', duration=0.1),
+        ]
+
+        pending_completion, pending_actions, result = self._dispatch(
+            is_task_complete=True, pending=False, commands=commands
+        )
+
+        assert pending_completion is True
+        assert len(pending_actions) == 1
+        assert result == 'pop_pending'
+
+    def test_confirmation_message_appended_when_pending(self):
+        """With completion pending, the last user message is the confirmation.
+
+        Replays a simplified local copy of the message-building loop over a
+        fixed event list; Terminus2Agent._build_messages itself is not called.
+        """
+        screen = 'Current Terminal Screen:\nroot@host:/app# '
+        user_msg = MessageAction(content='Task')
+        user_msg._source = EventSource.USER
+        noop = Terminus2CmdRunAction(keystrokes='', duration=0.5)
+        initial_obs = Terminus2CmdOutputObservation(
+            content=screen, terminal_state=screen
+        )
+        resp = '{"analysis":"done","plan":"done","commands":[],"task_complete":true}'
+        noop2 = Terminus2CmdRunAction(keystrokes='', duration=0.5, thought=resp)
+        confirm_obs = Terminus2CmdOutputObservation(
+            content=screen, terminal_state=screen
+        )
+
+        events = [user_msg, noop, initial_obs, noop2, confirm_obs]
+        pending_completion = True
+
+        messages = [
+            ('system', 'system_prompt'),
+            ('user', f'{user_msg.content}\n\n{initial_obs.terminal_state}'),
+        ]
+
+        batch_observations: list[str] = []
+        for event in events:
+            if isinstance(event, Terminus2CmdRunAction):
+                if event.thought:
+                    # An assistant turn closes the preceding batch of output.
+                    if batch_observations:
+                        messages.append(('user', batch_observations[-1]))
+                        batch_observations = []
+                    messages.append(('assistant', event.thought))
+            elif isinstance(event, Terminus2CmdOutputObservation):
+                # The initial screen is already part of the first user message.
+                if event is not initial_obs:
+                    batch_observations.append(event.terminal_state)
+
+        # An empty terminal state is used when no observation followed the
+        # last assistant turn.
+        terminal_output = batch_observations[-1] if batch_observations else ''
+        if pending_completion:
+            messages.append(
+                (
+                    'user',
+                    f'Current terminal state:\n{terminal_output}\n\n'
+                    + self._CONFIRMATION_PROMPT,
+                )
+            )
+        elif batch_observations:
+            messages.append(('user', terminal_output))
+
+        assert any(
+            'Are you sure you want to mark the task as complete?' in m[1]
+            for m in messages
+        )
+        assert messages[-1][0] == 'user'
+        assert 'task_complete' in messages[-1][1]
+        user_messages = [m for m in messages if m[0] == 'user']
+        assert len(user_messages) == 2  # initial task + confirmation (NOT three)
+
+
+# ==============================================================================
+# Observation Handling Tests
+# ==============================================================================
+
+
+class TestObservationHandling:
+    """Tests for handling Terminus-2 observations."""
+
+    def test_observation_content_extraction(self):
+        """Constructor arguments are readable back from the observation."""
+        screen = '$ ls\nfile1.txt\nfile2.txt\n$'
+        observation = Terminus2CmdOutputObservation(
+            content=screen,
+            terminal_state=screen,
+            timed_out=False,
+            command_keystrokes='ls\n',
+        )
+
+        assert observation.terminal_state == screen
+        assert observation.timed_out is False
+        assert observation.command_keystrokes == 'ls\n'
+
+    def test_timed_out_observation(self):
+        """A timed-out observation keeps the flag and mentions it in its message."""
+        partial = 'partial output...'
+        observation = Terminus2CmdOutputObservation(
+            content=partial,
+            terminal_state=partial,
+            timed_out=True,
+            command_keystrokes='make\n',
+        )
+
+        assert observation.timed_out is True
+        assert 'timed out' in observation.message
+
+    def test_observation_with_empty_terminal_state(self):
+        """Empty content and terminal_state are stored as empty strings."""
+        observation = Terminus2CmdOutputObservation(content='', terminal_state='')
+        assert observation.terminal_state == ''
+        assert observation.content == ''
+
+
+# ==============================================================================
+# _has_terminal_observation Tests
+# ==============================================================================
+
+
+class TestHasTerminalObservation:
+    """Tests for the _has_terminal_observation helper.
+
+    The helper is called through the class with ``None`` in place of ``self``.
+    """
+
+    def test_empty_events(self):
+        """No events means no terminal observation."""
+        found = Terminus2Agent._has_terminal_observation(None, [])
+        assert found is False
+
+    def test_only_user_message(self):
+        """A lone user message is not a terminal observation."""
+        task = MessageAction(content='task')
+        task._source = EventSource.USER
+        found = Terminus2Agent._has_terminal_observation(None, [task])
+        assert found is False
+
+    def test_has_observation(self):
+        """A single terminal observation is detected."""
+        screen_obs = Terminus2CmdOutputObservation(
+            content='output', terminal_state='root@host:/# pwd\n/\nroot@host:/# '
+        )
+        found = Terminus2Agent._has_terminal_observation(None, [screen_obs])
+        assert found is True
+
+    def test_observation_after_other_events(self):
+        """An observation is detected even when other events precede it."""
+        task = MessageAction(content='task')
+        task._source = EventSource.USER
+        history = [
+            task,
+            Terminus2CmdRunAction(keystrokes='', duration=0.5),
+            Terminus2CmdOutputObservation(
+                content='output', terminal_state='root@host:/# '
+            ),
+        ]
+        found = Terminus2Agent._has_terminal_observation(None, history)
+        assert found is True
+
+
+# ==============================================================================
+# _find_initial_terminal_event Tests
+# ==============================================================================
+
+
+class TestFindInitialTerminalEvent:
+    """Tests for _find_initial_terminal_event returning the event object.
+
+    The helper is called through the class with ``None`` in place of ``self``.
+    """
+
+    def test_returns_none_on_empty(self):
+        """An empty event list yields None."""
+        assert Terminus2Agent._find_initial_terminal_event(None, []) is None
+
+    def test_returns_none_when_no_observations(self):
+        """Events without any terminal observation yield None."""
+        task = MessageAction(content='task')
+        task._source = EventSource.USER
+        assert Terminus2Agent._find_initial_terminal_event(None, [task]) is None
+
+    def test_returns_first_observation_object(self):
+        """With two observations, the first one is the one returned."""
+        earlier = Terminus2CmdOutputObservation(
+            content='first', terminal_state='screen1'
+        )
+        later = Terminus2CmdOutputObservation(
+            content='second', terminal_state='screen2'
+        )
+        found = Terminus2Agent._find_initial_terminal_event(None, [earlier, later])
+        assert found is earlier
+        assert found is not later
+
+    def test_identity_comparison_works(self):
+        """The returned event should be the exact same object for identity checks."""
+        screen_obs = Terminus2CmdOutputObservation(
+            content='output', terminal_state='root@host:/# '
+        )
+        task = MessageAction(content='task')
+        task._source = EventSource.USER
+        found = Terminus2Agent._find_initial_terminal_event(None, [task, screen_obs])
+        assert found is screen_obs
+
+
+# ==============================================================================
+# Response Text Storage Tests (thought field on first action)
+# ==============================================================================
+
+
+class TestResponseTextStorage:
+    """Tests that LLM response text is stored on the first action's thought field."""
+
+    def test_first_action_gets_thought(self):
+        """Only the first action built from a batch carries the response text."""
+        llm_text = '{"analysis":"test","plan":"test","commands":[{"keystrokes":"ls\\n","duration":0.1},{"keystrokes":"pwd\\n","duration":0.1}]}'
+        parsed = [
+            ParsedCommand(keystrokes='ls\n', duration=0.1),
+            ParsedCommand(keystrokes='pwd\n', duration=0.1),
+        ]
+
+        built = [
+            Terminus2CmdRunAction(
+                keystrokes=item.keystrokes,
+                duration=min(item.duration, 60),
+                thought=llm_text if position == 0 else '',
+            )
+            for position, item in enumerate(parsed)
+        ]
+
+        assert built[0].thought == llm_text
+        assert built[1].thought == ''
+
+    def test_single_command_gets_thought(self):
+        """A thought passed to the constructor is readable back unchanged."""
+        llm_text = '{"analysis":"x","plan":"x","commands":[{"keystrokes":"ls\\n"}]}'
+        run_action = Terminus2CmdRunAction(
+            keystrokes='ls\n', duration=0.1, thought=llm_text
+        )
+        assert run_action.thought == llm_text
+
+    def test_empty_response_stored_as_empty(self):
+        """An empty thought stays an empty string."""
+        run_action = Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought='')
+        assert run_action.thought == ''
+
+
+# ==============================================================================
+# Conversation History Reconstruction Tests
+# ==============================================================================
+
+
+class TestConversationHistoryReconstruction:
+ """Tests that _build_messages reconstructs full conversation history
+ from the event stream, including assistant turns from action.thought.
+
+ These tests simulate what _build_messages does by processing events
+ using the same algorithm, verifying the message sequence is correct.
+ """
+
+ def _simulate_build_messages(self, events):
+     """Replay ``events`` into a list of ``(role, text)`` conversation tuples.
+
+     The result starts with a system entry, then one user entry joining the
+     first user message with the first terminal observation's state (if any).
+     After that, agent messages and non-empty action thoughts become
+     assistant entries, and the observations collected between them are
+     combined into a single user entry. Intended to mirror the algorithm in
+     Terminus2Agent._build_messages.
+     """
+     conversation = [('system', 'system_prompt')]
+
+     def flush(buffered):
+         # Emit the buffered observations as one user turn; return a new buffer.
+         if buffered:
+             conversation.append(
+                 ('user', Terminus2Agent._combine_observations(buffered))
+             )
+         return []
+
+     task_text = next(
+         (
+             item.content
+             for item in events
+             if isinstance(item, MessageAction) and item.source == EventSource.USER
+         ),
+         None,
+     )
+     first_screen = next(
+         (item for item in events if isinstance(item, Terminus2CmdOutputObservation)),
+         None,
+     )
+
+     if task_text:
+         if first_screen is None:
+             opening = task_text
+         else:
+             opening = f'{task_text}\n\n{first_screen.terminal_state}'
+         conversation.append(('user', opening))
+
+     buffered = []
+     for item in events:
+         if isinstance(item, MessageAction):
+             # User messages are skipped here; only agent messages add a turn.
+             if item.source != EventSource.USER and item.source == EventSource.AGENT:
+                 buffered = flush(buffered)
+                 conversation.append(('assistant', item.content))
+         elif isinstance(item, Terminus2CmdRunAction):
+             if item.thought:
+                 buffered = flush(buffered)
+                 conversation.append(('assistant', item.thought))
+         elif isinstance(item, Terminus2CmdOutputObservation):
+             # The first screen is already part of the opening user turn.
+             if item is not first_screen:
+                 buffered.append(item.terminal_state)
+         elif isinstance(item, ErrorObservation):
+             buffered.append(f'ERROR: {item.content}')
+
+     flush(buffered)
+     return conversation
+
+ def test_initial_state_only(self):
+     """Before any LLM reply: system, then one user turn (task plus screen)."""
+     screen = 'Current Terminal Screen:\nroot@host:/app# pwd\n/app\nroot@host:/app# '
+     task = MessageAction(content='Fix the bug')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(content=screen, terminal_state=screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     assert [role for role, _ in conversation] == ['system', 'user']
+     opening = conversation[1][1]
+     for fragment in ('Fix the bug', 'Current Terminal Screen:', 'root@host:/app#'):
+         assert fragment in opening
+
+ def test_one_round_trip(self):
+     """One LLM reply yields system, user(task+screen), assistant, user(output)."""
+     reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"ls\\n","duration":0.1}]}'
+     first_screen = 'Current Terminal Screen:\nroot@host:/app# '
+     ls_screen = 'New Terminal Output:\nroot@host:/app# ls\nfile.py\nroot@host:/app# '
+
+     task = MessageAction(content='Fix the bug')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(
+             content=first_screen, terminal_state=first_screen
+         ),
+         Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought=reply),
+         Terminus2CmdOutputObservation(content=ls_screen, terminal_state=ls_screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     roles = [role for role, _ in conversation]
+     assert roles == ['system', 'user', 'assistant', 'user']
+     assert conversation[2][1] == reply
+     final_turn = conversation[3][1]
+     assert 'New Terminal Output:' in final_turn
+     assert 'file.py' in final_turn
+
+ def test_multi_command_batch(self):
+     """Observations from one multi-command reply merge into a single user turn."""
+     reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"ls\\n"},{"keystrokes":"pwd\\n"}]}'
+     first_screen = 'Current Terminal Screen:\nprompt'
+     ls_screen = 'New Terminal Output:\nroot@host:/app# ls\nfile.py\nroot@host:/app# '
+     pwd_screen = 'New Terminal Output:\nroot@host:/app# pwd\n/app\nroot@host:/app# '
+
+     task = MessageAction(content='Task')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(
+             content=first_screen, terminal_state=first_screen
+         ),
+         Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought=reply),
+         Terminus2CmdOutputObservation(content=ls_screen, terminal_state=ls_screen),
+         Terminus2CmdRunAction(keystrokes='pwd\n', duration=0.1, thought=''),
+         Terminus2CmdOutputObservation(content=pwd_screen, terminal_state=pwd_screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     assert len(conversation) == 4  # system, user, assistant, user
+     assistant_role, assistant_text = conversation[2]
+     assert assistant_role == 'assistant'
+     assert assistant_text == reply
+     merged_role, merged_text = conversation[3]
+     assert merged_role == 'user'
+     for fragment in ('ls', 'file.py', 'pwd', '/app'):
+         assert fragment in merged_text
+     assert merged_text.count('New Terminal Output:') == 1
+
+ def test_two_round_trips(self):
+     """Two LLM replies yield: sys, user, asst, user, asst, user."""
+     first_reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"ls\\n"}]}'
+     second_reply = '{"analysis":"b","plan":"q","commands":[{"keystrokes":"cat f\\n"}]}'
+     first_screen = 'Current Terminal Screen:\nprompt'
+     ls_screen = 'New Terminal Output:\nls result'
+     cat_screen = 'New Terminal Output:\nfile content'
+
+     task = MessageAction(content='Task')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(
+             content=first_screen, terminal_state=first_screen
+         ),
+         Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought=first_reply),
+         Terminus2CmdOutputObservation(content=ls_screen, terminal_state=ls_screen),
+         Terminus2CmdRunAction(
+             keystrokes='cat f\n', duration=0.1, thought=second_reply
+         ),
+         Terminus2CmdOutputObservation(content=cat_screen, terminal_state=cat_screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     assert len(conversation) == 6  # system, user, asst, user, asst, user
+     assert [role for role, _ in conversation] == [
+         'system',
+         'user',
+         'assistant',
+         'user',
+         'assistant',
+         'user',
+     ]
+     texts = [text for _, text in conversation]
+     assert texts[2] == first_reply
+     assert 'ls result' in texts[3]
+     assert texts[4] == second_reply
+     assert 'file content' in texts[5]
+
+ def test_initial_observation_not_duplicated(self):
+     """The initial screen appears only inside the single opening user turn."""
+     screen = 'Current Terminal Screen:\nINITIAL_SCREEN'
+     task = MessageAction(content='Task')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(content=screen, terminal_state=screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     user_turns = [text for role, text in conversation if role == 'user']
+     assert len(user_turns) == 1
+     for fragment in ('Current Terminal Screen:', 'INITIAL_SCREEN'):
+         assert fragment in user_turns[0]
+
+ def test_initial_observation_not_duplicated_after_first_llm_call(self):
+     """After one LLM round the initial screen is only in the first user turn."""
+     reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"ls\\n"}]}'
+     first_screen = 'Current Terminal Screen:\nINITIAL_SCREEN'
+     ls_screen = 'New Terminal Output:\nls output'
+
+     task = MessageAction(content='Task')
+     task._source = EventSource.USER
+     history = [
+         task,
+         Terminus2CmdRunAction(keystrokes='', duration=0.5),
+         Terminus2CmdOutputObservation(
+             content=first_screen, terminal_state=first_screen
+         ),
+         Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought=reply),
+         Terminus2CmdOutputObservation(content=ls_screen, terminal_state=ls_screen),
+     ]
+
+     conversation = self._simulate_build_messages(history)
+
+     user_turns = [text for role, text in conversation if role == 'user']
+     assert len(user_turns) == 2
+     opening, follow_up = user_turns
+     assert 'INITIAL_SCREEN' in opening
+     assert 'INITIAL_SCREEN' not in follow_up
+     assert 'New Terminal Output:' in follow_up
+
+    def test_no_initial_terminal_state(self):
+        """With no terminal observation in history, the user message is the bare task text."""
+        task = MessageAction(content='Task description')
+        task._source = EventSource.USER
+
+        built = self._simulate_build_messages([task])
+
+        # Two entries expected: the system message followed by the user task.
+        assert len(built) == 2
+        assert built[1][1] == 'Task description'
+
+    def test_error_observation_in_batch(self):
+        """An ErrorObservation following a command shows up as 'ERROR: ...' in a user message."""
+        task = MessageAction(content='Task')
+        task._source = EventSource.USER
+        prompt_screen = 'Current Terminal Screen:\nprompt'
+        first_screen = Terminus2CmdOutputObservation(
+            content=prompt_screen,
+            terminal_state=prompt_screen,
+        )
+        llm_reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"bad_cmd\\n"}]}'
+        failing_cmd = Terminus2CmdRunAction(
+            keystrokes='bad_cmd\n', duration=0.1, thought=llm_reply
+        )
+        failure = ErrorObservation(content='command failed')
+
+        built = self._simulate_build_messages(
+            [task, first_screen, failing_cmd, failure]
+        )
+
+        from_user = [entry[1] for entry in built if entry[0] == 'user']
+        assert any('ERROR: command failed' in text for text in from_user)
+
+    def test_alternating_roles_no_consecutive_same_role(self):
+        """Past the leading system message, adjacent messages never share a role."""
+        llm_reply = '{"analysis":"a","plan":"p","commands":[{"keystrokes":"ls\\n"}]}'
+        prompt_screen = 'Current Terminal Screen:\nprompt'
+        output_screen = 'New Terminal Output:\noutput'
+
+        task = MessageAction(content='Task')
+        task._source = EventSource.USER
+        history = [
+            task,
+            Terminus2CmdRunAction(keystrokes='', duration=0.5),
+            Terminus2CmdOutputObservation(
+                content=prompt_screen,
+                terminal_state=prompt_screen,
+            ),
+            Terminus2CmdRunAction(keystrokes='ls\n', duration=0.1, thought=llm_reply),
+            Terminus2CmdOutputObservation(
+                content=output_screen,
+                terminal_state=output_screen,
+            ),
+        ]
+
+        built = self._simulate_build_messages(history)
+
+        roles = [entry[0] for entry in built]
+        assert roles[0] == 'system'
+        # Pair every role from index 2 onward with its immediate predecessor.
+        for idx, (prev, cur) in enumerate(zip(roles[1:], roles[2:]), start=2):
+            assert cur != prev, f'Consecutive same role at {idx}: {roles}'
+
+
+# ==============================================================================
+# Initial Terminal Capture Tests
+# ==============================================================================
+
+
+class TestInitialTerminalCapture:
+    """Checks on the empty-keystroke action sent on the first step to capture terminal state."""
+
+    def test_noop_action_has_empty_keystrokes(self):
+        """Constructor arguments are readable back from the action unchanged."""
+        noop = Terminus2CmdRunAction(keystrokes='', duration=0.5)
+        assert noop.keystrokes == ''
+        assert noop.duration == 0.5
+
+    def test_noop_action_is_runnable(self):
+        """An action with empty keystrokes still reports runnable as True."""
+        noop = Terminus2CmdRunAction(keystrokes='', duration=0.5)
+        assert noop.runnable is True
+
+
+# ==============================================================================
+# Action Execution Client Dispatch Tests
+# ==============================================================================
+
+
+class TestActionExecutionClientDispatch:
+    """Verifies that ActionExecutionClient exposes a callable terminus_2_cmd_run attribute."""
+
+    @staticmethod
+    def _client_class():
+        """Return ActionExecutionClient; the import runs at call time, not at module import."""
+        from openhands.runtime.impl.action_execution.action_execution_client import (
+            ActionExecutionClient,
+        )
+        return ActionExecutionClient
+
+    def test_client_has_terminus_2_method(self):
+        assert hasattr(self._client_class(), 'terminus_2_cmd_run')
+
+    def test_client_method_is_callable(self):
+        assert callable(self._client_class().terminus_2_cmd_run)
+
+
+# ==============================================================================
+# Terminal Screen Formatting Tests
+# ==============================================================================
+
+
+class TestTerminalScreenFormatting:
+    """Checks the prompt / command / output layout built by _format_terminal_screen.
+
+    ActionExecutor has heavy dependencies (FastAPI, BashSession, etc.), so the
+    formatting logic is reproduced here as a static helper and tested in
+    isolation. The helper is meant to stay behaviourally in step with
+    ActionExecutor._format_terminal_screen.
+    """
+
+    @staticmethod
+    def _format_terminal_screen(obs, command, pre_cwd=None):
+        """Render one command round as ``<prompt><command>``, optional output, ``<prompt>``.
+
+        Username, hostname and working directory come from ``obs.metadata``
+        and fall back to 'root', 'sandbox' and '/' when falsy. ``pre_cwd``,
+        when truthy, replaces the working directory in the first prompt only.
+        """
+        metadata = obs.metadata
+        user = metadata.username or 'root'
+        host = metadata.hostname or 'sandbox'
+        cwd_after = metadata.working_dir or '/'
+        sigil = '#' if user == 'root' else '$'
+        cwd_before = pre_cwd or cwd_after
+
+        def prompt(cwd):
+            return f'{user}@{host}:{cwd}{sigil} '
+
+        screen = [f'{prompt(cwd_before)}{command}']
+        # Output is dropped entirely when it is empty or whitespace-only.
+        if obs.content.strip():
+            screen.append(obs.content)
+        screen.append(prompt(cwd_after))
+        return '\n'.join(screen)
+
+    def _make_obs(self, content, username=None, hostname=None, working_dir=None):
+        """Build a CmdOutputObservation for command 'test' with exit code 0 and the given metadata."""
+        from openhands.events.observation.commands import (
+            CmdOutputMetadata,
+            CmdOutputObservation,
+        )
+        return CmdOutputObservation(
+            content=content,
+            command='test',
+            metadata=CmdOutputMetadata(
+                exit_code=0,
+                username=username,
+                hostname=hostname,
+                working_dir=working_dir,
+            ),
+        )
+
+    def test_basic_formatting(self):
+        observation = self._make_obs(
+            'file1.txt\nfile2.txt',
+            username='root',
+            hostname='abc123',
+            working_dir='/app',
+        )
+        screen = self._format_terminal_screen(observation, 'ls')
+
+        assert screen.startswith('root@abc123:/app# ls')
+        for filename in ('file1.txt', 'file2.txt'):
+            assert filename in screen
+        assert screen.endswith('root@abc123:/app# ')
+
+    def test_root_user_gets_hash_prompt(self):
+        observation = self._make_obs('', username='root', hostname='h', working_dir='/')
+        screen = self._format_terminal_screen(observation, 'pwd')
+        assert 'root@h:/# pwd' in screen
+
+    def test_non_root_user_gets_dollar_prompt(self):
+        observation = self._make_obs(
+            '', username='developer', hostname='h', working_dir='/home'
+        )
+        screen = self._format_terminal_screen(observation, 'pwd')
+        assert 'developer@h:/home$ pwd' in screen
+
+    def test_empty_content_no_extra_lines(self):
+        observation = self._make_obs('', username='root', hostname='h', working_dir='/')
+        rows = self._format_terminal_screen(observation, 'true').split('\n')
+        assert len(rows) == 2
+        assert rows == ['root@h:/# true', 'root@h:/# ']
+
+    def test_multiline_output(self):
+        observation = self._make_obs(
+            'line1\nline2\nline3',
+            username='root',
+            hostname='box',
+            working_dir='/tmp',
+        )
+        rows = self._format_terminal_screen(observation, 'cat file').split('\n')
+        expected_head = [
+            'root@box:/tmp# cat file',
+            'line1',
+            'line2',
+            'line3',
+            'root@box:/tmp# ',
+        ]
+        assert rows[:5] == expected_head
+
+    def test_defaults_when_metadata_missing(self):
+        observation = self._make_obs(
+            'output', username=None, hostname=None, working_dir=None
+        )
+        screen = self._format_terminal_screen(observation, 'echo hi')
+        assert screen.startswith('root@sandbox:/#')
+        assert 'output' in screen
+
+    def test_special_key_ctrl_c_display(self):
+        observation = self._make_obs('', username='root', hostname='h', working_dir='/app')
+        screen = self._format_terminal_screen(observation, '^C')
+        assert 'root@h:/app# ^C' in screen
+
+    def test_whitespace_only_content_treated_as_empty(self):
+        observation = self._make_obs(' \n \n ', username='root', hostname='h', working_dir='/')
+        rows = self._format_terminal_screen(observation, 'true').split('\n')
+        assert len(rows) == 2
+
+    def test_prompt_appears_at_end(self):
+        observation = self._make_obs(
+            'some output',
+            username='root',
+            hostname='container',
+            working_dir='/workspace',
+        )
+        screen = self._format_terminal_screen(observation, 'echo hi')
+        assert screen.endswith('root@container:/workspace# ')
+
+    def test_long_command_preserved(self):
+        long_cmd = 'find / -name "*.py" -exec grep -l "import os" {} \\;'
+        observation = self._make_obs('result', username='root', hostname='h', working_dir='/')
+        first_row = self._format_terminal_screen(observation, long_cmd).split('\n')[0]
+        assert long_cmd in first_row
+
+    def test_cd_pre_cwd_differs_from_post_cwd(self):
+        """cd /app/src: the first prompt shows /app, the closing prompt shows /app/src."""
+        observation = self._make_obs(
+            '', username='root', hostname='host', working_dir='/app/src'
+        )
+        rows = self._format_terminal_screen(
+            observation, 'cd /app/src', pre_cwd='/app'
+        ).split('\n')
+        assert rows[0] == 'root@host:/app# cd /app/src'
+        assert rows[1] == 'root@host:/app/src# '
+
+    def test_no_pre_cwd_uses_post_cwd_for_both(self):
+        """Without pre_cwd, both prompts use the post-execution cwd (backward compat)."""
+        observation = self._make_obs(
+            '', username='root', hostname='h', working_dir='/new'
+        )
+        rows = self._format_terminal_screen(observation, 'cd /new').split('\n')
+        assert rows[0] == 'root@h:/new# cd /new'
+        assert rows[1] == 'root@h:/new# '
+
+    def test_non_cd_command_same_cwd(self):
+        """Ordinary command: pre_cwd equals the post cwd, so both prompts match."""
+        observation = self._make_obs(
+            'file.py', username='root', hostname='h', working_dir='/app'
+        )
+        rows = self._format_terminal_screen(observation, 'ls', pre_cwd='/app').split('\n')
+        assert rows[0] == 'root@h:/app# ls'
+        assert rows[-1] == 'root@h:/app# '
+
+
+# ==============================================================================
+# Terminal Output Prefix Tests
+# ==============================================================================
+
+
+class TestTerminalOutputPrefixes:
+    """Pins the textual layout of the prefixes carried by terminal output.
+
+    In the original Terminus-2:
+    - "Current Terminal Screen:" for initial captures and timed-out commands
+    - "New Terminal Output:" for normal command execution output
+
+    Every test here asserts on string literals it defines itself; no
+    production code is called.
+    """
+
+    def test_initial_capture_gets_current_screen_prefix(self):
+        """Empty keystrokes (initial capture) should use 'Current Terminal Screen:' prefix."""
+        sample = 'Current Terminal Screen:\nroot@host:/app# pwd\n/app\nroot@host:/app# '
+        assert sample.startswith('Current Terminal Screen:')
+        assert 'root@host:/app#' in sample
+
+    def test_normal_command_gets_new_output_prefix(self):
+        """Regular command output should use 'New Terminal Output:' prefix."""
+        sample = 'New Terminal Output:\nroot@host:/app# ls\nfile.py\nroot@host:/app# '
+        assert sample.startswith('New Terminal Output:')
+        assert 'file.py' in sample
+
+    def test_timed_out_command_gets_current_screen_prefix(self):
+        """Timed-out commands should use 'Current Terminal Screen:' prefix."""
+        sample = 'Current Terminal Screen:\nroot@host:/app# sleep 100\n'
+        assert sample.startswith('Current Terminal Screen:')
+
+    def test_prefix_followed_by_newline_then_content(self):
+        """The prefix line ends at the first newline; everything after is the screen."""
+        screen = 'root@host:/app# ls\nfile.py\nroot@host:/app# '
+        prefixed = f'New Terminal Output:\n{screen}'
+        header, _newline, body = prefixed.partition('\n')
+        assert header == 'New Terminal Output:'
+        assert body == screen
+
+    def test_initial_message_includes_prefix_from_terminal_state(self):
+        """Joining task text and terminal state keeps both the prefix and the task."""
+        task = 'Fix the bug in main.py'
+        sample = 'Current Terminal Screen:\nroot@host:/app# '
+        first_message = f'{task}\n\n{sample}'
+        assert 'Current Terminal Screen:' in first_message
+        assert 'Fix the bug' in first_message
+
+    def test_subsequent_output_includes_prefix(self):
+        """Stripping the prefix line leaves text that starts with the shell prompt."""
+        sample = 'New Terminal Output:\nroot@host:/app# echo hello\nhello\nroot@host:/app# '
+        assert sample.startswith('New Terminal Output:')
+        prefix_len = len('New Terminal Output:\n')
+        remainder = sample[prefix_len:]
+        assert remainder.startswith('root@host:/app#')
+
+
+# ==============================================================================
+# Batch Observation Combination Tests
+# ==============================================================================
+
+
+class TestCombineObservations:
+    """Exercises Terminus2Agent._combine_observations.
+
+    The helper merges the terminal outputs of one command batch into a single
+    user message, matching the original Terminus-2 behavior where
+    get_incremental_output() captures all commands.
+    """
+
+    def test_single_observation_returned_as_is(self):
+        batch = ['New Terminal Output:\nroot@h:/# ls\nfile.py\nroot@h:/# ']
+        combined = Terminus2Agent._combine_observations(batch)
+        assert combined == batch[0]
+
+    def test_empty_list_returns_empty_string(self):
+        combined = Terminus2Agent._combine_observations([])
+        assert combined == ''
+
+    def test_two_observations_combined_under_single_prefix(self):
+        listing = 'New Terminal Output:\nroot@h:/app# ls\nfile.py\nroot@h:/app# '
+        location = 'New Terminal Output:\nroot@h:/app# pwd\n/app\nroot@h:/app# '
+        combined = Terminus2Agent._combine_observations([listing, location])
+        assert combined.startswith('New Terminal Output:\n')
+        assert combined.count('New Terminal Output:') == 1
+        for fragment in ('ls', 'file.py', 'pwd', '/app'):
+            assert fragment in combined
+
+    def test_three_observations_all_content_present(self):
+        batch = [
+            'New Terminal Output:\nroot@h:/# ls -l\ntotal 4\nroot@h:/# ',
+            'New Terminal Output:\nroot@h:/# ls *.py\nscript.py\nroot@h:/# ',
+            'New Terminal Output:\nroot@h:/# grep foo .\n./match\nroot@h:/# ',
+        ]
+        combined = Terminus2Agent._combine_observations(batch)
+        assert combined.count('New Terminal Output:') == 1
+        for fragment in ('ls -l', 'total 4', 'script.py', 'grep foo', './match'):
+            assert fragment in combined
+
+    def test_mixed_prefixes_uses_last(self):
+        """If last observation was a timeout (Current Terminal Screen:), use that prefix."""
+        finished = 'New Terminal Output:\nroot@h:/# ls\nfile.py\nroot@h:/# '
+        timed_out = 'Current Terminal Screen:\nroot@h:/# sleep 100\n'
+        combined = Terminus2Agent._combine_observations([finished, timed_out])
+        assert combined.startswith('Current Terminal Screen:\n')
+        assert combined.count('Current Terminal Screen:') == 1
+        assert 'ls' in combined
+        assert 'sleep 100' in combined
+
+    def test_no_prefix_observations_preserved(self):
+        """Text lacking a recognized prefix still appears in the combined output."""
+        raw = 'some raw output'
+        prefixed = 'New Terminal Output:\nroot@h:/# pwd\n/\nroot@h:/# '
+        combined = Terminus2Agent._combine_observations([raw, prefixed])
+        assert 'some raw output' in combined
+        assert 'pwd' in combined
+
+    def test_error_mixed_with_observations(self):
+        """ERROR observations (no prefix) are combined with normal observations."""
+        finished = 'New Terminal Output:\nroot@h:/# ls\nfile.py\nroot@h:/# '
+        failure = 'ERROR: command failed'
+        combined = Terminus2Agent._combine_observations([finished, failure])
+        assert 'file.py' in combined
+        assert 'ERROR: command failed' in combined
diff --git a/tests/unit/agenthub/test_terminus_2_parser.py b/tests/unit/agenthub/test_terminus_2_parser.py
new file mode 100644
index 000000000000..37bfab637e85
--- /dev/null
+++ b/tests/unit/agenthub/test_terminus_2_parser.py
@@ -0,0 +1,416 @@
+"""Unit tests for Terminus-2 JSON plain parser.
+
+Tests the TerminusJSONPlainParser for parsing LLM responses into
+structured commands, including auto-correction and validation.
+"""
+
+import pytest
+
+from openhands.agenthub.terminus_2_agent.terminus_json_plain_parser import (
+ ParsedCommand,
+ ParseResult,
+ TerminusJSONPlainParser,
+)
+
+
+@pytest.fixture
+def parser():
+    """Provide a newly constructed TerminusJSONPlainParser."""
+    json_parser = TerminusJSONPlainParser()
+    return json_parser
+
+
+# ==============================================================================
+# Basic Parsing Tests
+# ==============================================================================
+
+
+class TestBasicParsing:
+    """Well-formed responses: commands, durations and the task_complete flag."""
+
+    def test_parse_valid_response(self, parser):
+        raw = '''{
+ "analysis": "Looking at the directory",
+ "plan": "List files and check structure",
+ "commands": [
+ {"keystrokes": "ls -la\\n", "duration": 0.1}
+ ]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert len(parsed.commands) == 1
+        only_cmd = parsed.commands[0]
+        assert only_cmd.keystrokes == 'ls -la\n'
+        assert only_cmd.duration == 0.1
+        assert parsed.is_task_complete is False
+
+    def test_parse_multiple_commands(self, parser):
+        raw = '''{
+ "analysis": "Need to navigate and list",
+ "plan": "cd then ls",
+ "commands": [
+ {"keystrokes": "cd /tmp\\n", "duration": 0.1},
+ {"keystrokes": "ls -la\\n", "duration": 0.1}
+ ]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert len(parsed.commands) == 2
+        assert parsed.commands[0].keystrokes == 'cd /tmp\n'
+        assert parsed.commands[1].keystrokes == 'ls -la\n'
+
+    def test_parse_task_complete(self, parser):
+        raw = '''{
+ "analysis": "Task is done",
+ "plan": "Mark complete",
+ "commands": [],
+ "task_complete": true
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.is_task_complete is True
+        assert len(parsed.commands) == 0
+
+    def test_parse_task_complete_string(self, parser):
+        raw = '''{
+ "analysis": "Done",
+ "plan": "Finish",
+ "commands": [],
+ "task_complete": "true"
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.is_task_complete is True
+
+    def test_parse_task_not_complete(self, parser):
+        raw = '''{
+ "analysis": "Still working",
+ "plan": "Continue",
+ "commands": [],
+ "task_complete": false
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.is_task_complete is False
+
+    def test_parse_no_task_complete_field(self, parser):
+        """A response without task_complete is reported as not complete."""
+        raw = '''{
+ "analysis": "Working",
+ "plan": "Continue",
+ "commands": []
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.is_task_complete is False
+
+    def test_parse_empty_commands(self, parser):
+        raw = '''{
+ "analysis": "Waiting for output",
+ "plan": "Do nothing",
+ "commands": []
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert len(parsed.commands) == 0
+
+
+# ==============================================================================
+# Error Handling Tests
+# ==============================================================================
+
+
+class TestErrorHandling:
+    """Malformed responses must come back with a populated error string."""
+
+    def test_no_json_found(self, parser):
+        parsed = parser.parse_response('This is not JSON at all')
+        assert 'No valid JSON found' in parsed.error
+
+    def test_invalid_json(self, parser):
+        parsed = parser.parse_response('{"analysis": "test", "plan": broken}')
+        assert parsed.error != ''
+
+    def test_missing_required_field_analysis(self, parser):
+        raw = '''{
+ "plan": "Do something",
+ "commands": []
+ }'''
+        message = parser.parse_response(raw).error
+        assert 'Missing required fields' in message
+        assert 'analysis' in message
+
+    def test_missing_required_field_plan(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "commands": []
+ }'''
+        message = parser.parse_response(raw).error
+        assert 'Missing required fields' in message
+        assert 'plan' in message
+
+    def test_missing_required_field_commands(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "plan": "Do it"
+ }'''
+        message = parser.parse_response(raw).error
+        assert 'Missing required fields' in message
+        assert 'commands' in message
+
+    def test_commands_not_array(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "plan": "Do it",
+ "commands": "not an array"
+ }'''
+        parsed = parser.parse_response(raw)
+        assert 'must be an array' in parsed.error
+
+    def test_command_not_object(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "plan": "Do it",
+ "commands": ["not an object"]
+ }'''
+        parsed = parser.parse_response(raw)
+        assert 'must be an object' in parsed.error
+
+    def test_command_missing_keystrokes(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "plan": "Do it",
+ "commands": [{"duration": 1.0}]
+ }'''
+        parsed = parser.parse_response(raw)
+        assert "missing required 'keystrokes' field" in parsed.error
+
+    def test_command_keystrokes_not_string(self, parser):
+        raw = '''{
+ "analysis": "Something",
+ "plan": "Do it",
+ "commands": [{"keystrokes": 123}]
+ }'''
+        parsed = parser.parse_response(raw)
+        assert "'keystrokes' must be a string" in parsed.error
+
+    def test_not_json_object(self, parser):
+        parsed = parser.parse_response('["not", "an", "object"]')
+        assert parsed.error != ''
+
+
+# ==============================================================================
+# Warning Tests
+# ==============================================================================
+
+
+class TestWarnings:
+    """Parseable-but-sloppy responses: no error, but a warning is recorded."""
+
+    def test_extra_text_before_json(self, parser):
+        raw = 'Here is my response:\n{"analysis": "a", "plan": "b", "commands": []}'
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'Extra text detected before JSON' in parsed.warning
+
+    def test_extra_text_after_json(self, parser):
+        raw = '{"analysis": "a", "plan": "b", "commands": []}\nDone!'
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'Extra text detected after JSON' in parsed.warning
+
+    def test_missing_duration_warning(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "ls\\n"}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'Missing duration field' in parsed.warning
+        # The command is still returned, with a duration of 1.0.
+        assert parsed.commands[0].duration == 1.0
+
+    def test_invalid_duration_type_warning(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "ls\\n", "duration": "fast"}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'Invalid duration value' in parsed.warning
+        # The command is still returned, with a duration of 1.0.
+        assert parsed.commands[0].duration == 1.0
+
+    def test_unknown_fields_warning(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "ls\\n", "duration": 0.1, "extra_field": "x"}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'Unknown fields' in parsed.warning
+
+    def test_no_newline_between_commands_warning(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [
+ {"keystrokes": "echo hello", "duration": 0.1},
+ {"keystrokes": "ls\\n", "duration": 0.1}
+ ]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'should end with newline' in parsed.warning
+
+    def test_wrong_field_order_warning(self, parser):
+        raw = '''{
+ "commands": [],
+ "analysis": "a",
+ "plan": "b"
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert 'wrong order' in parsed.warning
+
+
+# ==============================================================================
+# Auto-Fix Tests
+# ==============================================================================
+
+
+class TestAutoFixes:
+    """Responses with truncated or surrounded JSON that the parser may repair."""
+
+    def test_fix_incomplete_json(self, parser):
+        truncated = '{"analysis": "a", "plan": "b", "commands": [{"keystrokes": "ls\\n", "duration": 0.1}]'
+        parsed = parser.parse_response(truncated)
+
+        assert parsed.error == ''
+        assert 'AUTO-CORRECTED' in parsed.warning
+        assert len(parsed.commands) == 1
+
+    def test_fix_mixed_content(self, parser):
+        wrapped = 'Here is my analysis:\n{"analysis": "a", "plan": "b", "commands": []}\nEnd of response'
+        parsed = parser.parse_response(wrapped)
+
+        assert parsed.error == ''
+
+    def test_deeply_incomplete_json(self, parser):
+        truncated = '{"analysis": "a", "plan": "b", "commands": [{"keystrokes": "ls\\n"'
+        parsed = parser.parse_response(truncated)
+        # Repair is not required here; the call only has to return a ParseResult.
+        assert isinstance(parsed, ParseResult)
+
+
+# ==============================================================================
+# Edge Cases
+# ==============================================================================
+
+
+class TestEdgeCases:
+    """Unusual but legal inputs: escapes, control keys, empties, bulk and coercion."""
+
+    def test_special_characters_in_keystrokes(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "echo \\"hello world\\"\\n", "duration": 0.1}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.commands[0].keystrokes == 'echo "hello world"\n'
+
+    def test_ctrl_c_keystrokes(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "C-c", "duration": 0.1}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.commands[0].keystrokes == 'C-c'
+
+    def test_empty_keystrokes(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "", "duration": 10.0}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        first_cmd = parsed.commands[0]
+        assert first_cmd.keystrokes == ''
+        assert first_cmd.duration == 10.0
+
+    def test_empty_response(self, parser):
+        parsed = parser.parse_response('')
+        assert parsed.error != ''
+
+    def test_task_complete_with_parse_error_becomes_warning(self, parser):
+        """With task_complete true, a bad command entry yields no error and no commands."""
+        raw = '''{
+ "analysis": "Done",
+ "plan": "Finish",
+ "commands": [{"not_keystrokes": "x"}],
+ "task_complete": true
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert parsed.is_task_complete is True
+        assert len(parsed.commands) == 0
+
+    def test_large_number_of_commands(self, parser):
+        entries = []
+        for n in range(50):
+            entries.append(f'{{"keystrokes": "echo {n}\\n", "duration": 0.1}}')
+        commands = ', '.join(entries)
+        raw = f'{{"analysis": "a", "plan": "b", "commands": [{commands}]}}'
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        assert len(parsed.commands) == 50
+
+    def test_duration_integer_cast(self, parser):
+        raw = '''{
+ "analysis": "a",
+ "plan": "b",
+ "commands": [{"keystrokes": "ls\\n", "duration": 5}]
+ }'''
+        parsed = parser.parse_response(raw)
+
+        assert parsed.error == ''
+        duration = parsed.commands[0].duration
+        assert duration == 5.0
+        assert isinstance(duration, float)
+
+    def test_json_with_markdown_code_fence(self, parser):
+        """Common LLM mistake: wrapping JSON in code fences."""
+        fenced = '```json\n{"analysis": "a", "plan": "b", "commands": []}\n```'
+        parsed = parser.parse_response(fenced)
+
+        assert parsed.error == ''
+
+    def test_task_complete_string_variants(self, parser):
+        """String spellings of task_complete map to the expected boolean."""
+        truthy = ['true', 'True', 'TRUE', '1', 'yes', 'Yes']
+        falsy = ['false', 'False', '0', 'no']
+        # Truthy spellings are checked first, then falsy ones.
+        cases = [(value, True) for value in truthy]
+        cases += [(value, False) for value in falsy]
+
+        for value, expected in cases:
+            raw = f'{{"analysis": "a", "plan": "b", "commands": [], "task_complete": "{value}"}}'
+            parsed = parser.parse_response(raw)
+            assert parsed.is_task_complete is expected, f'Failed for value: {value}'