feat: 为local环境添加交互式shell工具#8116
Conversation
… and timeout cleanup
…trBot into interactive_shell
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- In LocalInteractiveShellComponent.terminate you call proc.send_signal(subprocess.signal.CTRL_C_EVENT), but subprocess has no signal attribute; this will raise at runtime on Windows—import and use signal.CTRL_C_EVENT (and SIGINT for POSIX) instead.
- The read() method treats max_chars=0 as "no limit" because of the
if max_chars and ...checks; if you intend 0 to be a meaningful boundary, normalize or explicitly validate non-positive values to avoid surprising behavior.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In LocalInteractiveShellComponent.terminate you call proc.send_signal(subprocess.signal.CTRL_C_EVENT), but subprocess has no signal attribute; this will raise at runtime on Windows—import and use signal.CTRL_C_EVENT (and SIGINT for POSIX) instead.
- The read() method treats max_chars=0 as "no limit" because of the `if max_chars and ...` checks; if you intend 0 to be a meaningful boundary, normalize or explicitly validate non-positive values to avoid surprising behavior.
## Individual Comments
### Comment 1
<location path="astrbot/core/computer/booters/local_interactive_shell.py" line_range="195-204" />
<code_context>
+ exit_code = proc.returncode
+ else:
+ if graceful:
+ if sys.platform == "win32":
+ try:
+ proc.send_signal(subprocess.signal.CTRL_C_EVENT)
+ except (ValueError, OSError):
+ pass
</code_context>
<issue_to_address>
**issue (bug_risk):** Using `subprocess.signal` will fail; use the `signal` module instead.
In the Windows graceful-termination path, `proc.send_signal(subprocess.signal.CTRL_C_EVENT)` will raise `AttributeError` because `signal` is a separate stdlib module. Import `signal` and use `proc.send_signal(signal.CTRL_C_EVENT)` here (and `signal.SIGINT` on POSIX) to prevent runtime failures when terminating sessions.
</issue_to_address>
### Comment 2
<location path="astrbot/core/tools/computer_tools/interactive_shell.py" line_range="88-97" />
<code_context>
+ env: dict[str, Any] | None = None,
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Normalize `env` values to strings before passing to the interactive shell component.
`InteractiveShellComponent.start` expects `env: dict[str, str] | None`, but this tool takes `dict[str, Any]` and forwards it after only a shallow `dict()` copy. Other backends may assume string keys/values and fail on non-string entries. Please normalize here, e.g. `env = {str(k): str(v) for k, v in (env or {}).items()}`, so the tool always satisfies the protocol contract.
Suggested implementation:
```python
async def call(
self,
context: ContextWrapper[AstrAgentContext],
command: str,
env: dict[str, Any] | None = None,
) -> ToolExecResult:
# Normalize env to satisfy InteractiveShellComponent.start's env: dict[str, str] | None
if env is not None:
env = {str(k): str(v) for k, v in env.items()}
if permission_error := check_admin_permission(
context, "Interactive shell start"
):
return permission_error
sb = await get_booter(
context.context.context,
context.context.event.unified_msg_origin,
)
```
If `env` is later re-typed or forwarded to other helpers that still declare `dict[str, Any]`, you may optionally narrow their type hints to `dict[str, str]` to reflect the normalized contract, but this is not required for runtime correctness.
</issue_to_address>
### Comment 3
<location path="astrbot/core/computer/booters/local_interactive_shell.py" line_range="81" />
<code_context>
+ except Exception as e:
+ logger.warning("[InteractiveShell] Cleanup error: %s", e)
+
+ def _cleanup_terminated(self) -> None:
+ """Remove sessions for processes that have exited."""
+ to_remove: list[tuple[str, _LocalInteractiveSession]] = []
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared session disposal and output-decoding logic into dedicated helpers to simplify the lifecycle and read loop while preserving behaviour.
You can reduce complexity without changing behaviour by factoring out the duplicated cleanup logic and the decoding logic into focused helpers.
### 1. Factor session cleanup into a single helper
`_cleanup_terminated`, `_cleanup_idle_sessions`, and `terminate` all repeat:
- `session.stop_reading.set()`
- kill/terminate process
- close pipes
- join `read_threads`
- remove from `_sessions`
You can centralize this in a private helper and call it from all three places.
```python
def _dispose_session(
self,
session_id: str,
session: _LocalInteractiveSession,
*,
graceful: bool,
idle_kill: bool = False,
) -> int | None:
proc = session.process
session.stop_reading.set()
exit_code: int | None = proc.returncode if proc.poll() is not None else None
if proc.poll() is None:
if graceful and not idle_kill:
# existing Ctrl-C / SIGINT logic extracted here
if sys.platform == "win32":
try:
proc.send_signal(subprocess.signal.CTRL_C_EVENT)
except (ValueError, OSError):
pass
else:
try:
proc.send_signal(subprocess.signal.SIGINT)
except (ValueError, OSError):
pass
try:
exit_code = proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
exit_code = None
if proc.poll() is None:
proc.kill()
try:
exit_code = proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
exit_code = None
if proc.poll() is None:
exit_code = -9
for pipe in (proc.stdin, proc.stdout, proc.stderr):
if pipe:
try:
pipe.close()
except Exception:
pass
for t in session.read_threads:
if t.is_alive():
t.join(timeout=1.0)
with self._session_lock:
self._sessions.pop(session_id, None)
return exit_code
```
Then `terminate` becomes much slimmer:
```python
def _terminate() -> InteractiveSession:
session = self._get_session(session_id)
exit_code = self._dispose_session(session_id, session, graceful=graceful)
logger.info(
"[InteractiveShell] Terminated session %s (exit_code=%s)",
session_id,
exit_code,
)
return InteractiveSession(
session_id=session_id,
command=session.command,
pid=session.process.pid,
state=InteractiveSessionState.TERMINATED,
exit_code=exit_code,
created_at=session.created_at,
last_activity=session.last_activity,
)
```
And `_cleanup_terminated` / `_cleanup_idle_sessions` only decide *which* sessions to dispose and with what flags, without re‑implementing the lifecycle:
```python
def _cleanup_terminated(self) -> None:
to_remove: list[tuple[str, _LocalInteractiveSession]] = []
with self._session_lock:
for session_id, session in self._sessions.items():
if session.process.poll() is not None:
to_remove.append((session_id, session))
for session_id, session in to_remove:
self._dispose_session(session_id, session, graceful=False, idle_kill=False)
logger.info(
"[InteractiveShell] Cleaned up terminated session: %s",
session_id,
)
```
```python
def _cleanup_idle_sessions(self) -> None:
now = time.time()
to_remove: list[tuple[str, _LocalInteractiveSession]] = []
with self._session_lock:
for session_id, session in self._sessions.items():
if session.process.poll() is None:
idle_time = now - session.last_activity
if idle_time > self._session_timeout_seconds:
to_remove.append((session_id, session))
for session_id, session in to_remove:
logger.warning(
"[InteractiveShell] Session %s idle for %.0fs, forcing termination",
session_id,
self._session_timeout_seconds,
)
self._dispose_session(session_id, session, graceful=False, idle_kill=True)
```
This keeps all existing semantics, while removing three separate hand‑rolled cleanup flows.
---
### 2. Extract decoding into a helper
The `read` method’s inner loop is doing a lot; the decoding path in particular adds branches and nested loops. Moving that logic into a small helper makes the hot path easier to follow and test.
```python
def _decode_chunk(chunk: bytes) -> str:
if not chunk:
return ""
text = chunk.decode("utf-8", errors="replace")
if sys.platform != "win32" or "\ufffd" not in text or len(text) <= 1:
return text
# Windows fallback encodings, same behaviour as today
for fallback_encoding in ("gbk", "gb18030", "cp936"):
try:
fallback_text = chunk.decode(fallback_encoding)
if "\ufffd" not in fallback_text:
return fallback_text
except (UnicodeDecodeError, LookupError):
continue
return text
```
Then the chunk handling in `_read` becomes linear:
```python
chunks = [(stdout_chunk, False), (stderr_chunk, True)]
for chunk, is_stderr in chunks:
if not chunk:
continue
text = self._decode_chunk(chunk)
if max_chars and chars_collected + len(text) > max_chars:
take = max_chars - chars_collected
result_parts.append(text[:take])
overflow = text[take:].encode("utf-8", errors="replace")
with session.lock:
if is_stderr:
session.stderr_buffer[:0] = overflow
else:
session.stdout_buffer[:0] = overflow
chars_collected += take
has_data = True
break
result_parts.append(text)
chars_collected += len(text)
has_data = True
```
This keeps the Windows multi‑encoding behaviour intact but moves the complexity out of the main read loop, making the control flow (buffer drain → decode → max_chars handling) much easier to scan.
</issue_to_address>
### Comment 4
<location path="astrbot/core/tools/computer_tools/interactive_shell.py" line_range="90" />
<code_context>
+ command: str,
+ env: dict[str, Any] | None = None,
+ ) -> ToolExecResult:
+ if permission_error := check_admin_permission(
+ context, "Interactive shell start"
+ ):
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared permission checks, interactive shell lookup, JSON response helpers, and optional session/hint utilities into small reusable functions to reduce duplication and keep each tool’s call method focused on its core behavior.
You can significantly reduce duplication/complexity by centralizing the common permission/bootstrapping/JSON logic. Here are a few focused refactors that keep behavior unchanged:
### 1. Centralize permission + booter + interactive_shell resolution
All tools repeat the same pattern:
- `check_admin_permission`
- `get_booter(...)`
- `ish = sb.interactive_shell; if ish is None: ...`
You can encapsulate this:
```python
async def _get_interactive_shell(
context: ContextWrapper[AstrAgentContext],
action_name: str,
*,
allow_missing: bool = False,
) -> tuple[Any | None, ToolExecResult | None]:
if permission_error := check_admin_permission(context, action_name):
return None, permission_error
sb = await get_booter(
context.context.context,
context.context.event.unified_msg_origin,
)
ish = sb.interactive_shell
if ish is None and not allow_missing:
return None, _err("Interactive shell is not supported by the current runtime.")
return ish, None
```
Then e.g. `InteractiveShellSendTool.call` can focus on its core logic:
```python
async def call(
self,
context: ContextWrapper[AstrAgentContext],
session_id: str,
input: str,
send_eof: bool = False,
) -> ToolExecResult:
ish, error = await _get_interactive_shell(context, "Interactive shell send")
if error:
return error
try:
await ish.send(session_id, input, send_eof=send_eof)
return _ok({"message": "Input sent successfully."})
except ValueError as e:
return _err(f"Session not found: {e}")
except Exception as e:
return _err(f"Failed to send input: {e}")
```
`InteractiveShellListTool` can use `allow_missing=True` to preserve the current “success=True + empty list” behavior:
```python
ish, error = await _get_interactive_shell(
context,
"Interactive shell list",
allow_missing=True,
)
if error:
return error
if ish is None:
return _ok({
"sessions": [],
"message": "Interactive shell is not available in this runtime.",
})
```
### 2. Small JSON response helpers
All tools manually build JSON + `json.dumps(..., ensure_ascii=False)`. A tiny helper keeps each tool’s `call` body focused:
```python
def _ok(payload: dict[str, Any] | None = None) -> str:
data: dict[str, Any] = {"success": True}
if payload:
data.update(payload)
return json.dumps(data, ensure_ascii=False)
def _err(message: str) -> str:
return json.dumps({"success": False, "error": message}, ensure_ascii=False)
```
Then, for example, `InteractiveShellStopTool.call` becomes:
```python
async def call(
self,
context: ContextWrapper[AstrAgentContext],
session_id: str,
force: bool = False,
) -> ToolExecResult:
ish, error = await _get_interactive_shell(context, "Interactive shell stop")
if error:
return error
try:
session = await ish.terminate(session_id, graceful=not force)
return _ok({
"session": _session_to_dict(session),
"message": "Session terminated.",
})
except ValueError as e:
return _err(f"Session not found: {e}")
except Exception as e:
return _err(f"Failed to terminate session: {e}")
```
### 3. Optional session dict helper
The `get_session` + conditional dict conversion pattern appears multiple times:
```python
session = await ish.get_session(session_id)
state_info = _session_to_dict(session) if session else None
```
You can encapsulate it to reduce small but repeated branching:
```python
def _optional_session_to_dict(session) -> dict[str, Any] | None:
return _session_to_dict(session) if session is not None else None
```
Usage in `InteractiveShellReadTool`:
```python
session = await ish.get_session(session_id)
state_info = _optional_session_to_dict(session)
```
### 4. Optionally move long hints to constants
The long `hint`/description strings inflate each `call` method. You can keep the user-facing text but move it to constants near the top of the file:
```python
_READ_HINT = (
"Analyze the output to determine if the program is: "
"(1) waiting for input (shows a prompt), "
"(2) still processing (no prompt yet), or "
"(3) has finished (exited)."
)
```
Then:
```python
return _ok({
"output": output,
"session": state_info,
"hint": _READ_HINT,
})
```
These changes keep all behavior and responses intact while making each tool’s `call` implementation much shorter and easier to scan.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| if sys.platform == "win32": | ||
| run_env["PYTHONIOENCODING"] = "utf-8" | ||
|
|
||
| # Use binary mode for reliable cross-platform pipe behavior | ||
| popen_kwargs: dict[str, Any] = { | ||
| "shell": shell, | ||
| "cwd": working_dir, | ||
| "env": run_env, | ||
| "stdin": subprocess.PIPE, | ||
| "stdout": subprocess.PIPE, |
There was a problem hiding this comment.
issue (bug_risk): Using subprocess.signal will fail; use the signal module instead.
In the Windows graceful-termination path, proc.send_signal(subprocess.signal.CTRL_C_EVENT) will raise AttributeError because signal is a separate stdlib module. Import signal and use proc.send_signal(signal.CTRL_C_EVENT) here (and signal.SIGINT on POSIX) to prevent runtime failures when terminating sessions.
| env: dict[str, Any] | None = None, | ||
| ) -> ToolExecResult: | ||
| if permission_error := check_admin_permission( | ||
| context, "Interactive shell start" | ||
| ): | ||
| return permission_error | ||
|
|
||
| sb = await get_booter( | ||
| context.context.context, | ||
| context.context.event.unified_msg_origin, |
There was a problem hiding this comment.
suggestion (bug_risk): Normalize env values to strings before passing to the interactive shell component.
InteractiveShellComponent.start expects env: dict[str, str] | None, but this tool takes dict[str, Any] and forwards it after only a shallow dict() copy. Other backends may assume string keys/values and fail on non-string entries. Please normalize here, e.g. env = {str(k): str(v) for k, v in (env or {}).items()}, so the tool always satisfies the protocol contract.
Suggested implementation:
async def call(
self,
context: ContextWrapper[AstrAgentContext],
command: str,
env: dict[str, Any] | None = None,
) -> ToolExecResult:
# Normalize env to satisfy InteractiveShellComponent.start's env: dict[str, str] | None
if env is not None:
env = {str(k): str(v) for k, v in env.items()}
if permission_error := check_admin_permission(
context, "Interactive shell start"
):
return permission_error
sb = await get_booter(
context.context.context,
context.context.event.unified_msg_origin,
)If env is later re-typed or forwarded to other helpers that still declare dict[str, Any], you may optionally narrow their type hints to dict[str, str] to reflect the normalized contract, but this is not required for runtime correctness.
| except Exception as e: | ||
| logger.warning("[InteractiveShell] Cleanup error: %s", e) | ||
|
|
||
| def _cleanup_terminated(self) -> None: |
There was a problem hiding this comment.
issue (complexity): Consider extracting the shared session disposal and output-decoding logic into dedicated helpers to simplify the lifecycle and read loop while preserving behaviour.
You can reduce complexity without changing behaviour by factoring out the duplicated cleanup logic and the decoding logic into focused helpers.
1. Factor session cleanup into a single helper
_cleanup_terminated, _cleanup_idle_sessions, and terminate all repeat:
session.stop_reading.set()- kill/terminate process
- close pipes
- join
read_threads - remove from
_sessions
You can centralize this in a private helper and call it from all three places.
def _dispose_session(
self,
session_id: str,
session: _LocalInteractiveSession,
*,
graceful: bool,
idle_kill: bool = False,
) -> int | None:
proc = session.process
session.stop_reading.set()
exit_code: int | None = proc.returncode if proc.poll() is not None else None
if proc.poll() is None:
if graceful and not idle_kill:
# existing Ctrl-C / SIGINT logic extracted here
if sys.platform == "win32":
try:
proc.send_signal(subprocess.signal.CTRL_C_EVENT)
except (ValueError, OSError):
pass
else:
try:
proc.send_signal(subprocess.signal.SIGINT)
except (ValueError, OSError):
pass
try:
exit_code = proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
exit_code = None
if proc.poll() is None:
proc.kill()
try:
exit_code = proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
exit_code = None
if proc.poll() is None:
exit_code = -9
for pipe in (proc.stdin, proc.stdout, proc.stderr):
if pipe:
try:
pipe.close()
except Exception:
pass
for t in session.read_threads:
if t.is_alive():
t.join(timeout=1.0)
with self._session_lock:
self._sessions.pop(session_id, None)
return exit_codeThen terminate becomes much slimmer:
def _terminate() -> InteractiveSession:
session = self._get_session(session_id)
exit_code = self._dispose_session(session_id, session, graceful=graceful)
logger.info(
"[InteractiveShell] Terminated session %s (exit_code=%s)",
session_id,
exit_code,
)
return InteractiveSession(
session_id=session_id,
command=session.command,
pid=session.process.pid,
state=InteractiveSessionState.TERMINATED,
exit_code=exit_code,
created_at=session.created_at,
last_activity=session.last_activity,
)And _cleanup_terminated / _cleanup_idle_sessions only decide which sessions to dispose and with what flags, without re‑implementing the lifecycle:
def _cleanup_terminated(self) -> None:
to_remove: list[tuple[str, _LocalInteractiveSession]] = []
with self._session_lock:
for session_id, session in self._sessions.items():
if session.process.poll() is not None:
to_remove.append((session_id, session))
for session_id, session in to_remove:
self._dispose_session(session_id, session, graceful=False, idle_kill=False)
logger.info(
"[InteractiveShell] Cleaned up terminated session: %s",
session_id,
)def _cleanup_idle_sessions(self) -> None:
now = time.time()
to_remove: list[tuple[str, _LocalInteractiveSession]] = []
with self._session_lock:
for session_id, session in self._sessions.items():
if session.process.poll() is None:
idle_time = now - session.last_activity
if idle_time > self._session_timeout_seconds:
to_remove.append((session_id, session))
for session_id, session in to_remove:
logger.warning(
"[InteractiveShell] Session %s idle for %.0fs, forcing termination",
session_id,
self._session_timeout_seconds,
)
self._dispose_session(session_id, session, graceful=False, idle_kill=True)This keeps all existing semantics, while removing three separate hand‑rolled cleanup flows.
2. Extract decoding into a helper
The read method’s inner loop is doing a lot; the decoding path in particular adds branches and nested loops. Moving that logic into a small helper makes the hot path easier to follow and test.
def _decode_chunk(chunk: bytes) -> str:
if not chunk:
return ""
text = chunk.decode("utf-8", errors="replace")
if sys.platform != "win32" or "\ufffd" not in text or len(text) <= 1:
return text
# Windows fallback encodings, same behaviour as today
for fallback_encoding in ("gbk", "gb18030", "cp936"):
try:
fallback_text = chunk.decode(fallback_encoding)
if "\ufffd" not in fallback_text:
return fallback_text
except (UnicodeDecodeError, LookupError):
continue
return textThen the chunk handling in _read becomes linear:
chunks = [(stdout_chunk, False), (stderr_chunk, True)]
for chunk, is_stderr in chunks:
if not chunk:
continue
text = self._decode_chunk(chunk)
if max_chars and chars_collected + len(text) > max_chars:
take = max_chars - chars_collected
result_parts.append(text[:take])
overflow = text[take:].encode("utf-8", errors="replace")
with session.lock:
if is_stderr:
session.stderr_buffer[:0] = overflow
else:
session.stdout_buffer[:0] = overflow
chars_collected += take
has_data = True
break
result_parts.append(text)
chars_collected += len(text)
has_data = TrueThis keeps the Windows multi‑encoding behaviour intact but moves the complexity out of the main read loop, making the control flow (buffer drain → decode → max_chars handling) much easier to scan.
| command: str, | ||
| env: dict[str, Any] | None = None, | ||
| ) -> ToolExecResult: | ||
| if permission_error := check_admin_permission( |
There was a problem hiding this comment.
issue (complexity): Consider extracting shared permission checks, interactive shell lookup, JSON response helpers, and optional session/hint utilities into small reusable functions to reduce duplication and keep each tool’s call method focused on its core behavior.
You can significantly reduce duplication/complexity by centralizing the common permission/bootstrapping/JSON logic. Here are a few focused refactors that keep behavior unchanged:
1. Centralize permission + booter + interactive_shell resolution
All tools repeat the same pattern:
check_admin_permissionget_booter(...)ish = sb.interactive_shell; if ish is None: ...
You can encapsulate this:
async def _get_interactive_shell(
context: ContextWrapper[AstrAgentContext],
action_name: str,
*,
allow_missing: bool = False,
) -> tuple[Any | None, ToolExecResult | None]:
if permission_error := check_admin_permission(context, action_name):
return None, permission_error
sb = await get_booter(
context.context.context,
context.context.event.unified_msg_origin,
)
ish = sb.interactive_shell
if ish is None and not allow_missing:
return None, _err("Interactive shell is not supported by the current runtime.")
return ish, NoneThen e.g. InteractiveShellSendTool.call can focus on its core logic:
async def call(
self,
context: ContextWrapper[AstrAgentContext],
session_id: str,
input: str,
send_eof: bool = False,
) -> ToolExecResult:
ish, error = await _get_interactive_shell(context, "Interactive shell send")
if error:
return error
try:
await ish.send(session_id, input, send_eof=send_eof)
return _ok({"message": "Input sent successfully."})
except ValueError as e:
return _err(f"Session not found: {e}")
except Exception as e:
return _err(f"Failed to send input: {e}")InteractiveShellListTool can use allow_missing=True to preserve the current “success=True + empty list” behavior:
ish, error = await _get_interactive_shell(
context,
"Interactive shell list",
allow_missing=True,
)
if error:
return error
if ish is None:
return _ok({
"sessions": [],
"message": "Interactive shell is not available in this runtime.",
})2. Small JSON response helpers
All tools manually build JSON + json.dumps(..., ensure_ascii=False). A tiny helper keeps each tool’s call body focused:
def _ok(payload: dict[str, Any] | None = None) -> str:
data: dict[str, Any] = {"success": True}
if payload:
data.update(payload)
return json.dumps(data, ensure_ascii=False)
def _err(message: str) -> str:
return json.dumps({"success": False, "error": message}, ensure_ascii=False)Then, for example, InteractiveShellStopTool.call becomes:
async def call(
self,
context: ContextWrapper[AstrAgentContext],
session_id: str,
force: bool = False,
) -> ToolExecResult:
ish, error = await _get_interactive_shell(context, "Interactive shell stop")
if error:
return error
try:
session = await ish.terminate(session_id, graceful=not force)
return _ok({
"session": _session_to_dict(session),
"message": "Session terminated.",
})
except ValueError as e:
return _err(f"Session not found: {e}")
except Exception as e:
return _err(f"Failed to terminate session: {e}")3. Optional session dict helper
The get_session + conditional dict conversion pattern appears multiple times:
session = await ish.get_session(session_id)
state_info = _session_to_dict(session) if session else NoneYou can encapsulate it to reduce small but repeated branching:
def _optional_session_to_dict(session) -> dict[str, Any] | None:
return _session_to_dict(session) if session is not None else NoneUsage in InteractiveShellReadTool:
session = await ish.get_session(session_id)
state_info = _optional_session_to_dict(session)4. Optionally move long hints to constants
The long hint/description strings inflate each call method. You can keep the user-facing text but move it to constants near the top of the file:
_READ_HINT = (
"Analyze the output to determine if the program is: "
"(1) waiting for input (shows a prompt), "
"(2) still processing (no prompt yet), or "
"(3) has finished (exited)."
)Then:
return _ok({
"output": output,
"session": state_info,
"hint": _READ_HINT,
})These changes keep all behavior and responses intact while making each tool’s call implementation much shorter and easier to scan.
There was a problem hiding this comment.
Code Review
This pull request introduces a stateful interactive shell component to AstrBot, enabling multi-turn bidirectional communication with long-running shell processes. It includes a new protocol, a local implementation using subprocesses with background reader threads, and a suite of tools for the agent to manage shell sessions. Key feedback includes moving blocking cleanup operations to separate threads to prevent event loop stalls, implementing security checks for dangerous commands, simplifying redundant decoding logic, and adding an explicit shutdown mechanism to ensure proper resource management.
| self._cleanup_terminated() | ||
| self._cleanup_idle_sessions() |
There was a problem hiding this comment.
_cleanup_terminated 和 _cleanup_idle_sessions 包含阻塞操作(如 t.join() 和 process.wait()),直接在 asyncio 事件循环中调用会阻塞整个 bot 的响应。建议使用 asyncio.to_thread 异步执行。
| self._cleanup_terminated() | |
| self._cleanup_idle_sessions() | |
| await asyncio.to_thread(self._cleanup_terminated) | |
| await asyncio.to_thread(self._cleanup_idle_sessions) |
| """Start an interactive shell session.""" | ||
| await self._ensure_cleanup_task() | ||
|
|
||
| def _start() -> _LocalInteractiveSession: |
| try: | ||
| text = chunk.decode("utf-8", errors="replace") | ||
| except Exception: | ||
| text = chunk.decode("utf-8", errors="replace") |
| to ensure output is captured promptly. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: |
|
Related Knowledge 1 document with suggested updates is ready for review. AstrBotTeam's Space pr4697的改动View Suggested Changes@@ -145,6 +145,12 @@
- `FILE_WRITE_TOOL`:写入 UTF-8 文本文件,相对路径默认落在当前 workspace
- `FILE_EDIT_TOOL`:通过精确字符串替换编辑文件,支持全局替换
- `GREP_TOOL`:使用 ripgrep 搜索文件内容,支持 glob 过滤和上下文行数
+ - **交互式 Shell 工具**([PR #8116](https://github.com/AstrBotDevs/AstrBot/pull/8116)):
+ - `astrbot_inta_shell_start`:启动交互式 Shell 会话,返回 session_id 和初始输出。不支持完整 TTY 程序(如 vim、nano)
+ - `astrbot_inta_shell_send`:向会话发送输入,自动追加换行符,支持 send_eof 参数
+ - `astrbot_inta_shell_read`:从会话读取输出,支持可配置的超时和 max_chars
+ - `astrbot_inta_shell_stop`:终止会话,默认优雅关闭(先发送 Ctrl+C),支持强制 kill
+ - `astrbot_inta_shell_list`:列出所有活跃的交互式 Shell 会话
这些工具在 SubAgent handoff 场景下可正常使用,与主 Agent 运行时动态挂载的工具保持一致。
|
Modifications / 改动点
resolve #8086
摘要
本 PR 为 AstrBot 的 Agent 系统引入了一套完整的交互式 Shell 工具,使 LLM Agent 能够与需要长期运行的 Shell 进程进行有状态的、双向的、多轮交互。这填补了现有工具集的一个重要空白:虽然
ShellComponent非常适合一次性命令执行(即发即走),但它无法处理需要持续交互的程序(例如npm init、Python REPL、git add -p、交互式安装程序等)。解决了什么问题?
目前,AstrBot 的 computer-use 工具集提供:
ShellComponent—— 一次性命令执行(运行 → 等待 → 收集输出)PythonComponent—— 代码执行FileSystemComponent—— 文件操作然而,许多现实中的 CLI 程序需要交互式对话:程序打印提示符,等待用户输入,处理输入,打印更多输出,然后重复这个过程。例如:
npm initgit add -ppasswd、ssh-keygen没有这个工具,Agent 只能:
实现了什么?
1. 协议层:
InteractiveShellComponent(olayer/interactive_shell.py)新增了一个协议(Protocol),定义交互式 Shell 操作的契约:
关键设计决策:
start()创建一个持久进程,分配唯一的session_id,支持多个并发交互会话。InteractiveSessionState枚举跟踪RUNNING(运行中)、WAITING_INPUT(等待输入)、OUTPUT_READY(输出就绪)、TERMINATED(已终止)、ERROR(错误)五种状态。ShellComponent分离:保持一次性 Shell 工具的简洁,同时在此提供丰富的交互语义。2. 本地实现:
LocalInteractiveShellComponent(booters/local_interactive_shell.py)一个完整的本地运行时实现(约 525 行),基于
subprocess.Popen构建,使用持久的 stdin/stdout/stderr 管道。核心机制:
bytearray缓冲区,防止进程产生输出速度快于 Agent 读取时发生管道死锁threading.Lock保护stdout_buffer/stderr_buffer,避免读取线程与read()调用之间的竞争条件bufsize=0(无缓冲)+ 二进制模式确保即时、跨平台的输出捕获;手动 UTF-8 解码,并支持 Windows 编码回退(GBK/GB18030/CP936)kill();正确关闭管道并等待线程结束Windows 专属处理:
PYTHONIOENCODING=utf-8环境变量cmd.exe会话前缀添加chcp 65001设置 UTF-8 代码页subprocess.CREATE_NO_WINDOW标志抑制控制台窗口弹出3. Agent 工具:5 个新的 Function 工具(
tools/computer_tools/interactive_shell.py)以标准 AstrBot
FunctionTool形式暴露给 LLM Agent:astrbot_inta_shell_startsession_id+ 初始输出npm init、REPL、安装程序astrbot_inta_shell_send\n)astrbot_inta_shell_readastrbot_inta_shell_stopastrbot_inta_shell_list工具设计亮点:
check_admin_permission(),与其他 computer-use 工具保持一致get_booter()同时支持local和sandbox运行时(沙箱支持通过协议扩展实现)cwd默认设置为对话的工作区根目录npm init -y代替npm init"){"success": bool, "session": {...}, "output": "..."}4. Booter 集成(
booters/base.py+booters/local.py)ComputerBooter基类:新增interactive_shell属性,返回InteractiveShellComponent | None,默认值为None以保持向后兼容LocalBooter:实例化LocalInteractiveShellComponent并通过属性暴露astr_main_agent.py中,将 5 个工具同时添加到_apply_local_env_tools()和_apply_sandbox_tools()中5. 模块导出(
olayer/__init__.py)将
InteractiveShellComponent添加到公共 API 导出列表中。架构概览
变更文件
astrbot/core/astr_main_agent.pyastrbot/core/computer/booters/base.pyComputerBooter添加interactive_shell抽象属性astrbot/core/computer/booters/local.pyLocalBooter中实例化LocalInteractiveShellComponentastrbot/core/computer/booters/local_interactive_shell.pyastrbot/core/computer/olayer/__init__.pyInteractiveShellComponentastrbot/core/computer/olayer/interactive_shell.pyInteractiveShellComponent、InteractiveSession、InteractiveSessionStateastrbot/core/tools/computer_tools/interactive_shell.pyFunctionTool实现向后兼容性
ComputerBooter上的interactive_shell属性默认返回None;没有此属性的现有 booter 无需修改即可继续工作。computer_use_runtime(local/sandbox)时才会添加,与现有 computer-use 工具行为一致。ShellComponent、PythonComponent或任何现有工具 API 无破坏性变更。安全考量
terminate()关闭所有管道,等待读取线程结束,如优雅关闭失败则强制杀死进程read()的max_chars参数,配合缓冲区溢出处理(多余内容放回缓冲区)check_admin_permission),与文件/Shell 工具保持一致未来工作
Screenshots or Test Results / 运行截图或测试结果
示例
以下两份报告由AI自动化测试并撰写完成
1 测试报告一 模拟测试
1.1 基础生命周期测试 (TestBasicLifecycle) — 6 用例
1.2 输入输出行为测试 (TestIOBehavior) — 7 用例
\u4f60\u597d)\n/\r\n分隔chr(7/0/255)1.3 并发与并行测试 (TestConcurrency) — 4 用例
1.4 边界与异常测试 (TestEdgeCases) — 10 用例
1.5 场景模拟测试 (TestScenarioBased) — 6 用例
1.6 性能与稳定性测试 (TestPerformanceStability) — 3 用例
1.7 覆盖率统计
2 测试报告二 真实测试
2.1 测试汇总
2.2 详细结果
TEST-INTA-001: 启动 cmd.exe 交互式会话 ✅
命令:
cmd.exe返回结果:
{ "success": true, "session": { "session_id": "1446cbc5", "command": "cmd.exe", "pid": 13060, "state": "running", "exit_code": null, "created_at": 1778334293.9733527 }, "initial_output": "Microsoft Windows [版本 10.0.19045.6466]\r\n(c) Microsoft Corporation保留所有权利。\r\n\r\n(astrbot) F:\\...>" }结论: 交互式 shell 启动正常,返回完整的会话信息和初始输出。
TEST-INTA-002: 向 cmd 发送命令并读取输出 ✅
操作序列:
send:echo hello_interactiveread: timeout=3s读取结果:
结论: send/read 交互正常,输出包含输入回显、执行结果和新的提示符。
TEST-INTA-003: 目录切换持久化 ✅
操作序列:
send:D:(先切换盘符)read: 提示符变为(astrbot) D:\>send:cd AstrbotWorkSpaceread: 提示符变为(astrbot) D:\AstrbotWorkSpace>send:dir /bread: 正确列出 D:\AstrbotWorkSpace 目录内容send:cd ..read: 提示符变回(astrbot) D:\>send:cd AstrbotWorkSpace\docsread: 提示符变为(astrbot) D:\AstrbotWorkSpace\docs>send:dir /bread: 正确列出 docs 目录内容,包含刚生成的shell_tools_test_report.md结论: 目录切换持久化功能正常
TEST-INTA-004: 启动 Python 交互式解释器 ✅
命令:
python -i初始输出:
结论: Python 交互式模式(
-i强制交互)启动正常,显示>>>提示符。TEST-INTA-005: Python 交互式执行代码 ✅
操作序列:
send:print('hello_interactive_python')read: timeout=5s读取结果:
结论: Python 代码执行和输出读取正常,返回结果和新的
>>>提示符。TEST-INTA-006: Python 表达式求值 ✅
操作序列:
send:1+1read: timeout=5s读取结果:
结论: 表达式求值正常,结果
2正确返回。TEST-INTA-007: 优雅退出交互式程序 ✅
操作序列:
send:exit()read: timeout=5s结果: 会话状态变为
terminated,exit_code=0结论: 交互式程序优雅退出正常,无残留进程。
TEST-INTA-008: 多命令连续发送 ✅
操作序列:
读取结果:
结论: 批量命令发送和输出缓冲正常,命令按顺序执行,输出完整捕获。
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Introduce an interactive shell capability to the computer-use runtime, enabling stateful, bidirectional shell sessions and exposing them as agent tools.
New Features:
Enhancements: