diff --git a/AGENTS.md b/AGENTS.md index 064fe2b..fee8bba 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -14,6 +14,7 @@ just test # pytest just lint # ruff check just fmt # ruff format just typecheck # mypy --strict +just scenario -- tests/scenarios/basic_coordination.toml just run -- ... # run the dispatch CLI in-tree uv run dispatch --help uv run dispatchd --help @@ -34,6 +35,7 @@ dispatch owns one `codex app-server` subprocess (stdio JSONL, shared `~/.codex`) - `.agents/plans/v0/` — phased plan (`PLAN.md`) + references (`REFS.md`); tracked. - `spikes/` — App Server probe scripts; seed of the integration suite. - `tests/fixtures/` — small named App Server, JSONL, CLI-smoke, and registry fixtures. +- `tests/scenarios/` — live agent workflow fixtures run intentionally with `just scenario`. - `.agents/notes/` — working notes, session recaps, learnings; **gitignored, local only**. - `skills/` — first-party Codex skills for operating dispatch (`dispatch`) and dispatch-backed direct messages (`dm`). - `plugins/dispatch/` — workspace-local Codex plugin bundle exposing the skills and MCP server. @@ -66,6 +68,7 @@ Use the project language consistently: - **Async core, sync CLI.** The daemon is asyncio end-to-end; the CLI is a thin sync client over the control socket. No blocking calls in the loop (use `aiosqlite`, asyncio subprocess, `run_in_executor`). See [python-conventions](.claude/rules/python-conventions.md). - **Never touch the user's live state in tests.** Integration tests use a real ephemeral app-server with an isolated `CODEX_HOME` and `ephemeral:true` lanes. - **Fixtures should be exercised.** Add checked-in cases under `tests/fixtures/` only when a test loads them; prefer Python builders over binary SQLite fixtures. +- **Live scenarios are opt-in.** Scenario fixtures start real isolated Dispatch/Codex daemons and make model calls; keep them small, synthetic, low-effort, and outside `just check`. ## Source control diff --git a/docs/development/design.md b/docs/development/design.md index e3d7ce3..c9bf994 100644 --- a/docs/development/design.md +++ b/docs/development/design.md @@ -161,6 +161,7 @@ The client supports the full responder loop. v1 surfaces `waiting_on_approval` a - MCP: the official Python **`mcp`** SDK (stdio transport first). Scheduling: small custom asyncio scheduler + `croniter` for cron (interval needs no lib). No `dateutil`/RRULE in v1. - Tests: **pytest** + **pytest-asyncio**. Hooks: **lefthook** (polyglot; runs ruff/mypy/pytest). Task runner: **just** (justfile) for `test`/`lint`/`typecheck`/`run`. Daemon keep-alive: **launchd** LaunchAgent plist. CI: GitHub Actions + `astral-sh/setup-uv`. - Fixture corpus: `tests/fixtures/` stores small named App Server payloads, Codex JSONL sync sources, CLI-smoke notes, and registry builders. Every checked-in fixture should be loaded by a test. Prefer builders over binary SQLite files. +- Live scenarios: `tests/scenarios/` plus `scripts/run_scenario.py` exercise agent-level workflows through the public CLI with isolated `DISPATCH_HOME`/`CODEX_HOME`. They use real model calls and are run intentionally via `just scenario -- `, not as part of the default gate. ## Data model (registry, SQLite) @@ -187,6 +188,7 @@ The client supports the full responder loop. v1 surfaces `waiting_on_approval` a - `test_examples(registry)` runs op examples as assertions. - Unit: message router (canned JSONL), trigger/guard evaluation, registry, error projections. - Release smoke: `just pypi-smoke -- --package-spec outfitter-dispatch==` installs the published package with `uvx`, uses a temporary `DISPATCH_HOME`, verifies daemon/model/list paths, and shuts down cleanly. +- Agent workflow smoke: `just scenario -- tests/scenarios/basic_coordination.toml` starts a real isolated daemon, creates live Codex lanes, waits for completion, and verifies list/get/tail state. ## Rough build slices (detailed by the implementation plan) diff --git a/docs/usage/README.md b/docs/usage/README.md index a64c409..9f1a016 100644 --- a/docs/usage/README.md +++ b/docs/usage/README.md @@ -53,6 +53,17 @@ derived `models` schema, starts the daemon, reads the live App Server model catalog, verifies the cached registry read, checks the empty first-run lane list, and shuts the daemon down. +For an agent-level live scenario against the in-tree CLI, run: + +```bash +just scenario -- tests/scenarios/basic_coordination.toml +``` + +This starts Dispatch with temporary `DISPATCH_HOME` and `CODEX_HOME`, creates +synthetic Codex lanes, waits for their turns to complete, verifies `list`/`get` +/`tail` state, then shuts the daemon down. It uses real Codex auth/model calls, +so it is intentionally separate from `just check`. + If `dispatch doctor` fails before the app-server smoke because the Codex CLI is not installed or authenticated, fix that first and rerun the doctor. Use `dispatch doctor --no-app-server` when you only need to inspect package, PATH, diff --git a/justfile b/justfile index 5744895..64d8031 100644 --- a/justfile +++ b/justfile @@ -22,6 +22,10 @@ test-int *args: pypi-smoke *args: uv run python scripts/check_pypi_smoke.py {{args}} +# Run a live agent scenario against an isolated DISPATCH_HOME/CODEX_HOME. +scenario *args: + uv run python scripts/run_scenario.py {{args}} + # Lint with ruff. lint: uv run ruff check . diff --git a/scripts/run_scenario.py b/scripts/run_scenario.py new file mode 100644 index 0000000..ae256de --- /dev/null +++ b/scripts/run_scenario.py @@ -0,0 +1,343 @@ +"""Run live Dispatch/Codex scenario fixtures. + +Scenarios exercise the public Dispatch CLI against an isolated ``DISPATCH_HOME`` +and ``CODEX_HOME``. They are intentionally outside ``just check`` because they +use real Codex auth/model calls. Use them before releases or when validating +agent-level workflows end to end. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shlex +import shutil +import subprocess +import sys +import tempfile +import time +import tomllib +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Literal + +Status = Literal["pending", "started", "completed", "failed"] + + +@dataclass(frozen=True) +class ScenarioLane: + alias: str + name: str + prompt: str + expect_contains: str + + +@dataclass(frozen=True) +class Scenario: + name: str + description: str + timeout_seconds: float + poll_seconds: float + preferred_model: str | None + allow_model_fallback: bool + effort: str + parallel: bool + lanes: list[ScenarioLane] + + +@dataclass +class LaneRun: + alias: str + id: str + handle: str | None + expect_contains: str + + +def main(argv: list[str] | None = None) -> int: + args = _parse_args(argv) + scenario = _load_scenario(args.scenario) + if args.dry_run: + _print_plan(scenario, args) + return 0 + runner = ScenarioRunner(scenario=scenario, args=args) + return runner.run() + + +def _parse_args(argv: list[str] | None) -> argparse.Namespace: + raw_args = sys.argv[1:] if argv is None else argv + if raw_args and raw_args[0] == "--": + raw_args = raw_args[1:] + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("scenario", type=Path, help="TOML scenario fixture to run.") + parser.add_argument( + "--dispatch-bin", + default="uv run dispatch", + help="Command used to run dispatch, shell-split. Defaults to in-tree uv.", + ) + parser.add_argument( + "--keep-home", + action="store_true", + help="Keep temporary homes/work dir for debugging.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Validate and print the scenario plan without starting Dispatch/Codex.", + ) + return parser.parse_args(raw_args) + + +def _load_scenario(path: Path) -> Scenario: + data = tomllib.loads(path.read_text()) + if not isinstance(data, dict): + raise SystemExit(f"{path} must contain a TOML table") + raw_lanes = data.get("lanes") + if not isinstance(raw_lanes, list) or not raw_lanes: + raise SystemExit(f"{path} must define at least one [[lanes]] entry") + lanes = [_load_lane(index, raw) for index, raw in enumerate(raw_lanes, start=1)] + aliases = [lane.alias for lane in lanes] + if len(set(aliases)) != len(aliases): + raise SystemExit(f"{path} has duplicate lane aliases") + return Scenario( + name=_string(data, "name", fallback=path.stem), + description=_string(data, "description", fallback=""), + timeout_seconds=float(data.get("timeout_seconds", 120.0)), + poll_seconds=float(data.get("poll_seconds", 1.0)), + preferred_model=_optional_string(data, "preferred_model"), + allow_model_fallback=bool(data.get("allow_model_fallback", True)), + effort=_string(data, "effort", fallback="low"), + parallel=bool(data.get("parallel", True)), + lanes=lanes, + ) + + +def _load_lane(index: int, raw: object) -> ScenarioLane: + if not isinstance(raw, dict): + raise SystemExit(f"lane #{index} must be a TOML table") + return ScenarioLane( + alias=_string(raw, "alias"), + name=_string(raw, "name"), + prompt=_string(raw, "prompt"), + expect_contains=_string(raw, "expect_contains"), + ) + + +def _string(data: dict[str, object], key: str, *, fallback: str | None = None) -> str: + value = data.get(key, fallback) + if not isinstance(value, str) or not value: + raise SystemExit(f"missing or invalid string field: {key}") + return value + + +def _optional_string(data: dict[str, object], key: str) -> str | None: + value = data.get(key) + if value is None: + return None + if not isinstance(value, str) or not value: + raise SystemExit(f"invalid string field: {key}") + return value + + +def _print_plan(scenario: Scenario, args: argparse.Namespace) -> None: + print(f"scenario={scenario.name}") + if scenario.description: + print(f"description={scenario.description}") + print(f"dispatch_bin={args.dispatch_bin}") + print(f"preferred_model={scenario.preferred_model or ''}") + print(f"effort={scenario.effort}") + print(f"parallel={scenario.parallel}") + for lane in scenario.lanes: + print(f"lane {lane.alias}: name={lane.name!r} expect={lane.expect_contains!r}") + + +class ScenarioRunner: + def __init__(self, *, scenario: Scenario, args: argparse.Namespace) -> None: + self.scenario = scenario + self.args = args + self.dispatch_cmd = shlex.split(args.dispatch_bin) + self.dispatch_home = _mktemp("dispatch-scenario-home.") + self.codex_home = _mktemp("dispatch-scenario-codex.") + self.work_dir = _mktemp("dispatch-scenario-work.") + self.env = os.environ.copy() + self.env["DISPATCH_HOME"] = str(self.dispatch_home) + self.env["CODEX_HOME"] = str(self.codex_home) + + def run(self) -> int: + print(f"scenario={self.scenario.name}") + print(f"DISPATCH_HOME={self.dispatch_home}") + print(f"CODEX_HOME={self.codex_home}") + print(f"work_dir={self.work_dir}") + try: + self._prepare_codex_home() + self._dispatch_json(["up", "--json"]) + model = self._resolve_model() + lanes = self._start_lanes(model) + self._assert_list_contains(lanes) + for lane in lanes: + self._wait_for_completion(lane) + self._assert_tail_contains(lane) + self._dispatch_json(["down", "--json"]) + print("scenario passed") + return 0 + finally: + self._dispatch(["down", "--json"], check=False) + if not self.args.keep_home: + shutil.rmtree(self.dispatch_home, ignore_errors=True) + shutil.rmtree(self.codex_home, ignore_errors=True) + shutil.rmtree(self.work_dir, ignore_errors=True) + else: + print("kept temporary homes for debugging") + + def _prepare_codex_home(self) -> None: + auth = Path.home() / ".codex" / "auth.json" + if not shutil.which("codex"): + raise SystemExit("codex binary not on PATH; cannot run live scenario") + if not auth.exists(): + raise SystemExit("no ~/.codex/auth.json; cannot run live scenario") + shutil.copy2(auth, self.codex_home / "auth.json") + + def _resolve_model(self) -> str | None: + models = self._dispatch_json(["models", "--json"]) + available = { + item.get("id") + for item in _list(models.get("models")) + if isinstance(item, dict) and isinstance(item.get("id"), str) + } + preferred = self.scenario.preferred_model + if preferred is None: + return None + if preferred in available: + print(f"model={preferred}") + return preferred + if self.scenario.allow_model_fallback: + print(f"model={preferred} unavailable; falling back to Codex default") + return None + raise SystemExit( + f"preferred model {preferred!r} is unavailable; available: {sorted(available)}" + ) + + def _start_lanes(self, model: str | None) -> list[LaneRun]: + if self.scenario.parallel: + return [self._start_lane(lane, model) for lane in self.scenario.lanes] + runs: list[LaneRun] = [] + for lane in self.scenario.lanes: + run = self._start_lane(lane, model) + self._wait_for_completion(run) + runs.append(run) + return runs + + def _start_lane(self, lane: ScenarioLane, model: str | None) -> LaneRun: + args = [ + "new", + "--name", + lane.name, + "--cwd", + str(self.work_dir), + "--text", + lane.prompt, + "--effort", + self.scenario.effort, + "--no-ephemeral", + "--json", + ] + if model is not None: + args.extend(["--model", model]) + out = self._dispatch_json(args, timeout=self.scenario.timeout_seconds) + lane_id = out.get("id") + if not isinstance(lane_id, str): + raise SystemExit(f"new did not return lane id: {out}") + if out.get("message_accepted") is not True: + raise SystemExit(f"new did not accept initial message: {out}") + handle = out.get("handle") + print(f"started {lane.alias}: id={lane_id} handle={handle}") + return LaneRun( + alias=lane.alias, + id=lane_id, + handle=handle if isinstance(handle, str) else None, + expect_contains=lane.expect_contains, + ) + + def _assert_list_contains(self, lanes: list[LaneRun]) -> None: + out = self._dispatch_json(["list", "--json"]) + items = _list(out.get("lanes")) + ids = {item.get("id") for item in items if isinstance(item, dict)} + missing = [lane.id for lane in lanes if lane.id not in ids] + if missing: + raise SystemExit(f"list did not include scenario lanes: {missing}; got {ids}") + + def _wait_for_completion(self, lane: LaneRun) -> None: + deadline = time.monotonic() + self.scenario.timeout_seconds + last_status: Status | None = None + while time.monotonic() < deadline: + detail = self._dispatch_json( + ["get", lane.id, "--include-transcript", "--json"], + timeout=min(30.0, self.scenario.timeout_seconds), + ) + latest_turn = detail.get("latest_turn") + status = latest_turn.get("status") if isinstance(latest_turn, dict) else None + if status != last_status: + print(f"{lane.alias}: latest_turn={status}") + last_status = status if status in {"started", "completed", "failed"} else None + if status == "completed": + return + if status == "failed": + raise SystemExit(f"{lane.alias} failed: {latest_turn}") + time.sleep(self.scenario.poll_seconds) + raise SystemExit(f"{lane.alias} did not complete within {self.scenario.timeout_seconds}s") + + def _assert_tail_contains(self, lane: LaneRun) -> None: + out = self._dispatch_json(["tail", lane.id, "--limit", "20", "--json"]) + haystack = json.dumps(out, sort_keys=True).lower() + needle = lane.expect_contains.lower() + if needle not in haystack: + raise SystemExit(f"{lane.alias} transcript missing {needle!r}: {out}") + + def _dispatch_json(self, args: list[str], *, timeout: float = 90.0) -> dict[str, Any]: + result = self._dispatch(args, timeout=timeout) + try: + parsed = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise SystemExit( + f"dispatch {' '.join(args)} did not produce JSON\n" + f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}" + ) from exc + if not isinstance(parsed, dict): + raise SystemExit(f"dispatch {' '.join(args)} produced non-object JSON") + return parsed + + def _dispatch( + self, + args: list[str], + *, + timeout: float = 90.0, + check: bool = True, + ) -> subprocess.CompletedProcess[str]: + result = subprocess.run( + [*self.dispatch_cmd, *args], + env=self.env, + cwd=self.work_dir, + text=True, + capture_output=True, + timeout=timeout, + check=False, + ) + if check and result.returncode != 0: + raise SystemExit( + f"dispatch {' '.join(args)} failed with {result.returncode}\n" + f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}" + ) + return result + + +def _list(value: object) -> list[object]: + return value if isinstance(value, list) else [] + + +def _mktemp(prefix: str) -> Path: + root = Path("/tmp") if Path("/tmp").is_dir() else Path(tempfile.gettempdir()) + return Path(tempfile.mkdtemp(prefix=prefix, dir=root)) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/scenarios/README.md b/tests/scenarios/README.md new file mode 100644 index 0000000..432f6d2 --- /dev/null +++ b/tests/scenarios/README.md @@ -0,0 +1,25 @@ +# Live Scenario Fixtures + +Scenario fixtures exercise Dispatch as an agent-facing product: they start a real +daemon, create live Codex threads through the public CLI, wait for work to +complete, and assert observable Dispatch state. + +These are not part of `just check`. They use real Codex auth/model calls, so run +them intentionally: + +```bash +just scenario -- tests/scenarios/basic_coordination.toml +``` + +The runner creates temporary `DISPATCH_HOME`, `CODEX_HOME`, and work directories +under `/tmp`. It copies `~/.codex/auth.json` into the temporary `CODEX_HOME`, +starts `dispatchd`, and removes all temporary state afterward unless +`--keep-home` is passed. + +Keep scenarios: + +- small and deterministic; +- synthetic, with no private thread content; +- cheap (`effort = "low"` and a small preferred model when available); +- focused on user/agent workflows rather than one-off protocol details. + diff --git a/tests/scenarios/basic_coordination.toml b/tests/scenarios/basic_coordination.toml new file mode 100644 index 0000000..0699433 --- /dev/null +++ b/tests/scenarios/basic_coordination.toml @@ -0,0 +1,21 @@ +name = "basic_coordination" +description = "Create two live Dispatch-managed Codex lanes and verify list/get/tail state." +timeout_seconds = 120 +poll_seconds = 1 +preferred_model = "gpt-5.3-codex-spark" +allow_model_fallback = true +effort = "low" +parallel = true + +[[lanes]] +alias = "alpha" +name = "scenario-alpha" +prompt = "Reply with exactly one word: alpha" +expect_contains = "alpha" + +[[lanes]] +alias = "beta" +name = "scenario-beta" +prompt = "Reply with exactly one word: beta" +expect_contains = "beta" + diff --git a/tests/test_scenarios.py b/tests/test_scenarios.py new file mode 100644 index 0000000..06e1440 --- /dev/null +++ b/tests/test_scenarios.py @@ -0,0 +1,25 @@ +"""Fast checks for live scenario fixture definitions.""" + +from __future__ import annotations + +import subprocess +import sys + + +def test_basic_scenario_dry_run_validates() -> None: + result = subprocess.run( + [ + sys.executable, + "scripts/run_scenario.py", + "--dry-run", + "tests/scenarios/basic_coordination.toml", + ], + text=True, + capture_output=True, + check=False, + ) + + assert result.returncode == 0, result.stderr + assert "scenario=basic_coordination" in result.stdout + assert "lane alpha" in result.stdout + assert "lane beta" in result.stdout