Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@
- Use `httpx.AsyncClient` for all HTTP calls; never `requests`.
- Pydantic v2 models for all structured data.
- Errors from Agamemnon should raise typed exceptions, not generic ones.
- Tests use `pytest-asyncio` and mock the `AgamemnonClient` at the boundary.
- Tests are split into **unit** tests (mock `AgamemnonClient`) and **integration** tests (drive a real `httpx.AsyncClient` through `tests/stub_agamemnon.py`, an in-process ASGI stub). Mark new lifecycle/end-to-end tests with `@pytest.mark.integration`. `just test` runs the full suite (unit + integration); `just test-unit` skips integration for fast iteration; `just test-integration` runs only the lifecycle suite. The stub returns HTTP 501 (not 404) for any endpoint it does not implement so that a new Agamemnon endpoint surfaces as a named test failure. Integration tests construct stub-bound clients through `make_client_for(stub)` and register them with the `client_pool` fixture — never inline `httpx.AsyncClient(...)` in a test.

Check failure on line 148 in CLAUDE.md

View workflow job for this annotation

GitHub Actions / markdownlint

Line length

CLAUDE.md:148:121 MD013/line-length Line length [Expected: 120; Actual: 737] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md013.md
- CI enforces a `--cov-fail-under=75` coverage floor (sourced from `pyproject.toml` `[tool.coverage.report]`). Local `just test` does not pass `--cov` by default — reproduce the CI check with `pixi run pytest --cov=telemachy --cov-report=term-missing`.

Check failure on line 149 in CLAUDE.md

View workflow job for this annotation

GitHub Actions / markdownlint

Line length

CLAUDE.md:149:121 MD013/line-length Line length [Expected: 120; Actual: 252] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md013.md

## Agent Guardrails

