-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Expand file tree
/
Copy pathcode.py
More file actions
308 lines (254 loc) · 14.9 KB
/
code.py
File metadata and controls
308 lines (254 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python3
"""
s08_context_compact.py - Context Compact
Four-layer compaction pipeline inserted before LLM calls:
L1: snip_compact — trim middle messages when count > 50
L2: micro_compact — replace old tool_results with placeholders
L3: tool_result_budget — persist large results to disk
L4: compact_history — LLM full summary (1 API call)
Emergency: reactive_compact — when API still returns prompt_too_long
┌─────────────────────────────────────────────────────────────┐
│ messages[] │
│ ↓ │
│ L1 snip ─→ L2 micro ─→ L3 budget ─→ [token > threshold?] │
│ ├─ No → LLM │
│ └─ Yes → L4 summary │
│ ↓ │
│ LLM call │
│ [prompt_too_long?] │
│ └─ Yes → reactive │
└─────────────────────────────────────────────────────────────┘
Core principle: cheap first, expensive last.
Builds on s07 (skill loading). Usage:
python s08_context_compact/code.py
Needs: pip install anthropic python-dotenv + ANTHROPIC_API_KEY in .env
"""
import os, subprocess, json, time
from pathlib import Path
try:
import readline
readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
pass
from anthropic import Anthropic
from dotenv import load_dotenv
load_dotenv(override=True)  # .env values win over inherited environment
# NOTE(review): when a custom base URL is set, the auth token is dropped —
# presumably the gateway supplies its own auth; confirm against deployment.
if os.getenv("ANTHROPIC_BASE_URL"): os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()                                           # sandbox root enforced by safe_path()
SKILLS_DIR = WORKDIR / "skills"                                # one subdir per skill, each with SKILL.md
TRANSCRIPT_DIR = WORKDIR / ".transcripts"                      # JSONL history dumps written before compaction
TOOL_RESULTS_DIR = WORKDIR / ".task_outputs" / "tool-results"  # spill area for oversized tool output
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]  # required; raises KeyError at startup if unset
SYSTEM = f"You are a coding agent at {WORKDIR}. Keep working step by step, and use compact if the conversation gets too long."
# ═══════════════════════════════════════════════════════════
# FROM s02-s07 (unchanged): Basic Tools
# ═══════════════════════════════════════════════════════════
def safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR, refusing any path that escapes the workspace.

    Raises ValueError when the resolved path lies outside WORKDIR
    (e.g. via '..' segments or an absolute path).
    """
    resolved = (WORKDIR / p).resolve()
    if not resolved.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return resolved
def run_bash(cmd: str) -> str:
    """Run *cmd* through the shell inside WORKDIR.

    Returns stdout+stderr combined (capped at 50 kB), "(no output)" when the
    command produced nothing, or a fixed error string on the 120 s timeout.
    """
    try:
        proc = subprocess.run(cmd, shell=True, cwd=WORKDIR, capture_output=True, text=True, timeout=120)
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    combined = (proc.stdout + proc.stderr).strip()
    if not combined:
        return "(no output)"
    return combined[:50000]
def run_read(path: str, limit: int | None = None) -> str:
    """Read a workspace file, optionally truncated to the first *limit* lines.

    When truncating, a trailing marker reports how many lines were omitted.
    All failures (missing file, escape attempt, decode error) come back as
    an "Error: ..." string rather than raising.
    """
    try:
        all_lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(all_lines):
            hidden = len(all_lines) - limit
            all_lines = all_lines[:limit] + [f"... ({hidden} more lines)"]
        return "\n".join(all_lines)
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write *content* to a workspace file, creating parent directories.

    Returns a confirmation string, or "Error: ..." on any failure
    (including a path that escapes the workspace).
    """
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"
def extract_text(content) -> str:
    """Join the text of all 'text' blocks in an API content list.

    Non-list content is simply stringified; blocks of other types
    (tool_use, etc.) are skipped.
    """
    if isinstance(content, list):
        parts = [getattr(b, "text", "") for b in content if getattr(b, "type", None) == "text"]
        return "\n".join(parts)
    return str(content)
