From f946feb6fd2e6bc8e25b9c2e608afed30ac03087 Mon Sep 17 00:00:00 2001 From: Matt Galligan Date: Fri, 5 Jun 2026 17:22:28 -0400 Subject: [PATCH] feat: add dispatch send intro --- docs/usage/README.md | 9 +++ skills/dispatch/SKILL.md | 5 ++ skills/dm/SKILL.md | 6 +- .../dispatch/contracts/derive_cli.py | 18 +++++- .../dispatch/contracts/derive_mcp.py | 3 +- src/outfitter/dispatch/contracts/schema.py | 26 ++++++++ src/outfitter/dispatch/core/handlers.py | 33 ++++++++-- src/outfitter/dispatch/core/models.py | 9 +++ src/outfitter/dispatch/surfaces/mcp.py | 3 + tests/core/test_handlers.py | 64 +++++++++++++++++++ tests/surfaces/test_derive_cli.py | 32 +++++++++- tests/surfaces/test_derive_mcp.py | 2 + tests/surfaces/test_mcp_routing.py | 25 +++++++- tests/surfaces/test_parity.py | 7 +- 14 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 src/outfitter/dispatch/contracts/schema.py diff --git a/docs/usage/README.md b/docs/usage/README.md index c77e9d8..0d6679a 100644 --- a/docs/usage/README.md +++ b/docs/usage/README.md @@ -230,6 +230,15 @@ stored in dispatch's durable registry and starts one queued turn per idle transi uv run dispatch send @docs-review "Run this after the active turn." --queue ``` +Use `send --intro` for managed Codex-to-Codex coordination. It prepends a terse +reply hint like `[dispatch] From @Dispatch (). Use dispatch send ... to +reply.` The sender is derived from `CODEX_THREAD_ID`, so the current Codex +thread must already be managed by dispatch: + +```bash +uv run dispatch send @docs-review "Can you sanity-check this?" --intro +``` + ## Thread History, Watch, And Goals `get` is the compact managed-thread summary: diff --git a/skills/dispatch/SKILL.md b/skills/dispatch/SKILL.md index 5f7f623..d9944a8 100644 --- a/skills/dispatch/SKILL.md +++ b/skills/dispatch/SKILL.md @@ -165,12 +165,17 @@ uv run dispatch send @my-lane "Focus on docs first." --steer uv run dispatch send @my-lane "Stop and do this instead." --interject uv run dispatch send @my-lane "Context: use lane publicly, thread internally." --context uv run dispatch send @my-lane "After this finishes, summarize risks." --mode queue +uv run dispatch send @my-lane "Can you check this?" --intro ``` The mode flags and `--mode send|steer|queue|interject|context` are mutually exclusive. `--queue` stores the message durably and starts one queued turn when the lane is idle. +Use `--intro` when you are sending from one managed Codex thread to another and +want the recipient to know how to reply through dispatch. It derives the sender +from `CODEX_THREAD_ID`, so the current thread must already be managed. + Use `stop` to cancel the active turn without replacement text: ```bash diff --git a/skills/dm/SKILL.md b/skills/dm/SKILL.md index 7d9de80..220106a 100644 --- a/skills/dm/SKILL.md +++ b/skills/dm/SKILL.md @@ -65,12 +65,14 @@ Contract: read-only, brief answer, reply in this lane unless asked otherwise. Then deliver it: ```bash -uv run dispatch send '' +uv run dispatch send '' --intro ``` Keep DMs conversational and bounded. Prefer one ask. Include only the context needed for the target lane to answer without reading the sender's full -transcript. Do not use `$dm` to smuggle in broad implementation work. +transcript. If the current Codex thread is not managed by dispatch, omit +`--intro` and include the sender link manually. Do not use `$dm` to smuggle in +broad implementation work. ## Harvesting diff --git a/src/outfitter/dispatch/contracts/derive_cli.py b/src/outfitter/dispatch/contracts/derive_cli.py index 344c62a..3d7effd 100644 --- a/src/outfitter/dispatch/contracts/derive_cli.py +++ b/src/outfitter/dispatch/contracts/derive_cli.py @@ -9,6 +9,7 @@ import inspect import json +import os import shlex from collections.abc import Callable from dataclasses import dataclass @@ -18,6 +19,7 @@ from .op import Op from .registry import OpRegistry +from .schema import is_internal_field, public_schema Invoker = Callable[[str, dict[str, object]], dict[str, object]] Renderer = Callable[[Op, dict[str, object]], None] @@ -141,6 +143,8 @@ def _parameters(op: Op, *, positionals: tuple[str, ...] = ()) -> list[inspect.Pa parameters: list[inspect.Parameter] = [] positional_set = set(positionals) for name, field in op.input.model_fields.items(): + if is_internal_field(field): + continue help_text = field.description or "" kind: inspect._ParameterKind if name in positional_set: @@ -181,6 +185,10 @@ def command( context: Annotated[ bool, typer.Option("--context", help="Inject context without waking.") ] = False, + intro: Annotated[ + bool, + typer.Option("--intro", help="Prepend dispatch sender intro from CODEX_THREAD_ID."), + ] = False, json: Annotated[ bool, typer.Option("--json", help="Render machine-readable JSON output.") ] = False, @@ -191,7 +199,13 @@ def command( if len(chosen) > 1 or (chosen and mode != "send"): typer.secho("dispatch: choose exactly one send mode", fg="red", err=True) raise typer.Exit(code=2) - result = invoke(op.id, {"lane": lane, "text": text, "mode": chosen[0] if chosen else mode}) + params = {"lane": lane, "text": text, "mode": chosen[0] if chosen else mode, "intro": intro} + if intro: + params["caller_thread_id"] = os.environ.get("CODEX_THREAD_ID") + result = invoke( + op.id, + params, + ) render(op, result) _ignore_json(json) @@ -414,7 +428,7 @@ def command( data={ "command": command, "op": op.id, - "input": op.input.model_json_schema(), + "input": public_schema(op.input.model_json_schema()), "output": op.output.model_json_schema(), } ) diff --git a/src/outfitter/dispatch/contracts/derive_mcp.py b/src/outfitter/dispatch/contracts/derive_mcp.py index f9ba328..ae523c4 100644 --- a/src/outfitter/dispatch/contracts/derive_mcp.py +++ b/src/outfitter/dispatch/contracts/derive_mcp.py @@ -15,6 +15,7 @@ from .op import Intent, Op from .registry import OpRegistry +from .schema import public_schema Safety = Literal["read", "write", "destroy"] @@ -166,7 +167,7 @@ def _input_schema(ops: tuple[tuple[str, Op], ...]) -> dict[str, Any]: def _action_input_schema(action: str, op: Op) -> dict[str, Any]: - schema = deepcopy(op.input.model_json_schema()) + schema = public_schema(deepcopy(op.input.model_json_schema())) properties = schema.get("properties") if not isinstance(properties, dict): properties = {} diff --git a/src/outfitter/dispatch/contracts/schema.py b/src/outfitter/dispatch/contracts/schema.py new file mode 100644 index 0000000..ea1dce8 --- /dev/null +++ b/src/outfitter/dispatch/contracts/schema.py @@ -0,0 +1,26 @@ +"""Shared schema projection helpers.""" + +from __future__ import annotations + +from typing import Any + + +def is_internal_field(field: object) -> bool: + extra = getattr(field, "json_schema_extra", None) + return isinstance(extra, dict) and extra.get("x-dispatch-internal") is True + + +def public_schema(schema: dict[str, Any]) -> dict[str, Any]: + properties = schema.get("properties") + if isinstance(properties, dict): + internal = { + name + for name, prop in properties.items() + if isinstance(prop, dict) and prop.get("x-dispatch-internal") is True + } + for name in internal: + del properties[name] + required = schema.get("required") + if isinstance(required, list): + schema["required"] = [name for name in required if name not in internal] + return schema diff --git a/src/outfitter/dispatch/core/handlers.py b/src/outfitter/dispatch/core/handlers.py index 6c19ac8..0a3be1a 100644 --- a/src/outfitter/dispatch/core/handlers.py +++ b/src/outfitter/dispatch/core/handlers.py @@ -9,6 +9,7 @@ from __future__ import annotations import asyncio +import os from datetime import datetime, time from pathlib import Path from typing import TypedDict, cast @@ -88,6 +89,7 @@ from .sync import scan_codex_jsonl _READ_ONLY = SandboxPolicy(type="readOnly") +_INTRO_TEMPLATE = '[dispatch] From {handle} ({ref}). Use `dispatch send {ref} "..."` to reply.' # Bound attach metadata reads: if the app-server is wedged, fail clearly and never # half-register (the registry write only follows a successful metadata read). @@ -152,6 +154,22 @@ def _managed_identity(lane: Lane) -> _ManagedIdentityPayload: } +async def _apply_send_intro(inp: SendInput, ctx: Ctx) -> str: + if not inp.intro: + return inp.text + + sender_id = inp.caller_thread_id or os.environ.get("CODEX_THREAD_ID") + if not sender_id: + raise ValidationError("--intro requires CODEX_THREAD_ID from the current Codex thread") + + sender = await ctx.registry.find_lane(sender_id) + if sender is None: + raise ValidationError("--intro requires the current Codex thread to be managed by dispatch") + + intro = _INTRO_TEMPLATE.format(handle=sender.handle, ref=sender.ref) + return f"{intro}\n\n{inp.text}" + + def _sync_view(sync: LaneSync | None) -> LaneSyncView: if sync is None: return LaneSyncView() @@ -453,13 +471,14 @@ async def send(inp: LaneTextInput, ctx: Ctx) -> ActionAck: async def send_message(inp: SendInput, ctx: Ctx) -> ActionAck: + text = await _apply_send_intro(inp, ctx) match inp.mode: case "send": - return await send(LaneTextInput(lane=inp.lane, text=inp.text), ctx) + return await send(LaneTextInput(lane=inp.lane, text=text), ctx) case "steer": - return await steer(LaneTextInput(lane=inp.lane, text=inp.text), ctx) + return await steer(LaneTextInput(lane=inp.lane, text=text), ctx) case "context": - return await brief(LaneTextInput(lane=inp.lane, text=inp.text), ctx) + return await brief(LaneTextInput(lane=inp.lane, text=text), ctx) case "interject": lane = await _resolve(ctx, inp.lane) _require_writable(lane) @@ -467,16 +486,16 @@ async def send_message(inp: SendInput, ctx: Ctx) -> ActionAck: await ctx.client.turn_interrupt(lane.id, turn_id) await ctx.registry.log_action("interrupt", lane=lane.id, detail="interject") await ctx.client.turn_start( - lane.id, inp.text, cwd=lane.cwd or ".", sandbox_policy=_READ_ONLY + lane.id, text, cwd=lane.cwd or ".", sandbox_policy=_READ_ONLY ) await ctx.registry.update_lane_status(lane.id, "busy") - await ctx.registry.log_action("send", lane=lane.id, detail=inp.text[:120]) + await ctx.registry.log_action("send", lane=lane.id, detail=text[:120]) return ActionAck(**_managed_identity(lane), op="interject") case "queue": lane = await _resolve(ctx, inp.lane) _require_writable(lane) - message = await ctx.registry.enqueue_message(lane=lane.id, text=inp.text) - await ctx.registry.log_action("queue", lane=lane.id, detail=inp.text[:120]) + message = await ctx.registry.enqueue_message(lane=lane.id, text=text) + await ctx.registry.log_action("queue", lane=lane.id, detail=text[:120]) if lane.status == "idle": await queue.drain_next_queued_message(ctx, lane.id) pending = await ctx.registry.pending_message_count(lane.id) diff --git a/src/outfitter/dispatch/core/models.py b/src/outfitter/dispatch/core/models.py index 21dc5a1..e5aace6 100644 --- a/src/outfitter/dispatch/core/models.py +++ b/src/outfitter/dispatch/core/models.py @@ -92,6 +92,15 @@ class SendInput(LaneTextInput): "model-visible context." ), ) + intro: bool = Field( + default=False, + description="Prepend a dispatch reply hint derived from the current CODEX_THREAD_ID.", + ) + caller_thread_id: str | None = Field( + default=None, + exclude=True, + json_schema_extra={"x-dispatch-internal": True}, + ) class LaneInput(BaseModel): diff --git a/src/outfitter/dispatch/surfaces/mcp.py b/src/outfitter/dispatch/surfaces/mcp.py index 878cf59..21f02e7 100644 --- a/src/outfitter/dispatch/surfaces/mcp.py +++ b/src/outfitter/dispatch/surfaces/mcp.py @@ -10,6 +10,7 @@ import asyncio import contextlib import json +import os from pathlib import Path from mcp.server.lowlevel import Server @@ -126,6 +127,8 @@ def _route_tool_call( return _tool_error(f"unknown dispatch MCP action {tool_name}/{action}", code=-32601) params = dict(arguments) del params["op"] + if route.op.id == "send" and params.get("intro") is True: + params["caller_thread_id"] = os.environ.get("CODEX_THREAD_ID") return route.op.id, params diff --git a/tests/core/test_handlers.py b/tests/core/test_handlers.py index 50a4f4b..bffc2db 100644 --- a/tests/core/test_handlers.py +++ b/tests/core/test_handlers.py @@ -186,6 +186,70 @@ async def test_send_modes_context_and_interject(store: Registry) -> None: assert (await store.get_lane("lane-1")).status == "busy" +async def test_send_intro_prepends_managed_sender_from_codex_thread_id( + store: Registry, monkeypatch: pytest.MonkeyPatch +) -> None: + client = FakeLaneClient() + ctx = make_ctx(store, client) + await handlers.open_lane(OpenInput(name="target"), ctx) + client.next_thread_id = "lane-2" + sender = await handlers.open_lane(OpenInput(name="Dispatch"), ctx) + monkeypatch.setenv("CODEX_THREAD_ID", sender.id) + + ack = await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx) + + assert ack.lane == "lane-1" + sent = next(kw["text"] for name, kw in client.calls if name == "turn_start") + assert sent == ( + f'[dispatch] From @Dispatch ({sender.ref}). Use `dispatch send {sender.ref} "..."` ' + "to reply.\n\nhello" + ) + + +async def test_send_intro_requires_codex_thread_id( + store: Registry, monkeypatch: pytest.MonkeyPatch +) -> None: + ctx = make_ctx(store, FakeLaneClient()) + await handlers.open_lane(OpenInput(name="target"), ctx) + monkeypatch.delenv("CODEX_THREAD_ID", raising=False) + + with pytest.raises(ValidationError, match="CODEX_THREAD_ID"): + await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx) + + +async def test_send_intro_requires_managed_sender( + store: Registry, monkeypatch: pytest.MonkeyPatch +) -> None: + ctx = make_ctx(store, FakeLaneClient()) + await handlers.open_lane(OpenInput(name="target"), ctx) + monkeypatch.setenv("CODEX_THREAD_ID", "unknown-thread") + + with pytest.raises(ValidationError, match="managed by dispatch"): + await handlers.send_message(SendInput(lane="@target", text="hello", intro=True), ctx) + + +async def test_send_intro_applies_to_queued_delivery( + store: Registry, monkeypatch: pytest.MonkeyPatch +) -> None: + client = FakeLaneClient() + ctx = make_ctx(store, client) + await handlers.open_lane(OpenInput(name="target"), ctx) + await store.update_lane_status("lane-1", "busy") + client.next_thread_id = "lane-2" + sender = await handlers.open_lane(OpenInput(name="Dispatch"), ctx) + monkeypatch.setenv("CODEX_THREAD_ID", sender.id) + + ack = await handlers.send_message( + SendInput(lane="@target", text="later", mode="queue", intro=True), ctx + ) + + assert ack.op == "queue" + queued = await store.next_pending_message("lane-1") + assert queued is not None + assert queued.text.startswith(f"[dispatch] From @Dispatch ({sender.ref}).") + assert queued.text.endswith("\n\nlater") + + async def test_send_queue_persists_when_lane_is_busy(store: Registry) -> None: client = FakeLaneClient() ctx = make_ctx(store, client) diff --git a/tests/surfaces/test_derive_cli.py b/tests/surfaces/test_derive_cli.py index c0045de..22e3253 100644 --- a/tests/surfaces/test_derive_cli.py +++ b/tests/surfaces/test_derive_cli.py @@ -4,6 +4,7 @@ import json +import pytest import typer from typer.testing import CliRunner @@ -26,7 +27,35 @@ def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]: assert result.exit_code == 0 assert captured["op"] == "send" - assert captured["params"] == {"lane": "@docs", "text": "hi", "mode": "interject"} + assert captured["params"] == { + "lane": "@docs", + "text": "hi", + "mode": "interject", + "intro": False, + } + + +def test_send_intro_flag_maps_to_send_contract(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, object] = {} + monkeypatch.setenv("CODEX_THREAD_ID", "sender-thread") + + def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]: + captured["op"] = op_id + captured["params"] = params + return {"lane": "L1", "op": "send", "accepted": True} + + app = derive_cli(REGISTRY, invoke) + result = runner.invoke(app, ["send", "@docs", "hi", "--intro"]) + + assert result.exit_code == 0 + assert captured["op"] == "send" + assert captured["params"] == { + "lane": "@docs", + "text": "hi", + "mode": "send", + "intro": True, + "caller_thread_id": "sender-thread", + } def test_send_rejects_multiple_modes() -> None: @@ -279,6 +308,7 @@ def test_schema_command_prints_derived_schema_without_daemon() -> None: assert result.exit_code == 0 assert '"op": "send"' in result.output assert '"mode"' in result.output + assert "caller_thread_id" not in result.output assert "reserved for durable queued delivery" not in result.output diff --git a/tests/surfaces/test_derive_mcp.py b/tests/surfaces/test_derive_mcp.py index 3f6279c..3f6a50d 100644 --- a/tests/surfaces/test_derive_mcp.py +++ b/tests/surfaces/test_derive_mcp.py @@ -38,7 +38,9 @@ def test_action_schema_and_annotations_from_op() -> None: assert lane_write.annotations.idempotentHint is False one_of = lane_write.inputSchema["oneOf"] new_schema = next(s for s in one_of if s["properties"]["op"]["const"] == "new") + send_schema = next(s for s in one_of if s["properties"]["op"]["const"] == "send") assert set(new_schema["properties"]) >= {"op", "name", "preset", "text", "send"} + assert "caller_thread_id" not in send_schema["properties"] assert {s["properties"]["op"]["const"] for s in one_of} >= { "fork", "goal_set", diff --git a/tests/surfaces/test_mcp_routing.py b/tests/surfaces/test_mcp_routing.py index e1db5b6..b53cb15 100644 --- a/tests/surfaces/test_mcp_routing.py +++ b/tests/surfaces/test_mcp_routing.py @@ -5,12 +5,14 @@ from collections.abc import AsyncIterator from pathlib import Path +import pytest import pytest_asyncio +from outfitter.dispatch.contracts.derive_mcp import derive_mcp_projection from outfitter.dispatch.core.ops import REGISTRY from outfitter.dispatch.daemon.control import ControlServer from outfitter.dispatch.registry.store import Registry -from outfitter.dispatch.surfaces.mcp import handle_tool_call +from outfitter.dispatch.surfaces.mcp import _route_tool_call, handle_tool_call from tests.fakes import make_ctx _IDENTITY_FIELDS = {"lane", "ref", "id", "title", "handle", "managed", "source", "status", "cwd"} @@ -119,3 +121,24 @@ async def test_tool_call_rejects_unknown_grouped_action(socket_path: Path) -> No assert result.isError is True assert result.meta is not None assert result.meta["dispatchCode"] == "mcp_route_error" + + +def test_tool_call_routes_intro_with_caller_thread_id(monkeypatch: pytest.MonkeyPatch) -> None: + projection = derive_mcp_projection(REGISTRY) + monkeypatch.setenv("CODEX_THREAD_ID", "sender-thread") + + route = _route_tool_call( + projection, + "dispatch_thread_write", + {"op": "send", "lane": "@docs", "text": "hi", "intro": True}, + ) + + assert route == ( + "send", + { + "lane": "@docs", + "text": "hi", + "intro": True, + "caller_thread_id": "sender-thread", + }, + ) diff --git a/tests/surfaces/test_parity.py b/tests/surfaces/test_parity.py index 103d661..ebb4b72 100644 --- a/tests/surfaces/test_parity.py +++ b/tests/surfaces/test_parity.py @@ -22,6 +22,7 @@ ValidationError, project_error, ) +from outfitter.dispatch.contracts.schema import is_internal_field from outfitter.dispatch.core.ops import REGISTRY @@ -67,7 +68,9 @@ def test_mcp_model_parity_per_op() -> None: assert set(routes_by_op) == set(REGISTRY.ids()) for op in REGISTRY: - fields = set(op.input.model_fields) + fields = { + name for name, field in op.input.model_fields.items() if not is_internal_field(field) + } route = routes_by_op[op.id] tool = mcp_tools[route.tool_name] variants = tool.inputSchema["oneOf"] @@ -141,7 +144,7 @@ def invoke(op_id: str, params: dict[str, object]) -> dict[str, object]: assert runner.invoke(app, ["goal", "status", "@docs"]).exit_code == 0 assert calls == [ - ("send", {"lane": "@docs", "text": "hi", "mode": "context"}), + ("send", {"lane": "@docs", "text": "hi", "mode": "context", "intro": False}), ("stop", {"lane": "@docs"}), ("discover", {"limit": 50}), ("sync", {"lane": "@docs", "full": False}),