Expand Down Expand Up @@ -192,7 +192,7 @@
just format # ruff format
```

## Planned Features

Check failure on line 195 in CLAUDE.md

View workflow job for this annotation

GitHub Actions / markdownlint

Multiple headings with the same content

CLAUDE.md:195 MD024/no-duplicate-heading Multiple headings with the same content [Context: "Planned Features"] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md024.md

`status`, `list`, and `cancel` commands are not yet implemented. They
require a persistent workflow-state backend (no design selected yet —
Expand All @@ -204,7 +204,7 @@

- `just test` — full suite (unit + integration); this is what CI runs
- `just test-unit` — unit tests only; mocks `AgamemnonClient` at the method level
- `just test-integration` — integration tests under `tests/integration/`; exercises `WorkflowExecutor` against an in-process mock Agamemnon HTTP server (`respx`)

Check failure on line 207 in CLAUDE.md

View workflow job for this annotation

GitHub Actions / markdownlint

Line length

CLAUDE.md:207:121 MD013/line-length Line length [Expected: 120; Actual: 161] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md013.md

Integration tests must declare `pytestmark = [pytest.mark.integration, pytest.mark.asyncio]` at the top of each module.

Expand Down
3 changes: 2 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ schema:

# === Development ===

# Run the test suite
# Run the full test suite (unit + integration). Lifecycle tests run by default
# to satisfy issue #146; use `just test-unit` to skip them during fast iteration.
test:
pixi run pytest

Expand Down
135 changes: 134 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,26 @@
test_cli.py with a single set of typed factory functions. Each test calls
the factory with only the fields it cares about; defaults come from one
place so a schema change in src/telemachy/models.py only edits this file.

Also hosts the in-process Agamemnon stub fixtures used by the workflow
lifecycle integration tests (#146).
"""

from __future__ import annotations

from collections.abc import Callable
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import httpx
import pytest
import pytest_asyncio
import yaml

from telemachy.agamemnon_client import AgamemnonClient
from telemachy.models import WorkflowSpec
from tests.stub_agamemnon import StubAgamemnon

# --- dict-level builders (single source of truth) --------------------------

Expand Down Expand Up @@ -159,3 +167,128 @@ def _make(filename: str = "workflow.yaml", **overrides: Any) -> Path:
return p

return _make


# --- Agamemnon stub fixtures (integration / lifecycle tests, #146) ---------


def make_client_for(stub: StubAgamemnon) -> AgamemnonClient:
"""Sync builder: return an AgamemnonClient whose transport is *stub*'s ASGI app.

NOT a fixture — call this from any test that needs a stub other than the
default `stub_agamemnon`. The caller MUST register the client with the
`client_pool` fixture so it is closed even if the test raises.
"""
client = AgamemnonClient(url="http://stub", api_key="test-key", require_tls=False)
client._client = httpx.AsyncClient(
transport=httpx.ASGITransport(app=stub.asgi),
base_url="http://stub",
headers={"Content-Type": "application/json", "Authorization": "Bearer test-key"},
timeout=5.0,
)
return client


@dataclass
class ClientPool:
"""Tracks every AgamemnonClient a test constructed so the fixture can close them."""

_clients: list[AgamemnonClient] = field(default_factory=list)

def register(self, client: AgamemnonClient) -> AgamemnonClient:
self._clients.append(client)
return client


@pytest_asyncio.fixture
async def client_pool() -> AsyncIterator[ClientPool]:
"""Async fixture that closes every registered client in its finally block.

This is the ONLY lifetime-managing client fixture. It runs even when the
test body raises, so transport sockets are always released.
"""
pool = ClientPool()
try:
yield pool
finally:
for c in pool._clients:
if c._client is not None:
try:
await c._client.aclose()
except Exception:
pass # close is best-effort during teardown
finally:
c._client = None


@pytest.fixture
def stub_agamemnon_factory() -> Callable[..., StubAgamemnon]:
"""Build a fresh StubAgamemnon, optionally preloaded with task→status sequences."""

def _factory(task_statuses: dict[str, list[str]] | None = None) -> StubAgamemnon:
return StubAgamemnon(task_statuses=task_statuses)

return _factory


@pytest.fixture
def stub_agamemnon(stub_agamemnon_factory: Callable[..., StubAgamemnon]) -> StubAgamemnon:
return stub_agamemnon_factory()


@pytest.fixture
def agamemnon_client(
stub_agamemnon: StubAgamemnon, client_pool: ClientPool
) -> AgamemnonClient:
"""Default client bound to the default `stub_agamemnon` instance."""
return client_pool.register(make_client_for(stub_agamemnon))


@pytest.fixture
def make_spec() -> Callable[..., WorkflowSpec]:
"""Factory producing WorkflowSpec instances parameterised by agents/tasks/teardown."""

def _factory(
agents: list[dict[str, Any]] | None = None,
tasks: list[dict[str, Any]] | None = None,
teardown: str = "on_completion",
timeout_seconds: float | None = None,
) -> WorkflowSpec:
agents = agents or [{"name": "worker", "runtime": "local"}]
tasks = tasks or [{"subject": "Task 1", "description": "Do work", "assign_to": "worker"}]
raw: dict[str, Any] = {
"apiVersion": "telemachy/v1",
"metadata": {"name": "lifecycle-test", "description": "integration"},
"agents": agents,
"teams": [
{"name": "team-a", "agents": [a["name"] for a in agents], "tasks": tasks}
],
"teardown": teardown,
}
if timeout_seconds is not None:
raw["timeout_seconds"] = timeout_seconds
return WorkflowSpec.model_validate(raw)

return _factory


@pytest.fixture
def write_workflow_yaml(
tmp_path: Path, make_spec: Callable[..., WorkflowSpec]
) -> Callable[..., Path]:
"""Persist a WorkflowSpec to a tmp YAML file and return its path."""

def _writer(**kwargs: Any) -> Path:
spec = make_spec(**kwargs)
path = tmp_path / "workflow.yaml"
path.write_text(yaml.safe_dump(spec.model_dump(mode="json"), sort_keys=False))
return path

return _writer


def load_workflow(path: Path) -> WorkflowSpec:
"""Single import point for the CLI's private _load_workflow."""
from telemachy.cli import _load_workflow

return _load_workflow(path)
187 changes: 187 additions & 0 deletions tests/stub_agamemnon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""In-process ASGI stub of the ProjectAgamemnon REST API used by Telemachy.

Implements every endpoint AgamemnonClient calls. Unknown paths return HTTP 501
with a 'stub_unimplemented' marker (and are recorded in self.unhandled) so a
new Agamemnon endpoint surfaces as a loud, named test failure.

Per-task status sequences are fixed ONLY at construction time. There is no
setter — calling code that needs scripted transitions must pass them to
StubAgamemnon(task_statuses=...).
"""

from __future__ import annotations

import itertools
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class _StubTask:
id: str
subject: str
description: str
blocked_by: list[str] = field(default_factory=list)
assignee_agent_id: str | None = None
status_sequence: list[str] = field(default_factory=lambda: ["pending", "completed"])
_poll_count: int = 0

def next_status(self) -> str:
idx = min(self._poll_count, len(self.status_sequence) - 1)
self._poll_count += 1
return self.status_sequence[idx]


class StubAgamemnonError(AssertionError):
"""Raised when the stub is asked to do something outside its known surface."""


class StubAgamemnon:
"""Minimal in-memory stand-in for the ProjectAgamemnon REST API."""

def __init__(self, task_statuses: dict[str, list[str]] | None = None) -> None:
self._task_statuses: dict[str, list[str]] = dict(task_statuses or {})
self.agents: dict[str, dict[str, Any]] = {}
self.teams: dict[str, dict[str, Any]] = {}
self.team_members: dict[str, list[str]] = {}
self.tasks: dict[str, dict[str, _StubTask]] = {}
self.calls: list[tuple[str, str]] = []
self.unhandled: list[tuple[str, str]] = []
self._agent_ids = (f"agent-{i}" for i in itertools.count(1))
self._team_ids = (f"team-{i}" for i in itertools.count(1))
self._task_ids = (f"task-{i}" for i in itertools.count(1))

async def asgi(self, scope: dict[str, Any], receive: Any, send: Any) -> None:
assert scope["type"] == "http"
method = scope["method"]
path = scope["path"]
self.calls.append((method, path))
body_chunks: list[bytes] = []
more = True
while more:
msg = await receive()
body_chunks.append(msg.get("body", b""))
more = msg.get("more_body", False)
payload = json.loads(b"".join(body_chunks)) if any(body_chunks) else {}
status, resp = self._dispatch(method, path, payload)
await send(
{
"type": "http.response.start",
"status": status,
"headers": [(b"content-type", b"application/json")],
}
)
await send({"type": "http.response.body", "body": json.dumps(resp).encode()})

@staticmethod
def _segment(path: str, idx: int) -> str:
parts = path.split("/")
if len(parts) <= idx:
raise StubAgamemnonError(f"stub: cannot read segment {idx} from path {path!r}")
return parts[idx]

def _dispatch(
self, method: str, path: str, body: dict[str, Any]
) -> tuple[int, dict[str, Any]]:
# ---- Agents ----
if method == "POST" and path == "/v1/agents":
agent_id = next(self._agent_ids)
self.agents[agent_id] = {
"id": agent_id,
"name": body.get("name", ""),
"status": "stopped",
}
return 201, {"agent": {"id": agent_id}}
if method == "POST" and path == "/v1/agents/docker":
agent_id = next(self._agent_ids)
self.agents[agent_id] = {
"id": agent_id,
"name": body.get("name", ""),
"status": "stopped",
"image": body.get("image"),
}
return 201, {"agent": {"id": agent_id}}
if method == "POST" and path.startswith("/v1/agents/") and path.endswith("/start"):
agent_id = self._segment(path, 3)
if agent_id not in self.agents:
return 404, {"detail": f"agent {agent_id} not found"}
self.agents[agent_id]["status"] = "running"
return 200, {}
if method == "POST" and path.startswith("/v1/agents/") and path.endswith("/stop"):
agent_id = self._segment(path, 3)
if agent_id in self.agents:
self.agents[agent_id]["status"] = "stopped"
return 200, {}
if method == "DELETE" and path.startswith("/v1/agents/"):
# /v1/agents/{id} only — no sub-resource matches this clause
tail = path[len("/v1/agents/") :]
if "/" in tail:
self.unhandled.append((method, path))
return 501, {
"detail": "stub_unimplemented",
"method": method,
"path": path,
"hint": "Add this endpoint to tests/stub_agamemnon.py._dispatch",
}
self.agents.pop(tail, None)
return 204, {}
if method == "GET" and path == "/v1/agents":
return 200, {"agents": list(self.agents.values())}

# ---- Teams ----
if method == "POST" and path == "/v1/teams":
team_id = next(self._team_ids)
self.teams[team_id] = {"id": team_id, "name": body.get("name", "")}
self.tasks[team_id] = {}
return 201, {"team": {"id": team_id}}
if method == "PUT" and path.startswith("/v1/teams/") and "/tasks" not in path:
team_id = self._segment(path, 3)
self.team_members[team_id] = list(body.get("agentIds", []))
return 200, {}
if method == "DELETE" and path.startswith("/v1/teams/") and "/tasks" not in path:
team_id = self._segment(path, 3)
self.teams.pop(team_id, None)
self.tasks.pop(team_id, None)
return 204, {}
if method == "GET" and path == "/v1/teams":
# Used by WorkflowExecutor's idempotency snapshot (list_teams).
return 200, {"teams": list(self.teams.values())}

# ---- Tasks ----
if method == "POST" and path.startswith("/v1/teams/") and path.endswith("/tasks"):
team_id = self._segment(path, 3)
task_id = next(self._task_ids)
subject = body["subject"]
self.tasks[team_id][task_id] = _StubTask(
id=task_id,
subject=subject,
description=body.get("description", ""),
blocked_by=list(body.get("blockedBy", []) or []),
assignee_agent_id=body.get("assigneeAgentId"),
status_sequence=self._task_statuses.get(subject, ["pending", "completed"]),
)
return 201, {"task": {"id": task_id}}
if method == "GET" and path.startswith("/v1/teams/") and path.endswith("/tasks"):
team_id = self._segment(path, 3)
tasks_payload = [
{
"id": t.id,
"subject": t.subject,
"status": t.next_status(),
"blockedBy": t.blocked_by,
}
for t in self.tasks.get(team_id, {}).values()
]
return 200, {"tasks": tasks_payload}
if method == "PUT" and "/tasks/" in path:
return 200, {}

# ---- Unknown ----
self.unhandled.append((method, path))
return 501, {
"detail": "stub_unimplemented",
"method": method,
"path": path,
"hint": "Add this endpoint to tests/stub_agamemnon.py._dispatch",
}
Loading
Loading