Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .agents/skills/map-efficient/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ section when the workflow below points to it. Under `isolation_active` (Slice 5a
the wave-loop creates per-member worktrees, dispatches actor subagents
**sequentially** (one per turn), verifies via `concurrency_ready`, then accepts
atomically via `merge_wave_worktrees`; concurrent fan-out is Slice 5b
(`dispatch_mode==concurrent`).
(`dispatch_mode==concurrent`). Under `dispatch_mode==concurrent` (opt-in via
`execution.concurrent_dispatch: true`), call `run_concurrent_wave`: dispatch N
actor subagents in **one turn** per sub-batch; on any failure `abort_wave_group`
discards the whole group and reruns from base (bounded by `max_wave_retries`).

## Mutation Boundary Constraints

Expand Down
35 changes: 23 additions & 12 deletions .agents/skills/map-efficient/efficient-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ wave-loop on every run. The wave-loop engages **only when ALL THREE hold**
`mapify init` config always runs the legacy sequential walker. Even when the
wave-loop engages, dispatch remains **sequential** in Slice 5a (`isolation_active=True`,
`dispatch_mode` from `get_wave_step` keyed to `sequential`); concurrent fan-out
is Slice 5b (`dispatch_mode==concurrent`, `concurrency_enabled=True`, not yet shipped).
is Slice 5b (`dispatch_mode==concurrent`, `concurrency_enabled=True`),
**ACTIVE when opted in** via `execution.concurrent_dispatch: true`
(gate: `dispatch_mode == 'concurrent'`).

### Sequential walker

Expand Down Expand Up @@ -158,24 +160,26 @@ to base on any conflict or gate failure (worktrees kept for retry). On a single
subtask's Monitor failure, `discard_subtask_worktree` that subtask and retry it
before calling `merge_wave_worktrees`.

### Concurrent actor dispatch — **Slice 5b only** (`dispatch_mode == 'concurrent'`) — GATED EXAMPLE
### Concurrent actor dispatch — **Slice 5b** (`dispatch_mode == 'concurrent'`) — **ACTIVE when opted in**

> **IMPORTANT — read before using this example.**
> Concurrent fan-out (dispatching multiple actor subagents in a single turn) is
> **Slice 5b** and is enabled **only when `concurrency_enabled: true` /
> `parallel_ready` flag set / `dispatch_mode == 'concurrent'`**. In the **current
> framework** (`concurrency_enabled=False`, Slice 5a), dispatch stays **SEQUENTIAL
> **IMPORTANT — read before using this section.**
> Concurrent fan-out (dispatching N actor subagents in a single turn) is
> **ACTIVE when opted in** via `execution.concurrent_dispatch: true`
> (gate: `dispatch_mode == 'concurrent'`). With the **default config**
> (`concurrent_dispatch=false`, Slice 5a), dispatch stays **SEQUENTIAL
> even when a wave has `mode=="parallel"`** — one actor subagent per turn, each
> pinned to its own worktree. The example below is reference material for when
> Slice 5b ships; do NOT treat it as an active instruction now. Use your runtime's
> pinned to its own worktree. Act on the instructions below **only** when
> `get_wave_step` returns `dispatch_mode == 'concurrent'`. Use your runtime's
> own parallel actor-subagent dispatch mechanism — this is the provider-neutral
> shape, not a literal API call.

When Slice 5b concurrency is enabled, a parallel wave with N subtasks dispatches
all N actor subagents in **one turn**:
**Runtime wiring:** when `get_wave_step` returns `dispatch_mode == 'concurrent'`,
call `run_concurrent_wave` (runner), which batch-splits the wave by `max_actors`
and merges each sub-batch atomically via `merge_wave_worktrees`. For each sub-batch,
dispatch all N actor subagents **in one turn** — not one per turn:

```text
# CORRECT (Slice 5b / concurrency_enabled=True / dispatch_mode=='concurrent' only) — one turn, N actor subagents:
# CORRECT (dispatch_mode=='concurrent' only) — N actor subagents in one turn:
dispatch actor subagent -> ST-003 (pinned to its own worktree)
dispatch actor subagent -> ST-004 (pinned to its own worktree)

Expand All @@ -189,6 +193,13 @@ dispatch actor subagent -> ST-004 (pinned to its own worktree)
**`max_actors` cap:** Default 4–8 per wave. Groups larger than `max_actors` are
pre-split into sequential batches before dispatch.

**Retry-discard on failure:** on any actor failure, timeout, or Monitor-reject
within a concurrent group, the runner calls `abort_wave_group`, which discards
the **entire group** (cancels siblings, resets all worktrees to base SHA, removes
group branches) and reruns from base. Retries are bounded by `max_wave_retries`
(default 3); on exhaustion the runner escalates to a human and does **not**
auto-restart. Never merges a successful subset — discard-all-or-merge-all (HC-4).

### Anti-patterns — Slice 5b concurrent dispatch only

> These apply **only** under Slice 5b concurrent dispatch (`dispatch_mode == 'concurrent'`). In Slice 5a and the default sequential walker, one actor dispatch per turn **is** the correct behavior.
Expand Down
2 changes: 1 addition & 1 deletion .claude/skills/map-efficient/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ else
fi
```

**Execution strategy:** `select_execution_strategy` chooses between the legacy sequential walker and the wave-loop. The wave-loop (`get_wave_step` / `validate_wave_step` / `advance_wave`) engages only when `execution.wave_mode ∈ {on, auto}` AND a color group has ≥2 members; otherwise `get_next_step` (sequential walker) runs. Under `isolation_active` (Slice 5a), the wave-loop creates per-member worktrees, dispatches Actors **sequentially** (one per turn, `HC-3`), verifies via `concurrency_ready`, then accepts atomically via `merge_wave_worktrees`; concurrent fan-out is Slice 5b (`dispatch_mode==concurrent`). See [efficient-reference.md](efficient-reference.md#wave-execution) for the decision table and full wave loop.
**Execution strategy:** `select_execution_strategy` chooses between the legacy sequential walker and the wave-loop. The wave-loop (`get_wave_step` / `validate_wave_step` / `advance_wave`) engages only when `execution.wave_mode ∈ {on, auto}` AND a color group has ≥2 members; otherwise `get_next_step` (sequential walker) runs. Under `isolation_active` (Slice 5a), the wave-loop creates per-member worktrees, dispatches Actors **sequentially** (one per turn, `HC-3`), verifies via `concurrency_ready`, then accepts atomically via `merge_wave_worktrees`; concurrent fan-out is Slice 5b (`dispatch_mode==concurrent`). Under `dispatch_mode==concurrent` (opt-in via `execution.concurrent_dispatch: true`), call `run_concurrent_wave`: emit N `Task(actor)` blocks in **one message** per sub-batch; on any failure `abort_wave_group` discards the whole group and reruns from base (bounded by `max_wave_retries`). See [efficient-reference.md](efficient-reference.md#wave-execution) for the decision table and full wave loop.

**Note on resume:** `resume_from_plan` (Step 0) now auto-invokes `set_waves`
when `blueprint.json` is present, so resumed workflows do not need a manual
Expand Down
25 changes: 15 additions & 10 deletions .claude/skills/map-efficient/efficient-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ including clean passes — must carry concrete evidence references.
| `auto` / `on` | `auto` / `required` | no (all groups size 1) | Legacy sequential walker (`get_next_step`) |
| `auto` / `on` | `auto` / `required` | yes | Wave-loop (`get_wave_step` / `validate_wave_step` / `advance_wave`) |

**Defaults (canonical MapConfig):** `execution.wave_mode=auto`, `worktree.isolation=off`. Because the isolation gate (#2) fails by default, a stock `mapify init` config always runs the legacy sequential walker — byte-identical to pre-Slice-3. Even when the wave-loop does engage, dispatch remains **sequential** in Slice 5a (`isolation_active=True`, `dispatch_mode` from `get_wave_step` keyed to `sequential`); concurrent fan-out is Slice 5b (`dispatch_mode==concurrent`, `concurrency_enabled=True`, not yet shipped).
**Defaults (canonical MapConfig):** `execution.wave_mode=auto`, `worktree.isolation=off`. Because the isolation gate (#2) fails by default, a stock `mapify init` config always runs the legacy sequential walker — byte-identical to pre-Slice-3. Even when the wave-loop does engage, dispatch remains **sequential** in Slice 5a (`isolation_active=True`, `dispatch_mode` from `get_wave_step` keyed to `sequential`); concurrent fan-out is Slice 5b (`dispatch_mode==concurrent`, `concurrency_enabled=True`), **ACTIVE when opted in** via `execution.concurrent_dispatch: true` (gate: `dispatch_mode == 'concurrent'`).

### Sequential walker

Expand All @@ -148,21 +148,24 @@ Use `get_wave_step`, `validate_wave_step`, and `advance_wave` when the wave-loop

When the wave-loop engages AND `isolation_active` is true (`worktree.isolation` ∈ {`auto`, `required`}), the Slice 5a flow applies: (a) create a worktree per wave member via `create_subtask_worktree`; (b) dispatch the member Actors **sequentially** — one per turn, each pinned to its own worktree path (`HC-3`); (c) call `concurrency_ready` (ST-003) to verify all member worktrees before merge; (d) accept the whole wave atomically via `merge_wave_worktrees` — never one-at-a-time, with whole-wave rollback on any failure. See [Parallel waves](#worktree-isolation) under Worktree isolation for the full protocol. Concurrent fan-out (dispatching all Actors in one message) is Slice 5b (`dispatch_mode==concurrent`) and is not yet active.

### Concurrent Actor dispatch — **Slice 5b only** (`dispatch_mode == 'concurrent'`) — GATED EXAMPLE
### Concurrent Actor dispatch — **Slice 5b** (`dispatch_mode == 'concurrent'`) — **ACTIVE when opted in**

> **IMPORTANT — read before using this example.**
> **IMPORTANT — read before using this section.**
> Concurrent fan-out (emitting multiple `Task(actor)` calls in a single message) is
> **Slice 5b** and is enabled **only when `concurrency_enabled: true` /
> `parallel_ready` flag set / `dispatch_mode == 'concurrent'`**. In the **current
> framework** (`concurrency_enabled=False`, Slice 5a), dispatch stays **SEQUENTIAL
> **ACTIVE when opted in** via `execution.concurrent_dispatch: true`
> (gate: `dispatch_mode == 'concurrent'`). With the **default config**
> (`concurrent_dispatch=false`, Slice 5a), dispatch stays **SEQUENTIAL
> even when a wave has `mode=="parallel"`** — one Actor per turn, each pinned to
> its own worktree. The example below is reference material for when Slice 5b ships;
> do NOT treat it as an active instruction now.
> its own worktree. Act on the instructions below **only** when `get_wave_step`
> returns `dispatch_mode == 'concurrent'`.

When Slice 5b concurrency is enabled, a parallel wave with N subtasks dispatches all N Actors in **one message** with N `Task` calls — not one per turn:
**Runtime wiring:** when `get_wave_step` returns `dispatch_mode == 'concurrent'`,
call `run_concurrent_wave` (runner), which batch-splits the wave by `max_actors`
and merges each sub-batch atomically via `merge_wave_worktrees`. For each sub-batch,
emit all N `Task(actor)` calls in **one assistant message** — not one per turn:

```text
# CORRECT (Slice 5b / concurrency_enabled=True / dispatch_mode=='concurrent' only) — N Task calls in one message:
# CORRECT (dispatch_mode=='concurrent' only) — N Task calls in one message:
Task(
subagent_type="actor",
description="Implement ST-003",
Expand All @@ -185,6 +188,8 @@ Task(

**`max_actors` cap:** Default 4–8 concurrent actors per wave. Groups larger than `max_actors` are pre-split into sequential batches of `max_actors` before dispatch; do not emit more than `max_actors` Task calls in a single message.

**Retry-discard on failure:** on any actor failure, timeout, or Monitor-reject within a concurrent group, the runner calls `abort_wave_group`, which discards the **entire group** (cancels siblings, resets all worktrees to base SHA, removes group branches) and reruns from base. Retries are bounded by `max_wave_retries` (default 3); on exhaustion the runner escalates to a human and does **not** auto-restart. Never merges a successful subset — discard-all-or-merge-all (HC-4).

### Anti-patterns — Slice 5b concurrent dispatch only

> These apply **only** under Slice 5b concurrent dispatch (`dispatch_mode == 'concurrent'`). In Slice 5a and the default sequential walker, one Task per turn **is** the correct behavior — the first three below are NOT anti-patterns there.
Expand Down
146 changes: 136 additions & 10 deletions .map/scripts/map_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,19 @@ def _extract_subtask_ids_from_plan_artifacts(
WAVE_REASON_NO_WAVES = "no_waves"
WAVE_REASON_WAVE_COMPLETE = "wave_complete"
WAVE_REASON_DISPATCH_SEQUENTIAL = "dispatch_sequential_5a"
# Stable reason codes for compute_dispatch_gate (ST-001, Slice 5b).
WAVE_REASON_CONCURRENT_GATED = "concurrent_gated"
WAVE_REASON_GATE_NOT_PARALLELIZABLE = "gate_not_parallelizable"
# Current wave is width-1 even though a later wave is parallel; dispatch sequentially
# for this wave — not an error, just the natural plan structure.
WAVE_REASON_CURRENT_WAVE_SEQUENTIAL = "current_wave_sequential"


class DispatchGateError(RuntimeError):
"""Raised when concurrent_dispatch=true but a required prerequisite is missing.

HC-3: never silent-degrade — callers must handle explicitly.
"""


def _read_map_config_scalars(project_dir: Path) -> dict[str, str]:
Expand Down Expand Up @@ -2303,8 +2316,16 @@ def get_wave_step(branch: str) -> dict:
state_file = Path(f".map/{branch}/step_state.json")
state = StepState.load(state_file)

# Compute structured dispatch signal fields (ST-002).
dispatch_mode = "concurrent" if WAVE_CONCURRENCY_ENABLED else "sequential"
# Compute structured dispatch signal via config-driven gate (ST-001, Slice 5b).
# compute_dispatch_gate short-circuits to sequential on the first line when
# concurrent_dispatch=false (default), touching no new code/probe/import (HC-1).
gate = compute_dispatch_gate(branch, Path("."))
dispatch_mode = gate["dispatch_mode"]
dispatch_reason = gate["reason"]
# concurrency_enabled alias: True iff dispatch_mode resolved to "concurrent".
# WAVE_CONCURRENCY_ENABLED is kept as a dormant unused const (backward compat).
concurrency_enabled = dispatch_mode == "concurrent"

try:
from map_step_runner import ( # pyright: ignore[reportMissingImports]
_worktree_isolation_mode,
Expand All @@ -2319,8 +2340,8 @@ def get_wave_step(branch: str) -> dict:
"wave_index": 0,
"subtasks": [],
"is_complete": True,
"concurrency_enabled": dispatch_mode == "concurrent",
"dispatch_mode": dispatch_mode,
"concurrency_enabled": concurrency_enabled,
"dispatch_mode": "sequential",
"isolation_active": isolation_active,
"reason": WAVE_REASON_NO_WAVES,
"message": "No execution waves configured. Use sequential mode.",
Expand All @@ -2332,8 +2353,8 @@ def get_wave_step(branch: str) -> dict:
"wave_index": state.current_wave_index,
"subtasks": [],
"is_complete": True,
"concurrency_enabled": dispatch_mode == "concurrent",
"dispatch_mode": dispatch_mode,
"concurrency_enabled": concurrency_enabled,
"dispatch_mode": "sequential",
"isolation_active": isolation_active,
"reason": WAVE_REASON_WAVE_COMPLETE,
}
Expand Down Expand Up @@ -2372,12 +2393,10 @@ def get_wave_step(branch: str) -> dict:
"wave_total": len(state.execution_waves),
"subtasks": subtask_infos,
"is_complete": False,
# concurrency_enabled=False: even when mode=="parallel" (width>=2 wave),
# dispatch is strictly sequential this slice. Slice 5 flips WAVE_CONCURRENCY_ENABLED.
"concurrency_enabled": dispatch_mode == "concurrent",
"concurrency_enabled": concurrency_enabled,
"dispatch_mode": dispatch_mode,
"isolation_active": isolation_active,
"reason": WAVE_REASON_DISPATCH_SEQUENTIAL,
"reason": dispatch_reason,
}


Expand Down Expand Up @@ -2546,6 +2565,113 @@ def select_execution_strategy(
}


def compute_dispatch_gate(
branch: str, project_dir: Optional[Path] = None
) -> dict:
"""Compute the dispatch mode for the current wave, fail-closed on config contradiction.

Gate logic (evaluated in order):

1. If concurrent_dispatch is False (default): return sequential immediately.
FIRST executable line — no probe, no select_execution_strategy call, no import
of any concurrency primitive (HC-1 byte-identity).

2. If concurrent_dispatch is True AND worktree.isolation == 'off':
raise DispatchGateError — config contradiction, HC-3 never silent-degrade.

3. If concurrent_dispatch is True AND isolation != 'off' AND NOT concurrency_allowed:
return sequential with WAVE_REASON_GATE_NOT_PARALLELIZABLE (not an error —
the plan has no parallelizable groups).

4. If concurrent_dispatch is True AND isolation != 'off' AND concurrency_allowed
AND the CURRENT wave (execution_waves[current_wave_index]) has width < 2:
return sequential with WAVE_REASON_CURRENT_WAVE_SEQUENTIAL (not an error —
the current wave is width-1 even though a later wave is parallel).

5. If concurrent_dispatch is True AND isolation != 'off' AND concurrency_allowed
AND the CURRENT wave has width >= 2:
return concurrent with WAVE_REASON_CONCURRENT_GATED.

Args:
branch: Git branch name (sanitized).
project_dir: Project root containing .map/config.yaml.
Defaults to Path('.').

Returns:
{"dispatch_mode": "sequential" | "concurrent", "reason": <stable code>}

Raises:
DispatchGateError: When concurrent_dispatch=true but worktree.isolation='off'
(HC-3: config contradiction must never be silently degraded).
"""
if project_dir is None:
project_dir = Path(".")

# Step 1: short-circuit on flag=false — first gate check after parameter
# normalization. No concurrency probe, no select_execution_strategy call, no
# _worktree_isolation_mode/concurrency_ready call, no dispatcher import runs on
# this path (HC-1 byte-identity). The project_dir None-guard above is a safe
# default-arg normalization, not a concurrency primitive.
try:
from map_step_runner import ( # pyright: ignore[reportMissingImports]
_concurrent_dispatch_enabled,
)
flag_on = _concurrent_dispatch_enabled(project_dir)
except ImportError:
flag_on = False

if not flag_on:
return {
"dispatch_mode": "sequential",
"reason": WAVE_REASON_DISPATCH_SEQUENTIAL,
}

# Step 2: flag is on — check isolation config.
try:
from map_step_runner import ( # pyright: ignore[reportMissingImports]
_worktree_isolation_mode as _wt_iso,
)
isolation = _wt_iso(project_dir)
except ImportError:
isolation = "off"

if isolation == "off":
raise DispatchGateError(
"concurrent_dispatch=true requires worktree.isolation != 'off', "
f"but worktree.isolation is 'off' in {project_dir}. "
"Set worktree.isolation to 'auto' or 'required' to enable concurrent dispatch."
)

# Step 3: check whether the plan is actually parallelizable (any wave has width>=2).
strategy_result = select_execution_strategy(branch, project_dir)
concurrency_allowed = strategy_result.get("concurrency_allowed", False)

if not concurrency_allowed:
return {
"dispatch_mode": "sequential",
"reason": WAVE_REASON_GATE_NOT_PARALLELIZABLE,
}

# Step 4: plan has at least one parallel wave, but gate on the ACTIVE wave.
# select_execution_strategy checks any wave (has_parallel_groups), not the
# current wave index. A width-1 current wave must dispatch sequentially even
# if a later wave is parallel — dispatch_mode is per-wave, not per-plan.
state_file = Path(f".map/{branch}/step_state.json")
state = StepState.load(state_file)
waves = state.execution_waves
idx = state.current_wave_index
if idx >= len(waves) or len(waves[idx]) < 2:
return {
"dispatch_mode": "sequential",
"reason": WAVE_REASON_CURRENT_WAVE_SEQUENTIAL,
}

return {
"dispatch_mode": "concurrent",
"reason": WAVE_REASON_CONCURRENT_GATED,
}


def _write_feedback_file(
branch: str, filename: str, header: str, feedback: str
) -> Optional[str]:
Expand Down
Loading
Loading