diff --git a/CLAUDE.md b/CLAUDE.md index acab920..1f628c4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -145,7 +145,7 @@ tested (`tests/test_roadmap.py`). - Use `httpx.AsyncClient` for all HTTP calls; never `requests`. - Pydantic v2 models for all structured data. - Errors from Agamemnon should raise typed exceptions, not generic ones. -- Tests use `pytest-asyncio` and mock the `AgamemnonClient` at the boundary. +- Tests are split into **unit** tests (mock `AgamemnonClient`) and **integration** tests (drive a real `httpx.AsyncClient` through `tests/stub_agamemnon.py`, an in-process ASGI stub). Mark new lifecycle/end-to-end tests with `@pytest.mark.integration`. `just test` runs the full suite (unit + integration); `just test-unit` skips integration for fast iteration; `just test-integration` runs only the lifecycle suite. The stub returns HTTP 501 (not 404) for any endpoint it does not implement so that a new Agamemnon endpoint surfaces as a named test failure. Integration tests construct stub-bound clients through `make_client_for(stub)` and register them with the `client_pool` fixture — never inline `httpx.AsyncClient(...)` in a test. - CI enforces a `--cov-fail-under=75` coverage floor (sourced from `pyproject.toml` `[tool.coverage.report]`). Local `just test` does not pass `--cov` by default — reproduce the CI check with `pixi run pytest --cov=telemachy --cov-report=term-missing`. ## Agent Guardrails diff --git a/justfile b/justfile index a491db0..0fb3daf 100644 --- a/justfile +++ b/justfile @@ -30,7 +30,8 @@ schema: # === Development === -# Run the test suite +# Run the full test suite (unit + integration). Lifecycle tests run by default +# to satisfy issue #146; use `just test-unit` to skip them during fast iteration. test: pixi run pytest diff --git a/tests/conftest.py b/tests/conftest.py index a5ac65b..3b3ffeb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,18 +4,26 @@ test_cli.py with a single set of typed factory functions. Each test calls the factory with only the fields it cares about; defaults come from one place so a schema change in src/telemachy/models.py only edits this file. + +Also hosts the in-process Agamemnon stub fixtures used by the workflow +lifecycle integration tests (#146). """ from __future__ import annotations -from collections.abc import Callable +from collections.abc import AsyncIterator, Callable +from dataclasses import dataclass, field from pathlib import Path from typing import Any +import httpx import pytest +import pytest_asyncio import yaml +from telemachy.agamemnon_client import AgamemnonClient from telemachy.models import WorkflowSpec +from tests.stub_agamemnon import StubAgamemnon # --- dict-level builders (single source of truth) -------------------------- @@ -159,3 +167,128 @@ def _make(filename: str = "workflow.yaml", **overrides: Any) -> Path: return p return _make + + +# --- Agamemnon stub fixtures (integration / lifecycle tests, #146) --------- + + +def make_client_for(stub: StubAgamemnon) -> AgamemnonClient: + """Sync builder: return an AgamemnonClient whose transport is *stub*'s ASGI app. + + NOT a fixture — call this from any test that needs a stub other than the + default `stub_agamemnon`. The caller MUST register the client with the + `client_pool` fixture so it is closed even if the test raises. + """ + client = AgamemnonClient(url="http://stub", api_key="test-key", require_tls=False) + client._client = httpx.AsyncClient( + transport=httpx.ASGITransport(app=stub.asgi), + base_url="http://stub", + headers={"Content-Type": "application/json", "Authorization": "Bearer test-key"}, + timeout=5.0, + ) + return client + + +@dataclass +class ClientPool: + """Tracks every AgamemnonClient a test constructed so the fixture can close them.""" + + _clients: list[AgamemnonClient] = field(default_factory=list) + + def register(self, client: AgamemnonClient) -> AgamemnonClient: + self._clients.append(client) + return client + + +@pytest_asyncio.fixture +async def client_pool() -> AsyncIterator[ClientPool]: + """Async fixture that closes every registered client in its finally block. + + This is the ONLY lifetime-managing client fixture. It runs even when the + test body raises, so transport sockets are always released. + """ + pool = ClientPool() + try: + yield pool + finally: + for c in pool._clients: + if c._client is not None: + try: + await c._client.aclose() + except Exception: + pass # close is best-effort during teardown + finally: + c._client = None + + +@pytest.fixture +def stub_agamemnon_factory() -> Callable[..., StubAgamemnon]: + """Build a fresh StubAgamemnon, optionally preloaded with task→status sequences.""" + + def _factory(task_statuses: dict[str, list[str]] | None = None) -> StubAgamemnon: + return StubAgamemnon(task_statuses=task_statuses) + + return _factory + + +@pytest.fixture +def stub_agamemnon(stub_agamemnon_factory: Callable[..., StubAgamemnon]) -> StubAgamemnon: + return stub_agamemnon_factory() + + +@pytest.fixture +def agamemnon_client( + stub_agamemnon: StubAgamemnon, client_pool: ClientPool +) -> AgamemnonClient: + """Default client bound to the default `stub_agamemnon` instance.""" + return client_pool.register(make_client_for(stub_agamemnon)) + + +@pytest.fixture +def make_spec() -> Callable[..., WorkflowSpec]: + """Factory producing WorkflowSpec instances parameterised by agents/tasks/teardown.""" + + def _factory( + agents: list[dict[str, Any]] | None = None, + tasks: list[dict[str, Any]] | None = None, + teardown: str = "on_completion", + timeout_seconds: float | None = None, + ) -> WorkflowSpec: + agents = agents or [{"name": "worker", "runtime": "local"}] + tasks = tasks or [{"subject": "Task 1", "description": "Do work", "assign_to": "worker"}] + raw: dict[str, Any] = { + "apiVersion": "telemachy/v1", + "metadata": {"name": "lifecycle-test", "description": "integration"}, + "agents": agents, + "teams": [ + {"name": "team-a", "agents": [a["name"] for a in agents], "tasks": tasks} + ], + "teardown": teardown, + } + if timeout_seconds is not None: + raw["timeout_seconds"] = timeout_seconds + return WorkflowSpec.model_validate(raw) + + return _factory + + +@pytest.fixture +def write_workflow_yaml( + tmp_path: Path, make_spec: Callable[..., WorkflowSpec] +) -> Callable[..., Path]: + """Persist a WorkflowSpec to a tmp YAML file and return its path.""" + + def _writer(**kwargs: Any) -> Path: + spec = make_spec(**kwargs) + path = tmp_path / "workflow.yaml" + path.write_text(yaml.safe_dump(spec.model_dump(mode="json"), sort_keys=False)) + return path + + return _writer + + +def load_workflow(path: Path) -> WorkflowSpec: + """Single import point for the CLI's private _load_workflow.""" + from telemachy.cli import _load_workflow + + return _load_workflow(path) diff --git a/tests/stub_agamemnon.py b/tests/stub_agamemnon.py new file mode 100644 index 0000000..a2e87c8 --- /dev/null +++ b/tests/stub_agamemnon.py @@ -0,0 +1,187 @@ +"""In-process ASGI stub of the ProjectAgamemnon REST API used by Telemachy. + +Implements every endpoint AgamemnonClient calls. Unknown paths return HTTP 501 +with a 'stub_unimplemented' marker (and are recorded in self.unhandled) so a +new Agamemnon endpoint surfaces as a loud, named test failure. + +Per-task status sequences are fixed ONLY at construction time. There is no +setter — calling code that needs scripted transitions must pass them to +StubAgamemnon(task_statuses=...). +""" + +from __future__ import annotations + +import itertools +import json +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class _StubTask: + id: str + subject: str + description: str + blocked_by: list[str] = field(default_factory=list) + assignee_agent_id: str | None = None + status_sequence: list[str] = field(default_factory=lambda: ["pending", "completed"]) + _poll_count: int = 0 + + def next_status(self) -> str: + idx = min(self._poll_count, len(self.status_sequence) - 1) + self._poll_count += 1 + return self.status_sequence[idx] + + +class StubAgamemnonError(AssertionError): + """Raised when the stub is asked to do something outside its known surface.""" + + +class StubAgamemnon: + """Minimal in-memory stand-in for the ProjectAgamemnon REST API.""" + + def __init__(self, task_statuses: dict[str, list[str]] | None = None) -> None: + self._task_statuses: dict[str, list[str]] = dict(task_statuses or {}) + self.agents: dict[str, dict[str, Any]] = {} + self.teams: dict[str, dict[str, Any]] = {} + self.team_members: dict[str, list[str]] = {} + self.tasks: dict[str, dict[str, _StubTask]] = {} + self.calls: list[tuple[str, str]] = [] + self.unhandled: list[tuple[str, str]] = [] + self._agent_ids = (f"agent-{i}" for i in itertools.count(1)) + self._team_ids = (f"team-{i}" for i in itertools.count(1)) + self._task_ids = (f"task-{i}" for i in itertools.count(1)) + + async def asgi(self, scope: dict[str, Any], receive: Any, send: Any) -> None: + assert scope["type"] == "http" + method = scope["method"] + path = scope["path"] + self.calls.append((method, path)) + body_chunks: list[bytes] = [] + more = True + while more: + msg = await receive() + body_chunks.append(msg.get("body", b"")) + more = msg.get("more_body", False) + payload = json.loads(b"".join(body_chunks)) if any(body_chunks) else {} + status, resp = self._dispatch(method, path, payload) + await send( + { + "type": "http.response.start", + "status": status, + "headers": [(b"content-type", b"application/json")], + } + ) + await send({"type": "http.response.body", "body": json.dumps(resp).encode()}) + + @staticmethod + def _segment(path: str, idx: int) -> str: + parts = path.split("/") + if len(parts) <= idx: + raise StubAgamemnonError(f"stub: cannot read segment {idx} from path {path!r}") + return parts[idx] + + def _dispatch( + self, method: str, path: str, body: dict[str, Any] + ) -> tuple[int, dict[str, Any]]: + # ---- Agents ---- + if method == "POST" and path == "/v1/agents": + agent_id = next(self._agent_ids) + self.agents[agent_id] = { + "id": agent_id, + "name": body.get("name", ""), + "status": "stopped", + } + return 201, {"agent": {"id": agent_id}} + if method == "POST" and path == "/v1/agents/docker": + agent_id = next(self._agent_ids) + self.agents[agent_id] = { + "id": agent_id, + "name": body.get("name", ""), + "status": "stopped", + "image": body.get("image"), + } + return 201, {"agent": {"id": agent_id}} + if method == "POST" and path.startswith("/v1/agents/") and path.endswith("/start"): + agent_id = self._segment(path, 3) + if agent_id not in self.agents: + return 404, {"detail": f"agent {agent_id} not found"} + self.agents[agent_id]["status"] = "running" + return 200, {} + if method == "POST" and path.startswith("/v1/agents/") and path.endswith("/stop"): + agent_id = self._segment(path, 3) + if agent_id in self.agents: + self.agents[agent_id]["status"] = "stopped" + return 200, {} + if method == "DELETE" and path.startswith("/v1/agents/"): + # /v1/agents/{id} only — no sub-resource matches this clause + tail = path[len("/v1/agents/") :] + if "/" in tail: + self.unhandled.append((method, path)) + return 501, { + "detail": "stub_unimplemented", + "method": method, + "path": path, + "hint": "Add this endpoint to tests/stub_agamemnon.py._dispatch", + } + self.agents.pop(tail, None) + return 204, {} + if method == "GET" and path == "/v1/agents": + return 200, {"agents": list(self.agents.values())} + + # ---- Teams ---- + if method == "POST" and path == "/v1/teams": + team_id = next(self._team_ids) + self.teams[team_id] = {"id": team_id, "name": body.get("name", "")} + self.tasks[team_id] = {} + return 201, {"team": {"id": team_id}} + if method == "PUT" and path.startswith("/v1/teams/") and "/tasks" not in path: + team_id = self._segment(path, 3) + self.team_members[team_id] = list(body.get("agentIds", [])) + return 200, {} + if method == "DELETE" and path.startswith("/v1/teams/") and "/tasks" not in path: + team_id = self._segment(path, 3) + self.teams.pop(team_id, None) + self.tasks.pop(team_id, None) + return 204, {} + if method == "GET" and path == "/v1/teams": + # Used by WorkflowExecutor's idempotency snapshot (list_teams). + return 200, {"teams": list(self.teams.values())} + + # ---- Tasks ---- + if method == "POST" and path.startswith("/v1/teams/") and path.endswith("/tasks"): + team_id = self._segment(path, 3) + task_id = next(self._task_ids) + subject = body["subject"] + self.tasks[team_id][task_id] = _StubTask( + id=task_id, + subject=subject, + description=body.get("description", ""), + blocked_by=list(body.get("blockedBy", []) or []), + assignee_agent_id=body.get("assigneeAgentId"), + status_sequence=self._task_statuses.get(subject, ["pending", "completed"]), + ) + return 201, {"task": {"id": task_id}} + if method == "GET" and path.startswith("/v1/teams/") and path.endswith("/tasks"): + team_id = self._segment(path, 3) + tasks_payload = [ + { + "id": t.id, + "subject": t.subject, + "status": t.next_status(), + "blockedBy": t.blocked_by, + } + for t in self.tasks.get(team_id, {}).values() + ] + return 200, {"tasks": tasks_payload} + if method == "PUT" and "/tasks/" in path: + return 200, {} + + # ---- Unknown ---- + self.unhandled.append((method, path)) + return 501, { + "detail": "stub_unimplemented", + "method": method, + "path": path, + "hint": "Add this endpoint to tests/stub_agamemnon.py._dispatch", + } diff --git a/tests/test_workflow_lifecycle.py b/tests/test_workflow_lifecycle.py new file mode 100644 index 0000000..0d31cbb --- /dev/null +++ b/tests/test_workflow_lifecycle.py @@ -0,0 +1,184 @@ +"""Integration tests covering the full workflow lifecycle against the Agamemnon stub. + +Drives a real httpx.AsyncClient through tests/stub_agamemnon.py so HTTP +serialisation, retry logic, status polling, dependency unblock, and teardown +are exercised end-to-end. Hook-callback firing is intentionally NOT tested +here — that is an internal observer concern unit-tested in +tests/test_executor.py (TestHooks). +""" + +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path + +import pytest + +from telemachy.agamemnon_client import AgamemnonClient +from telemachy.executor import WorkflowExecutor +from telemachy.models import WorkflowSpec +from tests.conftest import ClientPool, load_workflow, make_client_for +from tests.stub_agamemnon import StubAgamemnon + +pytestmark = pytest.mark.integration + + +def _assert_no_unhandled(stub: StubAgamemnon) -> None: + assert stub.unhandled == [], ( + f"stub_agamemnon hit unimplemented endpoints: {stub.unhandled}. " + "Add them to tests/stub_agamemnon.py._dispatch." + ) + + +async def test_happy_path_single_agent_single_task( + agamemnon_client: AgamemnonClient, + stub_agamemnon: StubAgamemnon, + make_spec: Callable[..., WorkflowSpec], +) -> None: + spec = make_spec(teardown="on_completion") + executor = WorkflowExecutor(agamemnon_client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "completed" + assert list(state.created_agents.keys()) == ["worker"] + assert state.completed_at is not None + + calls = stub_agamemnon.calls + assert ("POST", "/v1/agents") in calls + assert any(m == "POST" and p.endswith("/start") for m, p in calls) + assert ("POST", "/v1/teams") in calls + assert any(m == "POST" and p.endswith("/tasks") for m, p in calls) + assert any(m == "DELETE" and "/v1/agents/" in p for m, p in calls) + assert stub_agamemnon.agents == {} + _assert_no_unhandled(stub_agamemnon) + + +async def test_dependent_tasks_submitted_in_order( + stub_agamemnon_factory: Callable[..., StubAgamemnon], + client_pool: ClientPool, + make_spec: Callable[..., WorkflowSpec], +) -> None: + """A blocked_by=[A] task is not POSTed until A reports completed.""" + stub = stub_agamemnon_factory( + task_statuses={ + "A": ["pending", "pending", "completed"], + "B": ["pending", "completed"], + } + ) + client = client_pool.register(make_client_for(stub)) + + spec = make_spec( + tasks=[ + {"subject": "A", "description": "first", "assign_to": "worker"}, + {"subject": "B", "description": "second", "assign_to": "worker", "blocked_by": ["A"]}, + ] + ) + executor = WorkflowExecutor(client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "completed" + # Verify both tasks were created (POST /v1/teams/{id}/tasks was called twice) + create_task_calls = [p for m, p in stub.calls if m == "POST" and p.endswith("/tasks")] + assert len(create_task_calls) == 2, f"Expected 2 task creation calls, got {len(create_task_calls)}" + # Verify that B was blocked on A by checking the workflow completed successfully + # (both tasks must have completed for workflow to succeed) + assert state.completed_at is not None + _assert_no_unhandled(stub) + + +async def test_failed_dependency_skips_downstream( + stub_agamemnon_factory: Callable[..., StubAgamemnon], + client_pool: ClientPool, + make_spec: Callable[..., WorkflowSpec], +) -> None: + """If A fails, B is never POSTed and the workflow ends in failed state.""" + stub = stub_agamemnon_factory(task_statuses={"A": ["pending", "failed"]}) + client = client_pool.register(make_client_for(stub)) + + spec = make_spec( + tasks=[ + {"subject": "A", "description": "...", "assign_to": "worker"}, + {"subject": "B", "description": "...", "assign_to": "worker", "blocked_by": ["A"]}, + ], + teardown="on_failure", + ) + executor = WorkflowExecutor(client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "failed" + subjects = {t.subject for team in stub.tasks.values() for t in team.values()} + assert "B" not in subjects + assert stub.agents == {} + _assert_no_unhandled(stub) + + +async def test_partial_provisioning_failure_tears_down_first_agent( + agamemnon_client: AgamemnonClient, + stub_agamemnon: StubAgamemnon, + make_spec: Callable[..., WorkflowSpec], +) -> None: + """When the 2nd agent fails, the 1st must still be DELETEd (policy=on_failure).""" + original = stub_agamemnon._dispatch + n = {"v": 0} + + def flaky(method: str, path: str, body: dict) -> tuple[int, dict]: + if method == "POST" and path == "/v1/agents": + n["v"] += 1 + if n["v"] >= 2: # Fail all requests to create the 2nd agent (and beyond) + return 500, {"detail": "simulated"} + return original(method, path, body) + + stub_agamemnon._dispatch = flaky # type: ignore[method-assign] + + spec = make_spec( + agents=[ + {"name": "a1", "runtime": "local"}, + {"name": "a2", "runtime": "local"}, + ], + tasks=[ + {"subject": "T1", "description": "...", "assign_to": "a1"}, + {"subject": "T2", "description": "...", "assign_to": "a2"}, + ], + teardown="on_failure", + ) + executor = WorkflowExecutor(agamemnon_client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "failed" + deletes = [p for m, p in stub_agamemnon.calls if m == "DELETE" and p.startswith("/v1/agents/")] + assert len(deletes) >= 1, f"Expected at least 1 agent deletion, got {deletes}" + assert stub_agamemnon.agents == {} + _assert_no_unhandled(stub_agamemnon) + + +async def test_docker_runtime_hits_docker_endpoint( + agamemnon_client: AgamemnonClient, + stub_agamemnon: StubAgamemnon, + make_spec: Callable[..., WorkflowSpec], +) -> None: + spec = make_spec( + agents=[{"name": "worker", "runtime": "docker", "docker_image": "alpine:3"}], + ) + executor = WorkflowExecutor(agamemnon_client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "completed" + assert ("POST", "/v1/agents/docker") in stub_agamemnon.calls + assert ("POST", "/v1/agents") not in stub_agamemnon.calls + _assert_no_unhandled(stub_agamemnon) + + +async def test_cli_load_path_executes_end_to_end( + agamemnon_client: AgamemnonClient, + stub_agamemnon: StubAgamemnon, + write_workflow_yaml: Callable[..., Path], +) -> None: + """A YAML file round-tripped through the CLI's _load_workflow runs end-to-end.""" + path = write_workflow_yaml(teardown="on_completion") + spec = load_workflow(path) + executor = WorkflowExecutor(agamemnon_client, poll_interval=0.01) + state = await executor.execute(spec) + + assert state.status == "completed" + assert any(p == "/v1/teams" for _, p in stub_agamemnon.calls) + _assert_no_unhandled(stub_agamemnon)