Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ teardown: on_completion # on_completion | on_failure | never
- `on_failure` — delete only on failure (preserve state on success for inspection)
- `never` — never auto-teardown; manual cleanup required

## Idempotency

Telemachy tags every agent and team it creates with a `tlm-<workflow>-<resource>` key stored in
Agamemnon. When a workflow is re-run (e.g. after a partial failure), resources whose keys already
exist in Agamemnon are reused rather than re-created, making retries safe by default.

**Team membership caveat:** When a team is reused from a prior run, its member list is trusted
verbatim from Agamemnon and is not reconciled against the current workflow spec. If a partial prior
run left a team with stale agent IDs (e.g. because an agent was re-created under a new ID), the
reused team will still point at the old agent ID and the freshly-created member will not be wired
in. Use `--force` to force creation of fresh resources and avoid this inconsistency:

```bash
just run workflows/example.yaml -- --force
```

## Development

```bash
Expand Down
13 changes: 6 additions & 7 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions src/telemachy/agamemnon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,17 @@ async def _request_with_retry(

# === Agent endpoints ===

async def create_agent(self, spec: AgentSpec) -> str:
async def create_agent(self, spec: AgentSpec, idempotency_name: str | None = None) -> str:
"""Create a local or docker agent. Returns the Agamemnon agent id."""
if spec.runtime == "docker":
return await self._create_docker_agent(spec)
return await self._create_local_agent(spec)
return await self._create_docker_agent(spec, idempotency_name)
return await self._create_local_agent(spec, idempotency_name)

async def _create_local_agent(self, spec: AgentSpec) -> str:
async def _create_local_agent(
self, spec: AgentSpec, idempotency_name: str | None = None
) -> str:
payload: dict[str, object] = {
"name": spec.name,
"name": idempotency_name or spec.name,
"label": spec.name,
"program": spec.program,
"workingDirectory": spec.working_dir,
Expand All @@ -152,9 +154,11 @@ async def _create_local_agent(self, spec: AgentSpec) -> str:
self._raise_for_status(response)
return str(_require(response.json(), "agent", "id", context="create_agent"))

async def _create_docker_agent(self, spec: AgentSpec) -> str:
async def _create_docker_agent(
self, spec: AgentSpec, idempotency_name: str | None = None
) -> str:
payload: dict[str, object] = {
"name": spec.name,
"name": idempotency_name or spec.name,
"hostId": self._host_id,
"image": spec.docker_image,
"cpus": spec.cpus,
Expand Down Expand Up @@ -188,6 +192,13 @@ async def list_agents(self) -> list[dict[str, object]]:
agents: list[dict[str, object]] = response.json().get("agents", [])
return agents

async def list_teams(self) -> list[dict[str, object]]:
"""List all teams. Used to detect prior idempotent provisioning."""
response = await self._request_with_retry("GET", "/v1/teams")
self._raise_for_status(response)
teams: list[dict[str, object]] = response.json().get("teams", [])
return teams

# === Team endpoints ===

async def create_team(self, name: str, agent_ids: list[str]) -> str:
Expand Down
40 changes: 38 additions & 2 deletions src/telemachy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ def run(
bool,
typer.Option("--dry-run/--no-dry-run", help="Simulate execution without calling Agamemnon"),
] = False,
force: Annotated[
bool,
typer.Option(
"--force/--no-force",
help="Bypass idempotency lookup and create fresh agents/teams. "
"Pre-existing tlm-* resources from prior runs are NOT deleted.",
),
] = False,
) -> None:
"""Execute a workflow YAML file."""
_validate_workflow_path(workflow_path)
Expand All @@ -137,7 +145,7 @@ def run(
if dry_run:
console.print(f"[bold yellow][dry-run][/bold yellow] Simulating workflow: {spec.name}")
_print_plan(spec)
state = asyncio.run(run_workflow(spec, dry_run=True))
state = asyncio.run(run_workflow(spec, dry_run=True, force=force))
console.print(
f"[bold yellow][dry-run][/bold yellow] Simulation complete. id={state.workflow_id}"
)
Expand All @@ -149,6 +157,8 @@ def run(
total_tasks = sum(len(team.tasks) for team in spec.teams)

async def _run_with_signals() -> None:
from telemachy.idempotency import make_key

stop_event = asyncio.Event()
completed_count = 0

Expand Down Expand Up @@ -185,7 +195,33 @@ def _on_task_complete(**kwargs: object) -> None:
)

async with AgamemnonClient(**settings.client_kwargs()) as client:
executor = WorkflowExecutor(client, stop_event=stop_event)
# Take ONE snapshot, use it for both the --force warning AND
# the executor's lookup tables (avoid two round-trips).
snapshot: tuple[list, list] | None = None
if not force:
snapshot = (await client.list_agents(), await client.list_teams())
else:
agents_now = await client.list_agents()
teams_now = await client.list_teams()
expected = {make_key(spec.name, a.name) for a in spec.agents} | {
make_key(spec.name, t.name) for t in spec.teams
}
matched = [a for a in agents_now if str(a.get("name", "")) in expected]
matched += [t for t in teams_now if str(t.get("name", "")) in expected]
if matched:
err_console.print(
"[yellow]--force: bypassing idempotency; "
f"{len(matched)} pre-existing tlm-* resource(s) for "
f"workflow '{spec.name}' will be left behind. "
"Clean up manually if desired.[/yellow]"
)

executor = WorkflowExecutor(
client,
stop_event=stop_event,
force=force,
existing_snapshot=snapshot,
)
executor.add_hook("on_task_complete", _on_task_complete)
result = await executor.execute(spec)

Expand Down
Loading
Loading