From 24cfe3545502f6279de3c25d22ab9eb55c483e7b Mon Sep 17 00:00:00 2001 From: Chathurangi Shyalika Date: Thu, 2 Jul 2026 21:53:42 -0400 Subject: [PATCH] Secure OpenCode CLI runs with staged per-scenario workspaces Signed-off-by: Chathurangi Shyalika --- src/agent/opencode_agent/runner.py | 9 +- src/benchmark/scenario_suite_runner.py | 181 +++++++++++++++++- .../tests/test_scenario_suite_runner.py | 106 ++++++++++ 3 files changed, 294 insertions(+), 2 deletions(-) diff --git a/src/agent/opencode_agent/runner.py b/src/agent/opencode_agent/runner.py index ac6561be..9df56973 100644 --- a/src/agent/opencode_agent/runner.py +++ b/src/agent/opencode_agent/runner.py @@ -46,6 +46,8 @@ When file or bash access is enabled, use the current working directory as the run workspace. Write any scripts, temporary files, intermediate data, and final artifacts there. Do not read or write files outside the current workspace. +Do not inspect parent directories, repository folders, reports, traces, +groundtruth files, previous agent outputs, or hidden evaluation artifacts. """ ) @@ -647,6 +649,11 @@ async def run(self, question: str) -> AgentResult: cmd.append(question) env = os.environ.copy() + # The OpenCode subprocess should not expose host-side evaluation + # output paths to file/bash tools. The Python wrapper persists the + # trajectory after OpenCode exits, using the parent process env. + env.pop("AGENT_TRAJECTORY_DIR", None) + env.pop("SCENARIOS_DATA_DIR", None) env.update(self._env_overrides) env["OPENCODE_CONFIG_CONTENT"] = json.dumps(self._config) env.setdefault("OPENCODE_DISABLE_AUTOUPDATE", "true") @@ -659,7 +666,7 @@ async def run(self, question: str) -> AgentResult: ) proc = await asyncio.create_subprocess_exec( *cmd, - cwd=str(_REPO_ROOT), + cwd=str(self._run_dir), env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, diff --git a/src/benchmark/scenario_suite_runner.py b/src/benchmark/scenario_suite_runner.py index 93728298..606b30c1 100644 --- a/src/benchmark/scenario_suite_runner.py +++ b/src/benchmark/scenario_suite_runner.py @@ -26,8 +26,10 @@ from __future__ import annotations import argparse +import json import os import re +import shutil import subprocess import sys from dataclasses import dataclass @@ -48,6 +50,7 @@ class MethodConfig: model_id: str extra_args: tuple[str, ...] = () workspace_root: Path | None = None + stage_workspace: bool = False def model_dir_name(model_id: str) -> str: @@ -142,6 +145,156 @@ def validate_groundtruth_exists(scenario_root: Path, scenario_id: str) -> None: ) +def _is_relative_to(path: Path, base: Path) -> bool: + try: + path.resolve().relative_to(base.resolve()) + return True + except ValueError: + return False + + +def validate_workspace_root_outside_repo(workspace_root: Path, label: str) -> None: + """Reject CLI workspaces inside the repo to avoid report/trace leakage.""" + workspace_root = workspace_root.expanduser().resolve() + if _is_relative_to(workspace_root, REPO_ROOT): + raise ValueError( + f"{label} must be outside the repository when file/bash/edit " + f"access is enabled: {workspace_root}" + ) + + +def _iter_manifest_paths(value: object) -> list[str]: + """Return path-like strings from a scenario manifest value.""" + if isinstance(value, str): + return [value] + if isinstance(value, list): + paths: list[str] = [] + for item in value: + paths.extend(_iter_manifest_paths(item)) + return paths + if isinstance(value, dict): + paths = [] + for item in value.values(): + paths.extend(_iter_manifest_paths(item)) + return paths + return [] + + +def _resolve_manifest_data_path( + *, + scenario_root: Path, + scenario_dir: Path, + spec: str, +) -> Path | None: + """Resolve a manifest data path using the same search order as init_data.""" + raw = Path(spec).expanduser() + candidates = [raw] if raw.is_absolute() else [ + scenario_dir / raw, + scenario_root / raw, + ] + + for candidate in candidates: + try: + resolved = candidate.resolve(strict=True) + except FileNotFoundError: + continue + if resolved.is_file() and _is_relative_to(resolved, scenario_root): + return resolved + return None + + +def _copy_into_workspace( + *, + source: Path, + scenario_root: Path, + workspace_dir: Path, +) -> Path: + relative = source.resolve().relative_to(scenario_root.resolve()) + destination = workspace_dir / relative + destination.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source, destination) + return destination + + +def stage_scenario_workspace( + *, + scenario_root: Path, + scenario_id: str, + workspace_dir: Path, +) -> list[Path]: + """Create a clean run workspace with only allowed scenario input files. + + The staged workspace intentionally excludes ``groundtruth.txt`` and any + existing reports/traces. It includes ``question.txt``, ``manifest.json``, + and data files referenced by the manifest. + """ + scenario_root = scenario_root.expanduser().resolve() + scenario_dir = scenario_dir_for_id(scenario_root, scenario_id) + manifest_path = scenario_dir / "manifest.json" + question_path = scenario_dir / "question.txt" + + if not question_path.exists(): + raise FileNotFoundError( + f"Missing question file for scenario {scenario_id}: {question_path}" + ) + if not manifest_path.exists(): + raise FileNotFoundError( + f"Missing manifest file for scenario {scenario_id}: {manifest_path}" + ) + + workspace_dir = workspace_dir.expanduser().resolve() + if workspace_dir.exists(): + shutil.rmtree(workspace_dir) + workspace_dir.mkdir(parents=True, exist_ok=True) + + copied = [ + _copy_into_workspace( + source=question_path, + scenario_root=scenario_root, + workspace_dir=workspace_dir, + ), + _copy_into_workspace( + source=manifest_path, + scenario_root=scenario_root, + workspace_dir=workspace_dir, + ), + ] + + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + for spec in sorted(set(_iter_manifest_paths(manifest))): + source = _resolve_manifest_data_path( + scenario_root=scenario_root, + scenario_dir=scenario_dir, + spec=spec, + ) + if source is None or source.name == "groundtruth.txt": + continue + copied.append( + _copy_into_workspace( + source=source, + scenario_root=scenario_root, + workspace_dir=workspace_dir, + ) + ) + + readme = workspace_dir / "README.md" + readme.write_text( + "\n".join( + [ + f"# AssetOpsBench scenario {scenario_id} workspace", + "", + "This workspace contains only staged scenario inputs.", + "Do not inspect files outside this directory.", + "Ground truth, reports, and previous trajectories are intentionally excluded.", + "", + ] + ), + encoding="utf-8", + ) + copied.append(readme) + return copied + + def reset_and_load_couchdb(scenario_id: str, scenario_root: Path, dry_run: bool) -> None: """Reset CouchDB and load the scenario-specific data from scenario_root.""" env = os.environ.copy() @@ -172,6 +325,7 @@ def run_agent_for_scenario( question: str, trajectory_dir: Path, dry_run: bool, + scenario_root: Path | None = None, ) -> None: """Run one scenario with one method.""" run_id = f"{method.agent_name}_{scenario_id}" @@ -184,7 +338,22 @@ def run_agent_for_scenario( workspace_dir = method.workspace_root / run_id extra_args.extend(["--workspace-dir", str(workspace_dir)]) if not dry_run: - workspace_dir.mkdir(parents=True, exist_ok=True) + if method.stage_workspace: + if scenario_root is None: + raise ValueError( + "scenario_root is required when staging a run workspace" + ) + staged = stage_scenario_workspace( + scenario_root=scenario_root, + scenario_id=scenario_id, + workspace_dir=workspace_dir, + ) + print( + f"Staged {len(staged)} files for scenario {scenario_id} " + f"into {workspace_dir}" + ) + else: + workspace_dir.mkdir(parents=True, exist_ok=True) cmd = [ "uv", @@ -308,6 +477,7 @@ def build_methods(args: argparse.Namespace) -> dict[str, MethodConfig]: model_id=args.model_id, extra_args=tuple(opencode_extra_args), workspace_root=args.opencode_workspace_root, + stage_workspace=True, ), "gemini_cli_agent": MethodConfig( agent_name="gemini_cli_agent", @@ -552,6 +722,13 @@ def main() -> None: "--opencode-workspace-root is required when enabling OpenCode " "files, bash, or edits" ) + if opencode_workspace_required: + try: + validate_workspace_root_outside_repo( + args.opencode_workspace_root, "--opencode-workspace-root" + ) + except ValueError as exc: + parser.error(str(exc)) gemini_workspace_required = ( args.gemini_allow_files or args.gemini_allow_bash or args.gemini_allow_edit ) @@ -592,6 +769,7 @@ def main() -> None: model_id=method.model_id, extra_args=method.extra_args, workspace_root=method_workspace_root, + stage_workspace=method.stage_workspace, ) if not args.dry_run: @@ -623,6 +801,7 @@ def main() -> None: scenario_id=scenario_id, question=question, trajectory_dir=trajectory_dir, + scenario_root=args.scenario_root, dry_run=args.dry_run, ) except Exception as exc: diff --git a/src/benchmark/tests/test_scenario_suite_runner.py b/src/benchmark/tests/test_scenario_suite_runner.py index 294a4fad..9bd9795f 100644 --- a/src/benchmark/tests/test_scenario_suite_runner.py +++ b/src/benchmark/tests/test_scenario_suite_runner.py @@ -54,6 +54,61 @@ def test_read_question_raises_when_missing(tmp_path: Path) -> None: mr.read_question(tmp_path, "11") +def test_stage_scenario_workspace_copies_inputs_without_groundtruth( + tmp_path: Path, +) -> None: + scenario_root = tmp_path / "scenarios_data" + scenario_dir = scenario_root / "scenario_1001" + shared_iot = scenario_root / "shared" / "iot" + shared_failure = scenario_root / "shared" / "failure_code" + scenario_dir.mkdir(parents=True) + shared_iot.mkdir(parents=True) + shared_failure.mkdir(parents=True) + + (scenario_dir / "question.txt").write_text("Find anomaly.", encoding="utf-8") + (scenario_dir / "groundtruth.txt").write_text( + '{"condition":"faulty"}', + encoding="utf-8", + ) + (scenario_dir / "manifest.json").write_text( + """ + { + "iot": "shared/iot/asset_data.json", + "asset": ["shared/iot/asset_registry.json"], + "failure_code": "shared/failure_code/failure_codes.csv" + } + """, + encoding="utf-8", + ) + (shared_iot / "asset_data.json").write_text("[]", encoding="utf-8") + (shared_iot / "asset_registry.json").write_text("[]", encoding="utf-8") + (shared_failure / "failure_codes.csv").write_text("code,name\n", encoding="utf-8") + + workspace = tmp_path / "workspace" + copied = mr.stage_scenario_workspace( + scenario_root=scenario_root, + scenario_id="1001", + workspace_dir=workspace, + ) + + copied_relative = {path.relative_to(workspace) for path in copied} + assert Path("scenario_1001/question.txt") in copied_relative + assert Path("scenario_1001/manifest.json") in copied_relative + assert Path("shared/iot/asset_data.json") in copied_relative + assert Path("shared/iot/asset_registry.json") in copied_relative + assert Path("shared/failure_code/failure_codes.csv") in copied_relative + assert Path("README.md") in copied_relative + assert not (workspace / "scenario_1001" / "groundtruth.txt").exists() + + +def test_validate_workspace_root_rejects_repo_paths() -> None: + with pytest.raises(ValueError): + mr.validate_workspace_root_outside_repo( + mr.REPO_ROOT / "traces" / "opencode_workspaces", + "--opencode-workspace-root", + ) + + def test_model_dir_name_normalizes_router_model_ids() -> None: assert mr.model_dir_name("tokenrouter/MiniMax-M3") == "tokenrouter-MiniMax-M3" assert ( @@ -162,6 +217,7 @@ def test_build_methods_opencode_workspace_options(tmp_path: Path) -> None: assert opencode.extra_args == ("--allow-files", "--allow-bash") assert opencode.workspace_root == tmp_path / "workspaces" + assert opencode.stage_workspace is True def test_build_methods_opencode_thinking_and_variant() -> None: @@ -384,6 +440,56 @@ def fake_run(cmd, **kwargs): ] +def test_run_agent_for_scenario_stages_opencode_workspace( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + captured = {} + + def fake_run(cmd, **kwargs): + captured["cmd"] = cmd + captured["kwargs"] = kwargs + + monkeypatch.setattr(mr.subprocess, "run", fake_run) + + scenario_root = tmp_path / "scenarios_data" + scenario_dir = scenario_root / "scenario_1001" + shared_iot = scenario_root / "shared" / "iot" + scenario_dir.mkdir(parents=True) + shared_iot.mkdir(parents=True) + (scenario_dir / "question.txt").write_text("Find anomaly.", encoding="utf-8") + (scenario_dir / "groundtruth.txt").write_text("secret", encoding="utf-8") + (scenario_dir / "manifest.json").write_text( + '{"iot": "shared/iot/asset_data.json"}', + encoding="utf-8", + ) + (shared_iot / "asset_data.json").write_text("[]", encoding="utf-8") + + method = mr.MethodConfig( + agent_name="opencode_agent", + command="opencode-agent", + model_id="tokenrouter/MiniMax-M3", + extra_args=("--allow-files",), + workspace_root=tmp_path / "workspaces", + stage_workspace=True, + ) + + mr.run_agent_for_scenario( + method=method, + scenario_id="1001", + question="Find anomaly.", + trajectory_dir=tmp_path / "traj", + dry_run=False, + scenario_root=scenario_root, + ) + + expected_workspace = tmp_path / "workspaces" / "opencode_agent_1001" + assert (expected_workspace / "scenario_1001" / "question.txt").exists() + assert (expected_workspace / "scenario_1001" / "manifest.json").exists() + assert (expected_workspace / "shared" / "iot" / "asset_data.json").exists() + assert not (expected_workspace / "scenario_1001" / "groundtruth.txt").exists() + assert "--workspace-dir" in captured["cmd"] + + def test_run_agent_for_scenario_adds_gemini_workspace( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: