Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/usage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ stored in dispatch's durable registry and starts one queued turn per idle transi
uv run dispatch send @docs-review "Run this after the active turn." --queue
```

Use `send --intro` for managed Codex-to-Codex coordination. It prepends a terse
reply hint like `[dispatch] From @Dispatch (<ref>). Use dispatch send ... to
reply.` The sender is derived from `CODEX_THREAD_ID`, so the current Codex
thread must already be managed by dispatch:

```bash
uv run dispatch send @docs-review "Can you sanity-check this?" --intro
```

## Thread History, Watch, And Goals

`get` is the compact managed-thread summary:
Expand Down
5 changes: 5 additions & 0 deletions skills/dispatch/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,17 @@ uv run dispatch send @my-lane "Focus on docs first." --steer
uv run dispatch send @my-lane "Stop and do this instead." --interject
uv run dispatch send @my-lane "Context: use lane publicly, thread internally." --context
uv run dispatch send @my-lane "After this finishes, summarize risks." --mode queue
uv run dispatch send @my-lane "Can you check this?" --intro
```

The mode flags and `--mode send|steer|queue|interject|context` are mutually
exclusive. `--queue` stores the message durably and starts one queued turn when
the lane is idle.

Use `--intro` when you are sending from one managed Codex thread to another and
want the recipient to know how to reply through dispatch. It derives the sender
from `CODEX_THREAD_ID`, so the current thread must already be managed.

Use `stop` to cancel the active turn without replacement text:

```bash
Expand Down
6 changes: 4 additions & 2 deletions skills/dm/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,14 @@ Contract: read-only, brief answer, reply in this lane unless asked otherwise.
Then deliver it:

```bash
uv run dispatch send <target-ref> '<message>'
uv run dispatch send <target-ref> '<message>' --intro
```

Keep DMs conversational and bounded. Prefer one ask. Include only the context
needed for the target lane to answer without reading the sender's full
transcript. Do not use `$dm` to smuggle in broad implementation work.
transcript. If the current Codex thread is not managed by dispatch, omit
`--intro` and include the sender link manually. Do not use `$dm` to smuggle in
broad implementation work.

## Harvesting

Expand Down
18 changes: 16 additions & 2 deletions src/outfitter/dispatch/contracts/derive_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import inspect
import json
import os
import shlex
from collections.abc import Callable
from dataclasses import dataclass
Expand All @@ -18,6 +19,7 @@

from .op import Op
from .registry import OpRegistry
from .schema import is_internal_field, public_schema

Invoker = Callable[[str, dict[str, object]], dict[str, object]]
Renderer = Callable[[Op, dict[str, object]], None]
Expand Down Expand Up @@ -141,6 +143,8 @@ def _parameters(op: Op, *, positionals: tuple[str, ...] = ()) -> list[inspect.Pa
parameters: list[inspect.Parameter] = []
positional_set = set(positionals)
for name, field in op.input.model_fields.items():
if is_internal_field(field):
continue
help_text = field.description or ""
kind: inspect._ParameterKind
if name in positional_set:
Expand Down Expand Up @@ -181,6 +185,10 @@ def command(
context: Annotated[
bool, typer.Option("--context", help="Inject context without waking.")
] = False,
intro: Annotated[
bool,
typer.Option("--intro", help="Prepend dispatch sender intro from CODEX_THREAD_ID."),
] = False,
json: Annotated[
bool, typer.Option("--json", help="Render machine-readable JSON output.")
] = False,
Expand All @@ -191,7 +199,13 @@ def command(
if len(chosen) > 1 or (chosen and mode != "send"):
typer.secho("dispatch: choose exactly one send mode", fg="red", err=True)
raise typer.Exit(code=2)
result = invoke(op.id, {"lane": lane, "text": text, "mode": chosen[0] if chosen else mode})
params = {"lane": lane, "text": text, "mode": chosen[0] if chosen else mode, "intro": intro}
if intro:
params["caller_thread_id"] = os.environ.get("CODEX_THREAD_ID")
result = invoke(
op.id,
params,
)
render(op, result)
_ignore_json(json)

Expand Down Expand Up @@ -414,7 +428,7 @@ def command(
data={
"command": command,
"op": op.id,
"input": op.input.model_json_schema(),
"input": public_schema(op.input.model_json_schema()),
"output": op.output.model_json_schema(),
}
)
Expand Down
3 changes: 2 additions & 1 deletion src/outfitter/dispatch/contracts/derive_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .op import Intent, Op
from .registry import OpRegistry
from .schema import public_schema

Safety = Literal["read", "write", "destroy"]

Expand Down Expand Up @@ -166,7 +167,7 @@ def _input_schema(ops: tuple[tuple[str, Op], ...]) -> dict[str, Any]:


def _action_input_schema(action: str, op: Op) -> dict[str, Any]:
schema = deepcopy(op.input.model_json_schema())
schema = public_schema(deepcopy(op.input.model_json_schema()))
properties = schema.get("properties")
if not isinstance(properties, dict):
properties = {}
Expand Down
26 changes: 26 additions & 0 deletions src/outfitter/dispatch/contracts/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Shared schema projection helpers."""

from __future__ import annotations

from typing import Any


def is_internal_field(field: object) -> bool:
extra = getattr(field, "json_schema_extra", None)
return isinstance(extra, dict) and extra.get("x-dispatch-internal") is True


def public_schema(schema: dict[str, Any]) -> dict[str, Any]:
properties = schema.get("properties")
if isinstance(properties, dict):
internal = {
name
for name, prop in properties.items()
if isinstance(prop, dict) and prop.get("x-dispatch-internal") is True
}
for name in internal:
del properties[name]
required = schema.get("required")
if isinstance(required, list):
schema["required"] = [name for name in required if name not in internal]
return schema
33 changes: 26 additions & 7 deletions src/outfitter/dispatch/core/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from __future__ import annotations

import asyncio
import os
from datetime import datetime, time
from pathlib import Path
from typing import TypedDict, cast
Expand Down Expand Up @@ -88,6 +89,7 @@
from .sync import scan_codex_jsonl

_READ_ONLY = SandboxPolicy(type="readOnly")
_INTRO_TEMPLATE = '[dispatch] From {handle} ({ref}). Use `dispatch send {ref} "..."` to reply.'

# Bound attach metadata reads: if the app-server is wedged, fail clearly and never
# half-register (the registry write only follows a successful metadata read).
Expand Down Expand Up @@ -152,6 +154,22 @@ def _managed_identity(lane: Lane) -> _ManagedIdentityPayload:
}


async def _apply_send_intro(inp: SendInput, ctx: Ctx) -> str:
if not inp.intro:
return inp.text

sender_id = inp.caller_thread_id or os.environ.get("CODEX_THREAD_ID")
if not sender_id:
raise ValidationError("--intro requires CODEX_THREAD_ID from the current Codex thread")

sender = await ctx.registry.find_lane(sender_id)
if sender is None:
raise ValidationError("--intro requires the current Codex thread to be managed by dispatch")

intro = _INTRO_TEMPLATE.format(handle=sender.handle, ref=sender.ref)
return f"{intro}\n\n{inp.text}"


def _sync_view(sync: LaneSync | None) -> LaneSyncView:
if sync is None:
return LaneSyncView()
Expand Down Expand Up @@ -453,30 +471,31 @@ async def send(inp: LaneTextInput, ctx: Ctx) -> ActionAck:


async def send_message(inp: SendInput, ctx: Ctx) -> ActionAck:
text = await _apply_send_intro(inp, ctx)
match inp.mode:
case "send":
return await send(LaneTextInput(lane=inp.lane, text=inp.text), ctx)
return await send(LaneTextInput(lane=inp.lane, text=text), ctx)
case "steer":
return await steer(LaneTextInput(lane=inp.lane, text=inp.text), ctx)
return await steer(LaneTextInput(lane=inp.lane, text=text), ctx)
case "context":
return await brief(LaneTextInput(lane=inp.lane, text=inp.text), ctx)
return await brief(LaneTextInput(lane=inp.lane, text=text), ctx)
case "interject":
lane = await _resolve(ctx, inp.lane)
_require_writable(lane)
turn_id = _require_active_turn(lane, "interject")
await ctx.client.turn_interrupt(lane.id, turn_id)
await ctx.registry.log_action("interrupt", lane=lane.id, detail="interject")
await ctx.client.turn_start(
lane.id, inp.text, cwd=lane.cwd or ".", sandbox_policy=_READ_ONLY
lane.id, text, cwd=lane.cwd or ".", sandbox_policy=_READ_ONLY
)
await ctx.registry.update_lane_status(lane.id, "busy")
await ctx.registry.log_action("send", lane=lane.id, detail=inp.text[:120])
await ctx.registry.log_action("send", lane=lane.id, detail=text[:120])
return ActionAck(**_managed_identity(lane), op="interject")
case "queue":
lane = await _resolve(ctx, inp.lane)
_require_writable(lane)
message = await ctx.registry.enqueue_message(lane=lane.id, text=inp.text)
await ctx.registry.log_action("queue", lane=lane.id, detail=inp.text[:120])
message = await ctx.registry.enqueue_message(lane=lane.id, text=text)
await ctx.registry.log_action("queue", lane=lane.id, detail=text[:120])
if lane.status == "idle":
await queue.drain_next_queued_message(ctx, lane.id)
pending = await ctx.registry.pending_message_count(lane.id)
Expand Down
9 changes: 9 additions & 0 deletions src/outfitter/dispatch/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ class SendInput(LaneTextInput):
"model-visible context."
),
)
intro: bool = Field(
default=False,
description="Prepend a dispatch reply hint derived from the current CODEX_THREAD_ID.",
)
caller_thread_id: str | None = Field(
default=None,
exclude=True,
json_schema_extra={"x-dispatch-internal": True},
)


class LaneInput(BaseModel):
Expand Down
3 changes: 3 additions & 0 deletions src/outfitter/dispatch/surfaces/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import asyncio
import contextlib
import json
import os
from pathlib import Path

from mcp.server.lowlevel import Server
Expand Down Expand Up @@ -126,6 +127,8 @@ def _route_tool_call(
return _tool_error(f"unknown dispatch MCP action {tool_name}/{action}", code=-32601)
params = dict(arguments)
del params["op"]
if route.op.id == "send" and params.get("intro") is True:
params["caller_thread_id"] = os.environ.get("CODEX_THREAD_ID")
return route.op.id, params


Expand Down
64 changes: 64 additions & 0 deletions tests/core/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,70 @@ async def test_send_modes_context_and_interject(store: Registry) -> None:
assert (await store.get_lane("lane-1")).status == "busy"


async def test_send_intro_prepends_managed_sender_from_codex_thread_id(
store: Registry, monkeypatch: pytest.MonkeyPatch
) -> None:
client = FakeLaneClient()
ctx = make_ctx(store, client)
await handlers.open_lane(OpenInput(name="target"), ctx)
client.next_thread_id = "lane-2"
sender = await handlers.open_lane(OpenInput(name="Dispatch"), ctx)
monkeypatch.setenv("CODEX_THREAD_ID", sender.id)

ack = await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx)

assert ack.lane == "lane-1"
sent = next(kw["text"] for name, kw in client.calls if name == "turn_start")
assert sent == (
f'[dispatch] From @Dispatch ({sender.ref}). Use `dispatch send {sender.ref} "..."` '
"to reply.\n\nhello"
)


async def test_send_intro_requires_codex_thread_id(
store: Registry, monkeypatch: pytest.MonkeyPatch
) -> None:
ctx = make_ctx(store, FakeLaneClient())
await handlers.open_lane(OpenInput(name="target"), ctx)
monkeypatch.delenv("CODEX_THREAD_ID", raising=False)

with pytest.raises(ValidationError, match="CODEX_THREAD_ID"):
await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx)


async def test_send_intro_requires_managed_sender(
store: Registry, monkeypatch: pytest.MonkeyPatch
) -> None:
ctx = make_ctx(store, FakeLaneClient())
await handlers.open_lane(OpenInput(name="target"), ctx)
monkeypatch.setenv("CODEX_THREAD_ID", "unknown-thread")

with pytest.raises(ValidationError, match="managed by dispatch"):
await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx)


async def test_send_intro_applies_to_queued_delivery(
store: Registry, monkeypatch: pytest.MonkeyPatch
) -> None:
client = FakeLaneClient()
ctx = make_ctx(store, client)
await handlers.open_lane(OpenInput(name="target"), ctx)
await store.update_lane_status("lane-1", "busy")
client.next_thread_id = "lane-2"
sender = await handlers.open_lane(OpenInput(name="Dispatch"), ctx)
monkeypatch.setenv("CODEX_THREAD_ID", sender.id)

ack = await handlers.send_message(
SendInput(lane="@target", text="later", mode="queue", intro=True), ctx
)

assert ack.op == "queue"
queued = await store.next_pending_message("lane-1")
assert queued is not None
assert queued.text.startswith(f"[dispatch] From @Dispatch ({sender.ref}).")
assert queued.text.endswith("\n\nlater")


async def test_send_queue_persists_when_lane_is_busy(store: Registry) -> None:
client = FakeLaneClient()
ctx = make_ctx(store, client)
Expand Down
32 changes: 31 additions & 1 deletion tests/surfaces/test_derive_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import json

import pytest
import typer
from typer.testing import CliRunner

Expand All @@ -26,7 +27,35 @@ def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]:

assert result.exit_code == 0
assert captured["op"] == "send"
assert captured["params"] == {"lane": "@docs", "text": "hi", "mode": "interject"}
assert captured["params"] == {
"lane": "@docs",
"text": "hi",
"mode": "interject",
"intro": False,
}


def test_send_intro_flag_maps_to_send_contract(monkeypatch: pytest.MonkeyPatch) -> None:
captured: dict[str, object] = {}
monkeypatch.setenv("CODEX_THREAD_ID", "sender-thread")

def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]:
captured["op"] = op_id
captured["params"] = params
return {"lane": "L1", "op": "send", "accepted": True}

app = derive_cli(REGISTRY, invoke)
result = runner.invoke(app, ["send", "@docs", "hi", "--intro"])

assert result.exit_code == 0
assert captured["op"] == "send"
assert captured["params"] == {
"lane": "@docs",
"text": "hi",
"mode": "send",
"intro": True,
"caller_thread_id": "sender-thread",
}


def test_send_rejects_multiple_modes() -> None:
Expand Down Expand Up @@ -279,6 +308,7 @@ def test_schema_command_prints_derived_schema_without_daemon() -> None:
assert result.exit_code == 0
assert '"op": "send"' in result.output
assert '"mode"' in result.output
assert "caller_thread_id" not in result.output
assert "reserved for durable queued delivery" not in result.output


Expand Down
2 changes: 2 additions & 0 deletions tests/surfaces/test_derive_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def test_action_schema_and_annotations_from_op() -> None:
assert lane_write.annotations.idempotentHint is False
one_of = lane_write.inputSchema["oneOf"]
new_schema = next(s for s in one_of if s["properties"]["op"]["const"] == "new")
send_schema = next(s for s in one_of if s["properties"]["op"]["const"] == "send")
assert set(new_schema["properties"]) >= {"op", "name", "preset", "text", "send"}
assert "caller_thread_id" not in send_schema["properties"]
assert {s["properties"]["op"]["const"] for s in one_of} >= {
"fork",
"goal_set",
Expand Down
Loading