def spawn_subagent(task: str) -> str:
    """Run *task* in an isolated sub-conversation and return its final text.

    The subagent gets a reduced tool set (bash + read_file only), no hooks
    and no compaction; it loops until the model stops requesting tools.
    NOTE(review): there is no iteration cap — a looping subagent would spin
    until an API error; confirm whether that is acceptable.
    """
    sub_tools = [{"name": "bash", "description": "Run a shell command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
                 {"name": "read_file", "description": "Read file contents.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}]
    sub_handlers = {"bash": run_bash, "read_file": run_read}
    messages = [{"role": "user", "content": task}]
    while True:
        response = client.messages.create(model=MODEL, system=SYSTEM, messages=messages, tools=sub_tools, max_tokens=8000)
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use": break  # model produced its final answer
        results = []
        for block in response.content:
            if block.type == "tool_use":
                h = sub_handlers.get(block.name)
                output = h(**block.input) if h else f"Unknown: {block.name}"
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})
    # Last message is the assistant's final turn; return only its text blocks.
    return extract_text(messages[-1]["content"])
def list_skills() -> str:
    """Return a markdown bullet list of available skills.

    A skill is any SKILLS_DIR subdirectory containing SKILL.md; the entry
    shows the directory name plus the manifest's first line (sans '#').
    """
    if not SKILLS_DIR.exists():
        return "(no skills)"
    entries = []
    for skill_dir in sorted(SKILLS_DIR.iterdir()):
        manifest = skill_dir / "SKILL.md"
        if not (skill_dir.is_dir() and manifest.exists()):
            continue
        title = manifest.read_text().split("\n")[0].lstrip('#').strip()
        entries.append(f"- **{skill_dir.name}**: {title}")
    return "\n".join(entries) if entries else "(no skills)"
def load_skill(name: str) -> str:
    """Return the full SKILL.md text for *name*, or a not-found message."""
    manifest = SKILLS_DIR / name / "SKILL.md"
    if manifest.exists():
        return manifest.read_text()
    return f"Skill not found: {name}"
# ═══════════════════════════════════════════════════════════
# NEW in s08: Four-Layer Compaction Pipeline
# ═══════════════════════════════════════════════════════════
CONTEXT_LIMIT = 50000       # estimate_size() threshold that triggers the L4 full summary
KEEP_RECENT = 3             # newest tool_results kept verbatim by micro_compact
PERSIST_THRESHOLD = 30000   # tool results larger than this (chars) get spilled to disk
def estimate_size(msgs): return len(str(msgs))
# L1: snipCompact — trim middle messages
def snip_compact(messages, max_messages=50):
    """L1: cheapest layer — trim the middle of an over-long message list.

    Keeps the first 3 and the most recent messages and replaces everything
    between them with a one-line snip marker. Returns the original list
    unchanged when it already fits.

    Fix: the original kept 3 + (max_messages - 3) messages *plus* the
    marker, so the result had max_messages + 1 entries; the tail now
    reserves one slot for the marker so len(result) == max_messages.
    """
    if len(messages) <= max_messages:
        return messages
    keep_head = 3
    keep_tail = max_messages - keep_head - 1  # -1 leaves room for the marker
    snipped = len(messages) - keep_head - keep_tail
    marker = {"role": "user", "content": f"[snipped {snipped} messages]"}
    return messages[:keep_head] + [marker] + messages[-keep_tail:]
# L2: microCompact — old result placeholders
def collect_tool_results(messages):
    """Locate every tool_result block in the history.

    Returns (message_index, block_index, block) triples, oldest first,
    considering only user messages whose content is a block list.
    """
    found = []
    for mi, msg in enumerate(messages):
        content = msg.get("content")
        if msg.get("role") == "user" and isinstance(content, list):
            for bi, blk in enumerate(content):
                if isinstance(blk, dict) and blk.get("type") == "tool_result":
                    found.append((mi, bi, blk))
    return found
def micro_compact(messages):
    """L2: blank out stale tool_results, keeping the KEEP_RECENT newest intact.

    Short results (<= 120 chars) are left alone; larger old ones are
    replaced in place with a fixed placeholder. Returns the same list.
    """
    results = collect_tool_results(messages)
    stale = results[:-KEEP_RECENT] if len(results) > KEEP_RECENT else []
    for _, _, blk in stale:
        if len(blk.get("content", "")) > 120:
            blk["content"] = "[Earlier tool result compacted. Re-run if needed.]"
    return messages
# L3: toolResultBudget — persist large results to disk
def persist_large_output(tool_use_id, output):
    """L3 helper: spill *output* to disk when it exceeds PERSIST_THRESHOLD.

    Small outputs pass through untouched. Large ones are written once to
    TOOL_RESULTS_DIR/<tool_use_id>.txt (never overwritten) and replaced by
    a placeholder carrying the file path and a 2 kB preview.
    """
    if len(output) <= PERSIST_THRESHOLD:
        return output
    TOOL_RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    target = TOOL_RESULTS_DIR / f"{tool_use_id}.txt"
    if not target.exists():
        target.write_text(output)
    preview = output[:2000]
    return f"<persisted-output>\nFull output: {target}\nPreview:\n{preview}\n</persisted-output>"
def tool_result_budget(messages, max_bytes=200_000):
    """L3: cap the combined size of the newest user message's tool_results.

    If their total exceeds *max_bytes*, the largest blocks are persisted to
    disk first (via persist_large_output) until the total fits; blocks at or
    below PERSIST_THRESHOLD always stay inline. Mutates blocks in place and
    returns the list.

    Improvement: the running total is now updated incrementally after each
    replacement instead of re-summing every block (was accidentally O(k*n));
    the unused index in the (i, block) tuples is also gone.
    """
    last = messages[-1] if messages else None
    if not last or last.get("role") != "user" or not isinstance(last.get("content"), list):
        return messages
    blocks = [b for b in last["content"] if isinstance(b, dict) and b.get("type") == "tool_result"]
    total = sum(len(str(b.get("content", ""))) for b in blocks)
    if total <= max_bytes:
        return messages
    # Largest first: persisting big blocks frees the most space per API call saved.
    for block in sorted(blocks, key=lambda b: len(str(b.get("content", ""))), reverse=True):
        if total <= max_bytes:
            break
        content = str(block.get("content", ""))
        if len(content) <= PERSIST_THRESHOLD:
            continue  # too small to spill; nothing later in sorted order is bigger
        tid = block.get("tool_use_id", "unknown")
        block["content"] = persist_large_output(tid, content)
        total += len(str(block["content"])) - len(content)
    return messages
# L4: autoCompact — LLM full summary
def write_transcript(messages):
    """Dump the full history as JSONL into TRANSCRIPT_DIR; return the path.

    The filename is timestamped so successive compactions never collide
    (at 1-second resolution). Non-serializable values fall back to str().
    """
    TRANSCRIPT_DIR.mkdir(parents=True, exist_ok=True)
    out_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with out_path.open("w") as fh:
        fh.writelines(json.dumps(msg, default=str) + "\n" for msg in messages)
    return out_path
def summarize_history(messages):
    """Ask the model for a continuation-ready summary of the conversation.

    The history is JSON-serialized and truncated to 80 kB before sending;
    costs exactly one API call. Returns the stripped summary text.
    """
    conversation = json.dumps(messages, default=str)[:80000]
    prompt = ("Summarize this coding-agent conversation so work can continue.\n"
              "Preserve: 1. current goal, 2. key findings/decisions, 3. files read/changed, "
              "4. remaining work, 5. user constraints.\nBe compact but concrete.\n\n" + conversation)
    response = client.messages.create(model=MODEL, messages=[{"role": "user", "content": prompt}], max_tokens=2000)
    return response.content[0].text.strip()
def compact_history(messages):
    """L4: the expensive layer — replace the whole history with one summary.

    Saves a full JSONL transcript first (so nothing is lost), then returns
    a fresh single-message history containing the LLM-written summary.
    """
    saved = write_transcript(messages)
    print(f"[transcript saved: {saved}]")
    summary = summarize_history(messages)
    return [{"role": "user", "content": f"[Compacted]\n\n{summary}"}]
# Emergency: reactiveCompact — on API error
def reactive_compact(messages):
    """Emergency path: the API rejected the prompt as too long.

    Saves a transcript, summarizes the whole history, and returns the
    summary followed by the 5 most recent messages so immediate context
    survives.

    Fix: the saved transcript path was assigned but never used; it is now
    logged, matching compact_history's behavior.
    """
    transcript_path = write_transcript(messages)
    print(f"[transcript saved: {transcript_path}]")
    summary = summarize_history(messages)
    return [{"role": "user", "content": f"[Reactive compact]\n\n{summary}"}, *messages[-5:]]
# ═══════════════════════════════════════════════════════════
# FROM s07 (unchanged): Tool Definitions
# ═══════════════════════════════════════════════════════════
# Tool schemas advertised to the model; dispatch happens via TOOL_HANDLERS below.
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
    {"name": "task", "description": "Launch a subagent.",
     "input_schema": {"type": "object", "properties": {"description": {"type": "string"}}, "required": ["description"]}},
    {"name": "list_skills", "description": "List available skills.", "input_schema": {"type": "object", "properties": {}}},
    {"name": "load_skill", "description": "Load skill by name.",
     "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}},
    # s08 change: new compact tool
    {"name": "compact", "description": "Summarize earlier conversation to free context space.",
     "input_schema": {"type": "object", "properties": {"focus": {"type": "string"}}}},
]
# Dispatch table: tool name -> handler. The "compact" handler is a stub that
# just acknowledges the request; actual compaction appears to be driven by the
# size-triggered pipeline in agent_loop (confirm intended interplay).
TOOL_HANDLERS = {"bash": run_bash, "read_file": run_read, "write_file": run_write, "task": spawn_subagent,
                 "list_skills": list_skills, "load_skill": load_skill, "compact": lambda **kw: "Compacting..."}
# FROM s04 (unchanged): Hooks
HOOKS = {"PreToolUse": []}
def trigger_hooks(event, *args):
    """Invoke every callback registered for *event* in order.

    The first non-None return value short-circuits and is handed back to
    the caller (e.g. a permission-denied message); otherwise returns None.
    """
    for callback in HOOKS[event]:
        verdict = callback(*args)
        if verdict is not None:
            return verdict
    return None
DENY_LIST = ["rm -rf /", "sudo", "shutdown"]
def permission_hook(block):
    """PreToolUse hook: veto bash commands containing a deny-listed substring.

    Returns "Permission denied" on a match, None (allow) otherwise; tools
    other than bash are never blocked.
    """
    if block.name != "bash":
        return None
    command = block.input.get("command", "")
    if any(pattern in command for pattern in DENY_LIST):
        return "Permission denied"
    return None
HOOKS["PreToolUse"].append(permission_hook)
# ═══════════════════════════════════════════════════════════
# agent_loop — s08 core: run compaction pipeline before LLM
# ═══════════════════════════════════════════════════════════
def agent_loop(messages: list):
    """Main loop: compact, call the model, execute requested tools, repeat.

    *messages* is mutated in place (slice assignment) so the caller's list
    reflects any compaction. Returns when the model stops requesting tools.
    """
    while True:
        # s08 change: three preprocessors (0 API calls, cheap first)
        messages[:] = snip_compact(messages)
        messages[:] = micro_compact(messages)
        messages[:] = tool_result_budget(messages)
        # s08 change: tokens still over threshold → LLM summary (1 API call)
        if estimate_size(messages) > CONTEXT_LIMIT:
            print("[auto compact]")
            messages[:] = compact_history(messages)
        try:
            response = client.messages.create(model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000)
        except Exception as e:
            # Emergency: provider rejected the prompt as too large → compact and retry.
            if "prompt_too_long" in str(e).lower() or "too many tokens" in str(e).lower():
                print("[reactive compact]")
                messages[:] = reactive_compact(messages)
                continue
            raise
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use": return  # final answer reached
        results = []
        for block in response.content:
            if block.type != "tool_use": continue
            print(f"\033[36m> {block.name}\033[0m")
            # PreToolUse hooks may veto the call (e.g. deny-listed bash commands).
            blocked = trigger_hooks("PreToolUse", block)
            if blocked: results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(blocked)}); continue
            handler = TOOL_HANDLERS.get(block.name)
            output = handler(**block.input) if handler else f"Unknown: {block.name}"
            print(str(output)[:200])
            results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
        messages.append({"role": "user", "content": results})
if __name__ == "__main__":
    # Interactive REPL: each line becomes a user turn; q/exit/empty quits.
    print("s08: Context Compact")
    print("输入问题,回车发送。输入 q 退出。\n")
    history = []  # shared history, mutated in place by agent_loop's compaction
    while True:
        try: query = input("\033[36ms08 >> \033[0m")
        except (EOFError, KeyboardInterrupt): break
        if query.strip().lower() in ("q", "exit", ""): break
        history.append({"role": "user", "content": query})
        agent_loop(history)
        # agent_loop always ends on an assistant turn; print its text blocks.
        for block in history[-1]["content"]:
            if getattr(block, "type", None) == "text": print(block.text)
        print()