s01 → ... → s10 → s11 → s12 → s13 → s14 → ... → s19
"大目标拆成小任务, 排好序, 持久化" — 文件持久化的任务图, 多 agent 协作的基础。
Harness 层: 任务 — 持久化的目标, 可恢复的进度。
Agent 接到一个项目:搭数据库、写 API、加测试。它用 s05 的 TodoWrite 列了一张清单,然后开始干活——先写 API,写到一半发现没数据库表,回头改;加测试时发现 API 接口签名又改了...
盖房子不能先盖屋顶再打地基。任务之间有先后——这种"谁先谁后"的关系,有个名字叫有向无环图(DAG)。
s05 的 TodoWrite 是一个列表。没有依赖关系、没有持久化、对话结束列表就没了。你需要的是任务系统:每个任务是一个 JSON 文件,任务之间有 blockedBy 依赖,跨会话持久化在磁盘上。
s11 的循环、prompt 组装全部保留。唯一的变动:新增 4 个任务工具 + .tasks/ 目录持久化 + blockedBy 依赖检查。任务系统与错误恢复是独立层——CC 源码中 utils/tasks.ts 只管 CRUD,query.ts 的 with_retry/RecoveryState 管错误恢复,互不耦合。
TodoWrite vs Task System:
| TodoWrite (s05) | Task System (s12) | |
|---|---|---|
| 存储 | 内存列表 | .tasks/ JSON 文件 |
| 依赖 | 无 | blockedBy 有向无环图 |
| 持久性 | 对话结束即丢 | 跨会话 |
| 多 Agent | 无 | owner 字段 |
| 状态 | checked / unchecked | pending → in_progress → completed |
每个任务是一个 JSON 文件,存于 .tasks/ 目录:
@dataclass
class Task:
id: str
subject: str
description: str
status: str # pending | in_progress | completed
owner: str | None # Agent 名(多 Agent 场景)
blockedBy: list[str] # 依赖的任务 ID 列表ID 用 timestamp + random hex 生成——简单但够用。CC 用顺序 ID + highwatermark 文件防止 ID 重用,是更严谨的设计。
def create_task(subject: str, description: str = "",
blockedBy: list[str] | None = None) -> Task:
task = Task(
id=f"task_{int(time.time())}_{random_hex(4)}",
subject=subject, description=description,
status="pending", owner=None,
blockedBy=blockedBy or [],
)
save_task(task)
return task创建时自动 save_task 到 .tasks/{id}.json。blockedBy 声明依赖——"写 API"的 blockedBy 是 ["task_schema"]。
一个任务只能在它的 blockedBy 全部 completed 之后才能开始:
def can_start(task_id: str) -> bool:
task = load_task(task_id)
for dep_id in task.blockedBy:
dep = load_task(dep_id)
if dep.status != "completed":
return False
return Truecan_start 是 claim_task 的前置检查——blockedBy 里有任何一个不是 completed,就不能认领。
Agent 开始做一个任务时,调用 claim_task:设置 owner,状态从 pending → in_progress。owner 字段记录谁在做这个任务——多 Agent 场景下防止重复认领:
def claim_task(task_id: str, owner: str = "agent") -> str:
task = load_task(task_id)
if task.status != "pending":
return f"Task {task_id} is {task.status}, cannot claim"
if not can_start(task_id):
deps = [d for d in task.blockedBy
if load_task(d).status != "completed"]
return f"Blocked by: {deps}"
task.owner = owner
task.status = "in_progress"
save_task(task)
return f"Claimed {task_id} ({task.subject})"如果任务已被别人认领(status != "pending"),或者依赖没完成(can_start 返回 False),拒绝认领。
任务做完后,设为 completed。同时扫描所有其他任务,找出刚刚被解锁的下游任务:
def complete_task(task_id: str) -> str:
task = load_task(task_id)
task.status = "completed"
save_task(task)
# 找出被解锁的下游任务
unblocked = [t.subject for t in list_tasks()
if t.status == "pending" and t.blockedBy
and can_start(t.id)]
msg = f"Completed {task_id} ({task.subject})"
if unblocked:
msg += f"\nUnblocked: {', '.join(unblocked)}"
return msg完成 "schema" 后,"endpoints" 和 "docs" 的 can_start 返回 True,它们可以开始。
pending ──claim──→ in_progress ──complete──→ completed
- claim:
pending→in_progress。设置 owner,开始工作。 - complete:
in_progress→completed。解锁下游。
CC 没有 in_progress → pending 的 release 路径。如果 Agent 崩溃或放弃,CC 用 unassignTeammateTasks() 清除 owner,但 status 保持在 in_progress——任务不会回退到 pending。教学版遵循同样的设计。
# 创建有依赖的任务
schema = create_task("setup database schema")
endpoints = create_task("create API endpoints", blockedBy=[schema.id])
tests = create_task("write tests", blockedBy=[endpoints.id])
docs = create_task("write docs", blockedBy=[schema.id])
# Agent 认领第一个可做的任务
claim_task(schema.id) # ✓ Claimed (无依赖)
complete_task(schema.id) # ✓ Completed → 解锁 endpoints, docs
claim_task(endpoints.id) # ✓ Claimed (schema 已完成)
complete_task(endpoints.id) # ✓ Completed → 解锁 tests
claim_task(docs.id) # ✓ Claimed (schema 已完成)
complete_task(docs.id) # ✓ Completed
claim_task(tests.id) # ✓ Claimed (endpoints 已完成)
complete_task(tests.id) # ✓ Completed每个 create_task 写一个 JSON 文件,每个 claim_task / complete_task 更新文件。跨会话时,.tasks/ 目录还在,Agent 读文件就能恢复进度。
| 组件 | 之前 (s11) | 之后 (s12) |
|---|---|---|
| 任务管理 | 无 | Task dataclass + 4 个工具 |
| 新类型 | — | Task(id, subject, description, status, owner, blockedBy) |
| 存储 | 无持久化 | .tasks/{id}.json 跨会话 |
| 依赖 | 无 | blockedBy 图 + can_start 检查 |
| 工具 | bash, read_file, write_file (3) | + create_task, list_tasks, claim_task, complete_task (7) |
| 生命周期 | — | pending → in_progress → completed(无 release 回退) |
cd learn-claude-code
python s12_task_system/code.py试试这些 prompt:
Create tasks: setup database schema, create API endpoints (depends on schema), write tests (depends on endpoints), write docs (depends on schema)List all tasks and their statusesClaim the first unblocked task and complete itList tasks again — which ones are now unblocked?
观察重点:.tasks/ 目录下是否生成了 JSON 文件?完成任务后,被阻塞的任务是否解锁?
任务图有了。但有些任务要跑很久——比如跑全量测试、部署到服务器。Agent 不能干等着——它在调 LLM,时间就是钱。
s13 Background Tasks → 慢操作放后台。Agent 继续思考,后台跑完了通知它。
深入 CC 源码
以下基于 CC 源码
utils/tasks.ts(862 行)、TaskCreateTool.ts、TaskUpdateTool.ts(406 行)、useTaskListWatcher.ts(222 行)的完整分析。
教学版只讲了 id、subject、status、owner、blockedBy。CC 实际有 9 个字段(utils/tasks.ts:75-91):
| 字段 | 类型 | 用途 |
|---|---|---|
id |
string | 递增整数 ID |
subject |
string | 简短标题 |
description |
string | 自由格式描述 |
activeForm |
string? | 进行时态,in_progress 时在 spinner 显示 |
owner |
string? | 分配的 agent ID |
status |
pending/in_progress/completed | 生命周期 |
blocks |
string[] | 此任务阻塞的任务 ID(下游) |
blockedBy |
string[] | 阻塞此任务的任务 ID(上游) |
metadata |
Record? | 任意扩展键值对 |
存储位置:~/.claude/tasks/{taskListId}/{id}.json。每个任务一个文件。
CC 中 Task System 和 TodoWrite 同时存在,通过 isTodoV2Enabled() 切换(utils/tasks.ts:133)——非交互式会话(SDK)默认用 Task,交互式用 TodoWrite。Task 有 TodoWrite 没有的:文件锁并发保护、依赖强制执行、ownership、fs.watch 响应式监听、生命周期 hooks。
claimTask()(utils/tasks.ts:541-612)用双重锁防竞争:
任务文件锁:proper-lockfile 锁住 {taskId}.json(最多重试 30 次,指数退避 5-100ms)。锁内:
- 重新读取任务(防 TOCTOU)
- 检查已被他人认领 →
already_claimed - 检查已完成 →
already_resolved - 检查上游未完成 →
blocked - 设置 owner
列表级锁(agent busy 检查时):.lock 文件,原子性扫描所有任务并检查该 agent 是否已有其他 open task。
.highwatermark 文件记录曾分配过的最高任务 ID。即使任务被删除,ID 也不会被重用。
CC 的任务系统有四个工具(不是教学版的一个通用 Task 工具):TaskCreate、TaskGet、TaskUpdate、TaskList。全部设置 isConcurrencySafe: true 和 shouldDefer: true(工具 schema 不在初始 prompt 中,需 ToolSearch 后才可见)。