From f5802f33629eff38b38606ee1cc435d04d734e69 Mon Sep 17 00:00:00 2001
From: zhangPinkdolphin <2308717915@qq.com>
Date: Sat, 30 May 2026 15:15:08 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20Auto-Summary=20=E4=BC=9A=E8=AF=9D?=
=?UTF-8?q?=E6=97=A5=E5=BF=97=E8=87=AA=E5=8A=A8=E6=91=98=E8=A6=81=E5=B7=A5?=
=?UTF-8?q?=E5=85=B7?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
在 Agent 工作流中自动检测关键决策点(方案选择、确认执行、阶段完成),
将摘要写入 discussion_log.md。
- auto_summary.py: 核心模块,支持 online(钩子模式)和 offline(批量扫描)
- plugins/auto_summary_plugin.py: 通过 hooks 系统注册 turn_after 事件
- tests/test_auto_summary.py: 48 个单元测试覆盖所有功能路径
---
auto_summary.py | 543 +++++++++++++++++++++++++++++
plugins/auto_summary_plugin.py | 67 ++++
tests/test_auto_summary.py | 600 +++++++++++++++++++++++++++++++++
3 files changed, 1210 insertions(+)
create mode 100644 auto_summary.py
create mode 100644 plugins/auto_summary_plugin.py
create mode 100644 tests/test_auto_summary.py
diff --git a/auto_summary.py b/auto_summary.py
new file mode 100644
index 000000000..e6dc785a7
--- /dev/null
+++ b/auto_summary.py
@@ -0,0 +1,543 @@
+#!/usr/bin/env python3
+"""
+Auto-Summary: 会话日志自动摘要工具
+
+在 Agent 工作流中自动检测关键决策点,将摘要写入 discussion_log.md。
+
+两种模式:
+ 1. online(data_dict) — 作为钩子被 agent_loop 调用,传入当前轮次的上下文
+ 2. offline(log_path) — 对已有的 model_responses_*.txt 批量扫描
+
+触发条件(任一条命中即触发):
+ - 用户给出明确方案选择("选X"、"方案X"、"用X")
+ - 用户确认决策("执行"、"确认"、"同意"、"可以")
+ - 完成一个阶段("完成"、"结束"、"下一")
+ - 出现重要事实/结论/教训
+ - 出现总结性内容("总结"、"综上"、"所以")
+
+零依赖(仅标准库)。
+"""
+
+import re
+import json
+import os
+import glob
+from datetime import datetime
+
+# ── 路径配置 ──
+_script_dir = os.path.dirname(os.path.abspath(__file__))
+# 当 auto_summary.py 在代码根目录时,日志在 temp/model_responses/
+DEFAULT_LOG_DIR = _script_dir # 代码根 = GenericAgent3/
+DISCUSSION_LOG = os.path.join(DEFAULT_LOG_DIR, 'discussion_log.md')
+
+
+# ── 触发关键词 ──
+
+_DECISION_PATTERNS = [
+ r'(选|选择|采用|用|取)(\s*方案\s*)?[ABCD一二三四]',
+ r'方案\s*[ABCD一二三四]',
+ r'(选|选择)\s*方案\s*\d',
+ r'(就|就按)\s*(方案|这个|这个方案|你说的)',
+ r'走\s*(方案|路线|方向)\s*[ABCD一二三四]',
+]
+
+_CONFIRM_PATTERNS = [
+ r'^(好|行|可以|同意|确认|执行|开始|就这么办|没问题|OK|ok)',
+ r'^(确认|同意|批准)\s*(执行|开始)',
+ r'(可以|同意)\s*(执行|开始|实施)',
+ r'就这么\s*(定|办|决定)',
+]
+
+_COMPLETION_PATTERNS = [
+ r'(完成|结束|收工|搞定|完毕|通过|交付)',
+ r'下一(步|个|阶段|章节|部分)',
+ r'阶段\s*\d\s*(完成|结束)',
+ r'总结|综上|总而言之|总的来说',
+]
+
+_FACT_PATTERNS = [
+ r'(重要|关键|核心)\s*(发现|结论|事实|教训|启示)',
+ r'(记一下|记住|注意|别忘了|重要的)',
+ r'(教训|经验|学到)',
+ r'(原理|本质|原因)是',
+ r'这是因为|原因在于|根源是',
+]
+
+# 应过滤掉的低价值状态更新模式
+_FILTER_PATTERNS = [
+ r'Subagent.*?(工作|Turn|到).*?完成',
+ r'正在.*?执行.*?步骤',
+ r'子任务.*?完成',
+ r'Tool.*?returned',
+ r'观察.*?结果', # 状态观察
+ r'再等一会儿|等待.*?完成|继续观察',
+ r'已读取完毕',
+ r'etc\.\.\.', # 当 summary 里全是 `...`
+ r'^\s*$', # 空行
+]
+
+
+def _compile(*patterns):
+ return [re.compile(p) for p in patterns]
+
+
+# ── 辅助函数 ──
+
+def _extract_user_text(prompt_block: str) -> str:
+ """从 Prompt JSON 块中提取用户的纯文本消息(过滤掉 tool_result)。"""
+ try:
+ data = json.loads(prompt_block)
+ if not isinstance(data, dict):
+ return ''
+ content = data.get('content', [])
+ texts = []
+ for item in content:
+ if isinstance(item, dict) and item.get('type') == 'text':
+ text = item.get('text', '')
+ if text and not text.startswith('\n### [WORKING MEMORY]') and not text.startswith('\n[SYSTEM'):
+ texts.append(text)
+ return '\n'.join(texts)
+ except (json.JSONDecodeError, Exception):
+ return ''
+
+
+def _extract_agent_text(response_block: str) -> str:
+ """从 Response Python repr 中提取 Agent 的 text 内容。"""
+ try:
+ data = json.loads(response_block)
+ except (json.JSONDecodeError, Exception):
+ try:
+ data = eval(response_block, {'__builtins__': {}}, {})
+ except Exception:
+ return ''
+ if isinstance(data, dict):
+ data = [data]
+ if not isinstance(data, list):
+ return ''
+ texts = []
+ for item in data:
+ if isinstance(item, dict) and item.get('type') == 'text':
+ t = item.get('text', '')
+ if t:
+ texts.append(t)
+ return '\n'.join(texts)
+
+
+def _extract_agent_thinking(response_block: str) -> str:
+ """从 Response 中提取 thinking 内容(用于话题识别)。"""
+ try:
+ data = json.loads(response_block)
+ except Exception:
+ try:
+ data = eval(response_block, {'__builtins__': {}}, {})
+ except Exception:
+ return ''
+ if isinstance(data, dict):
+ data = [data]
+ if not isinstance(data, list):
+ return ''
+ thoughts = []
+ for item in data:
+ if isinstance(item, dict) and item.get('type') == 'thinking':
+ t = item.get('thinking', '')
+ if t:
+ thoughts.append(t)
+ return '\n'.join(thoughts)
+
+
+def _extract_text_from_response(response) -> str:
+ """从 LLM response 对象中提取纯文本(用于 hook 模式)。
+
+ 兼容:
+ - Anthropic: response.content = [TextBlock(text='...'), ...]
+ - OpenAI: response.content = '...'
+ """
+ if isinstance(response, str):
+ return response
+ if hasattr(response, 'content'):
+ content = response.content
+ if isinstance(content, str):
+ return content
+ if isinstance(content, list):
+ texts = []
+ for block in content:
+ if isinstance(block, dict) and block.get('type') == 'text':
+ texts.append(block.get('text', ''))
+ elif hasattr(block, 'type') and block.type == 'text':
+ texts.append(getattr(block, 'text', ''))
+ return '\n'.join(texts)
+ # 兼容 dict 格式的 response(如 {'content': '...'})
+ if isinstance(response, dict):
+ return response.get('content', str(response))
+ return str(response)
+
+
+def _is_low_value(text: str) -> bool:
+ """判断是否为低价值的自动状态更新。"""
+ if not text or len(text) < 10:
+ return True
+ for pat in _FILTER_PATTERNS:
+ if re.search(pat, text, re.IGNORECASE):
+ return True
+ return False
+
+
+def _detect_triggers(text: str) -> list:
+ """检测文本命中哪些触发条件,返回标签列表。"""
+ tags = []
+ if any(p.search(text) for p in _compile(*_DECISION_PATTERNS)):
+ tags.append('决策:方案选择')
+ if any(p.search(text) for p in _compile(*_CONFIRM_PATTERNS)):
+ tags.append('决策:确认')
+ if any(p.search(text) for p in _compile(*_COMPLETION_PATTERNS)):
+ tags.append('阶段完成')
+ if any(p.search(text) for p in _compile(*_FACT_PATTERNS)):
+ tags.append('重要事实')
+ return tags
+
+
+def _extract_topic(user_text: str, agent_text: str, thinking: str = '') -> str:
+ """从对话中提取话题名称(首句或关键句)。"""
+ for text in [user_text, agent_text, thinking]:
+ if not text:
+ continue
+ lines = text.strip().split('\n')
+ for line in lines:
+ line = line.strip()
+ if not line:
+ continue
+ line = re.sub(r'|', '', line).strip()
+ if not line:
+ continue
+ if len(line) >= 8:
+ return line[:80]
+ return '(未能提取话题)'
+
+
+def _extract_need(user_text: str) -> str:
+ """从用户消息中提取需求描述(前两句话)。"""
+ lines = [l.strip() for l in user_text.split('\n') if l.strip()]
+ need_lines = []
+ for line in lines:
+ if line.startswith('{') or line.startswith('['):
+ continue
+ if line.startswith('') or line.startswith('###'):
+ continue
+ need_lines.append(line)
+ if len(need_lines) >= 2:
+ break
+ return ' '.join(need_lines)[:120] if need_lines else '(未能提取需求)'
+
+
+def _extract_decision(user_text: str, agent_text: str) -> str:
+ """尝试从对话中提取明确的决策内容。"""
+ combined = f"{user_text}\n{agent_text}"
+ # 先清理 HTML/标签噪音
+ cleaned = re.sub(r'|', '', combined)
+
+ for pattern in _DECISION_PATTERNS + _CONFIRM_PATTERNS:
+ m = re.search(pattern, cleaned, re.MULTILINE)
+ if m:
+ start = max(0, m.start() - 40)
+ end = min(len(cleaned), m.end() + 40)
+ context = cleaned[start:end].replace('\n', ' ')
+ context = context.strip()
+ # 过滤掉含有文件路径的片段
+ if re.search(r'[/\\][\w.-]+\.[\w]+', context):
+ continue
+ return context[:100]
+
+ for sentence in re.split(r'[。!?\n]', cleaned):
+ # 要求句子不包含文件路径
+ if re.search(r'[/\\][\w.-]+\.[\w]+', sentence):
+ continue
+ # 关键词检查(不含"用",因其太常见如"用户""使用")
+ if any(kw in sentence for kw in ['选', '决定', '确认', '同意']):
+ if len(sentence) > 5:
+ return sentence.strip()[:100]
+
+ return ''
+
+
+def _generate_tags(user_text: str, agent_text: str, trigger_tags: list) -> list:
+ """自动生成标签。"""
+ tags = set(trigger_tags)
+ combined = (user_text + ' ' + agent_text).lower()
+ keyword_tags = {
+ '写作': '#写作', '代码': '#代码', 'bug': '#bug',
+ '部署': '#部署', '调试': '#调试', '测试': '#测试',
+ '设计': '#设计', '方案': '#方案', 'pr': '#PR',
+ 'github': '#GitHub', '文档': '#文档', '配置': '#配置',
+ '浏览器': '#浏览器', '搜索': '#搜索', '蛋白质': '#蛋白质',
+ '模型': '#模型', '数据': '#数据', '复盘': '#复盘',
+ '计划': '#计划', '决策': '#决策', '讨论': '#讨论',
+ '交付': '#交付', '邮件': '#邮件',
+ }
+ for kw, tag in keyword_tags.items():
+ if kw in combined:
+ tags.add(tag)
+ return sorted(tags)
+
+
+def _format_entry(timestamp: str, topic: str, need: str,
+ discussion: str, decision: str, tags: list) -> str:
+ """格式化为 Markdown 条目。"""
+ parts = ['---', f'{timestamp}']
+ if topic:
+ parts.append(f' 话题: {topic}')
+ if need:
+ parts.append(f' 用户需求: {need}')
+ if discussion:
+ if len(discussion) > 200:
+ discussion = discussion[:200] + '...'
+ parts.append(f' 讨论内容: {discussion}')
+ if decision:
+ parts.append(f' 决策: {decision}')
+ if tags:
+ parts.append(f' 标签: {" ".join(tags)}')
+ parts.append('')
+ return '\n'.join(parts)
+
+
+def _last_entry_hash(log_path: str) -> str:
+ """读取最后一个条目的粗略 hash(去重用)。"""
+ if not os.path.isfile(log_path):
+ return ''
+ try:
+ with open(log_path, 'r', encoding='utf-8') as f:
+ content = f.read()
+ entries = content.strip().split('\n---\n')
+ if entries and entries[-1].strip():
+ # 取最后条目的前 80 个字符作为 hash
+ return entries[-1].strip()[:80]
+ return ''
+ except Exception:
+ return ''
+
+
+# ═══════════════════════════════════════════════
+# 管道模式(Online):单轮次摘要
+# ═══════════════════════════════════════════════
+
+def online(user_message: str = '', agent_response: str = '',
+ turn: int = 0, timestamp: str = '',
+ log_path: str = None,
+ response_obj=None) -> dict:
+ """
+ 在线模式:传入当前轮次的用户消息和 Agent 回复,检测是否需写摘要。
+
+ 参数:
+ user_message: 当前轮次的用户消息(纯文本)
+ agent_response: 当前轮次的 Agent 回复(纯文本)
+ turn: 轮次数
+ timestamp: 时间戳字符串(如 '2026-05-30 11:45')
+ log_path: discussion_log.md 路径
+ response_obj: 原始 LLM response 对象(用于自动提取 agent_response)
+
+ 返回:
+ {'written': bool, 'tags': list, 'entry': str}
+ """
+ if log_path is None:
+ log_path = DISCUSSION_LOG
+
+ if not timestamp:
+ timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
+
+ # 如果传入了 response_obj,自动提取 agent_response
+ if response_obj and not agent_response:
+ agent_response = _extract_text_from_response(response_obj) or ''
+
+ # 如果 user_message 和 agent_response 都为空,跳过
+ if not user_message and not agent_response:
+ return {'written': False, 'tags': [], 'entry': ''}
+
+ # 检测是否为低价值状态更新
+ combined = f"{user_message}\n{agent_response}"
+ if _is_low_value(combined):
+ return {'written': False, 'tags': [], 'entry': ''}
+
+ # 检测触发
+ trigger_tags = _detect_triggers(combined)
+
+ if not trigger_tags:
+ return {'written': False, 'tags': [], 'entry': ''}
+
+ # 提取信息
+ topic = _extract_topic(user_message, agent_response)
+ need = _extract_need(user_message) if user_message else ''
+ discussion = agent_response[:200] if agent_response else ''
+ decision = _extract_decision(user_message, agent_response)
+ tags = _generate_tags(user_message, agent_response, trigger_tags)
+
+ # 格式化为 Markdown 条目
+ entry = _format_entry(timestamp, topic, need, discussion, decision, tags)
+
+ # 去重检查:与最后一条比较
+ last_hash = _last_entry_hash(log_path)
+ if last_hash and entry.strip()[:80] == last_hash:
+ return {'written': False, 'tags': tags, 'entry': entry.strip(), 'dedup': True}
+
+ # 写入日志
+ log_path = os.path.abspath(log_path)
+ os.makedirs(os.path.dirname(log_path), exist_ok=True)
+ with open(log_path, 'a', encoding='utf-8') as f:
+ f.write(entry + '\n')
+
+ return {'written': True, 'tags': tags, 'entry': entry.strip()}
+
+
+# ═══════════════════════════════════════════════
+# 离线模式:扫描已有日志文件
+# ═══════════════════════════════════════════════
+
+def _parse_log_file(filepath: str) -> list:
+ """
+ 解析 model_responses_*.txt,返回轮次列表。
+ 每轮: {'timestamp': str, 'user_msg': str, 'agent_msg': str, 'thinking': str}
+ """
+ turns = []
+ with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
+ content = f.read()
+
+ blocks = re.split(r'^=== (Prompt|Response) === (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\n',
+ content, flags=re.MULTILINE)
+
+ current_prompt_ts = None
+ current_prompt_body = None
+
+ i = 0
+ while i < len(blocks):
+ if blocks[i] in ('Prompt', 'Response'):
+ label = blocks[i]
+ ts = blocks[i+1]
+ body = blocks[i+2] if i+2 < len(blocks) else ''
+ i += 3
+
+ if label == 'Prompt':
+ current_prompt_ts = ts
+ current_prompt_body = body
+ elif label == 'Response':
+ if current_prompt_ts:
+ user_text = _extract_user_text(current_prompt_body or '')
+ agent_text = _extract_agent_text(body)
+ thinking = _extract_agent_thinking(body)
+ turns.append({
+ 'timestamp': current_prompt_ts,
+ 'user_msg': user_text,
+ 'agent_msg': agent_text,
+ 'thinking': thinking,
+ })
+ current_prompt_ts = None
+ current_prompt_body = None
+ else:
+ i += 1
+
+ return turns
+
+
+def offline(log_path: str = None, output_path: str = None,
+ all_files: bool = True, max_files: int = 0) -> dict:
+ """
+ 离线模式:扫描已有日志文件,批量生成摘要。
+
+ 参数:
+ log_path: 单个日志文件路径(如果指定,则只扫这个文件)
+ output_path: discussion_log.md 路径(默认为 temp/discussion_log.md)
+ all_files: 是否扫描 model_responses/ 下所有文件
+ max_files: 扫描文件数上限(0=不限,仅当 all_files=True 时生效)
+
+ 返回:
+ {'entries_written': int, 'files_scanned': int, 'turns_analyzed': int}
+ """
+ if output_path is None:
+ output_path = DISCUSSION_LOG
+
+ if log_path:
+ files = [log_path]
+ else:
+ responses_dir = os.path.join(DEFAULT_LOG_DIR, 'temp', 'model_responses')
+ if not os.path.isdir(responses_dir):
+ # 回退:从 temp/ 外的代码根找
+ responses_dir = os.path.join(DEFAULT_LOG_DIR, 'model_responses')
+ files = sorted(glob.glob(os.path.join(responses_dir, 'model_responses_*.txt')))
+ if max_files > 0 and len(files) > max_files:
+ files = files[-max_files:]
+
+ total_entries = 0
+ total_turns = 0
+
+ for fpath in files:
+ if not os.path.isfile(fpath):
+ continue
+ try:
+ turns = _parse_log_file(fpath)
+ except Exception as e:
+ print(f" ⚠ 解析失败: {fpath} — {e}")
+ continue
+
+ total_turns += len(turns)
+
+ for turn_data in turns:
+ result = online(
+ user_message=turn_data['user_msg'],
+ agent_response=turn_data['agent_msg'],
+ timestamp=turn_data['timestamp'][:16],
+ log_path=output_path,
+ )
+ if result.get('written'):
+ total_entries += 1
+
+ return {
+ 'entries_written': total_entries,
+ 'files_scanned': len(files),
+ 'turns_analyzed': total_turns,
+ }
+
+
+# ═══════════════════════════════════════════════
+# 命令行入口
+# ═══════════════════════════════════════════════
+
+def main():
+ import argparse
+
+ parser = argparse.ArgumentParser(
+ description='Auto-Summary: 会话日志自动摘要工具',
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+示例:
+ # 扫描所有日志生成摘要
+ python auto_summary.py
+
+ # 扫描单个日志文件
+ python auto_summary.py -f model_responses_967045.txt
+
+ # 指定输出路径
+ python auto_summary.py -o ~/discussion_log.md
+ """
+ )
+ parser.add_argument('-f', '--file', help='指定单个日志文件路径')
+ parser.add_argument('-o', '--output', help='discussion_log.md 输出路径')
+ parser.add_argument('-n', '--max-files', type=int, default=0,
+ help='扫描文件数上限(0=全部)')
+
+ args = parser.parse_args()
+
+ print("Auto-Summary 开始扫描...")
+ print(f" 输出目标: {args.output or DISCUSSION_LOG}")
+
+ result = offline(
+ log_path=args.file,
+ output_path=args.output,
+ all_files=(args.max_files == 0),
+ max_files=args.max_files,
+ )
+
+ print(f" 扫描文件: {result['files_scanned']}")
+ print(f" 分析轮次: {result['turns_analyzed']}")
+ print(f" 写入条目: {result['entries_written']}")
+ print("完成。")
+
+
+if __name__ == '__main__':
+ main()
diff --git a/plugins/auto_summary_plugin.py b/plugins/auto_summary_plugin.py
new file mode 100644
index 000000000..acdaade10
--- /dev/null
+++ b/plugins/auto_summary_plugin.py
@@ -0,0 +1,67 @@
+"""
+Auto-Summary Plugin: 通过 hook 系统自动记录关键决策/阶段完成到 discussion_log.md。
+
+安装:
+ - 确保 auto_summary.py 在代码根目录
+ - 无需修改 agent_loop.py,插件通过 import 自动注册 hook
+ - 可通过移除环境变量 AUTO_SUMMARY_DISABLE=1 禁用
+
+工作原理:
+ 钩在 'turn_after' 事件上,提取用户消息和 Agent 回复,
+ 调用 auto_summary.online() 检测触发条件并写入 discussion_log.md。
+"""
+
+import os
+import sys
+
+# 将代码根加入 path(这样能 import auto_summary)
+_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+_CODE_ROOT = os.path.abspath(os.path.join(_SCRIPT_DIR, '..'))
+if _CODE_ROOT not in sys.path:
+ sys.path.insert(0, _CODE_ROOT)
+
+# 如果设置了禁用环境变量,跳过
+if os.environ.get('AUTO_SUMMARY_DISABLE'):
+ # 静默跳过
+ pass
+else:
+ import plugins.hooks as hooks
+ import auto_summary
+
+ @hooks.register('turn_after')
+ def _auto_summary_on_turn_end(ctx):
+ """在每次 Agent 轮次结束时检查是否需记录摘要。"""
+ try:
+ # 提取用户消息
+ user_msg = ''
+ # 优先用原始的 user_input
+ if ctx.get('user_input'):
+ user_msg = ctx['user_input']
+ # 如果有 next_prompts,用最新的
+ next_prompts = ctx.get('next_prompts') or []
+ if next_prompts and next_prompts[-1]:
+ user_msg = next_prompts[-1]
+
+ # 提取 Agent 回复
+ response = ctx.get('response')
+
+ # 获取轮次和时间
+ turn = ctx.get('turn', 0)
+ from datetime import datetime
+ timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
+
+ # 调用 auto_summary.online()
+ result = auto_summary.online(
+ user_message=user_msg,
+ response_obj=response,
+ turn=turn,
+ timestamp=timestamp,
+ )
+
+ if result.get('written'):
+ tags = ' '.join(result.get('tags', []))
+ print(f"[Auto-Summary] ✓ 记录摘要 ({tags})")
+
+ except Exception as e:
+ # 插件不中断主流程
+ print(f"[Auto-Summary] ⚠ 插件异常: {e}")
diff --git a/tests/test_auto_summary.py b/tests/test_auto_summary.py
new file mode 100644
index 000000000..3835829bf
--- /dev/null
+++ b/tests/test_auto_summary.py
@@ -0,0 +1,600 @@
+#!/usr/bin/env python3
+"""
+Auto-Summary 单元测试。
+"""
+
+import os
+import sys
+import json
+import tempfile
+import unittest
+
+# 把代码根加入 path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+from auto_summary import (
+ _extract_user_text,
+ _extract_agent_text,
+ _extract_text_from_response,
+ _is_low_value,
+ _detect_triggers,
+ _extract_topic,
+ _extract_need,
+ _extract_decision,
+ _generate_tags,
+ _format_entry,
+ _last_entry_hash,
+ online,
+ offline,
+ _parse_log_file,
+)
+
+
+class TestExtractUserText(unittest.TestCase):
+ """测试从 Prompt JSON 块提取用户文本。"""
+
+ def test_extract_simple(self):
+ """简单文本提取。"""
+ block = json.dumps({
+ 'content': [
+ {'type': 'text', 'text': '帮我写一个 Python 脚本'},
+ ]
+ })
+ result = _extract_user_text(block)
+ self.assertEqual(result, '帮我写一个 Python 脚本')
+
+ def test_extract_multiple_texts(self):
+ """多个文本块。"""
+ block = json.dumps({
+ 'content': [
+ {'type': 'text', 'text': '第一步'},
+ {'type': 'text', 'text': '第二步'},
+ ]
+ })
+ result = _extract_user_text(block)
+ self.assertIn('第一步', result)
+ self.assertIn('第二步', result)
+
+ def test_ignore_tool_result(self):
+ """忽略 tool_result 或其他类型。"""
+ block = json.dumps({
+ 'content': [
+ {'type': 'tool_result', 'content': '{}'},
+ {'type': 'text', 'text': '用户消息'},
+ ]
+ })
+ result = _extract_user_text(block)
+ self.assertEqual(result, '用户消息')
+
+ def test_ignore_system_blocks(self):
+ """忽略带 WORKING MEMORY 和 SYSTEM 的文本块。"""
+ block = json.dumps({
+ 'content': [
+ {'type': 'text', 'text': '\n### [WORKING MEMORY]\n...'},
+ {'type': 'text', 'text': '实际用户消息'},
+ ]
+ })
+ result = _extract_user_text(block)
+ self.assertEqual(result, '实际用户消息')
+
+ def test_empty_block(self):
+ """空块返回空字符串。"""
+ self.assertEqual(_extract_user_text('{}'), '')
+ self.assertEqual(_extract_user_text(''), '')
+ self.assertEqual(_extract_user_text('not json'), '')
+
+
+class TestExtractAgentText(unittest.TestCase):
+ """测试从 Response 块提取 Agent 文本。"""
+
+ def test_extract_simple(self):
+ """简单列表格式。"""
+ block = json.dumps([
+ {'type': 'text', 'text': '这是回复内容。'}
+ ])
+ result = _extract_agent_text(block)
+ self.assertEqual(result, '这是回复内容。')
+
+ def test_extract_with_thinking(self):
+ """混合 thinking 和 text。"""
+ block = json.dumps([
+ {'type': 'thinking', 'thinking': '内部思考...'},
+ {'type': 'text', 'text': '最终回复。'},
+ ])
+ result = _extract_agent_text(block)
+ self.assertEqual(result, '最终回复。')
+
+ def test_extract_multiple_texts(self):
+ """多个 text 块。"""
+ block = json.dumps([
+ {'type': 'text', 'text': '第一段。'},
+ {'type': 'text', 'text': '第二段。'},
+ ])
+ result = _extract_agent_text(block)
+ self.assertEqual(result, '第一段。\n第二段。')
+
+ def test_extract_no_text(self):
+ """没有 text 块。"""
+ block = json.dumps([
+ {'type': 'thinking', 'thinking': '思考中'}
+ ])
+ self.assertEqual(_extract_agent_text(block), '')
+
+
+class TestExtractTextFromResponse(unittest.TestCase):
+ """测试从 LLM response 对象提取文本。"""
+
+ def test_string_response(self):
+ """字符串直接返回。"""
+ self.assertEqual(_extract_text_from_response('hello'), 'hello')
+
+ def test_dict_response(self):
+ """带 content 的 dict 格式。"""
+ response = type('Response', (), {'content': 'text content'})()
+ self.assertEqual(_extract_text_from_response(response), 'text content')
+
+ def test_list_response(self):
+ """带 content list 的格式(类 Anthropic)。"""
+ class TextBlock:
+ def __init__(self, text):
+ self.type = 'text'
+ self.text = text
+ response = type('Response', (), {'content': [
+ TextBlock('hello'),
+ TextBlock('world'),
+ ]})()
+ self.assertEqual(_extract_text_from_response(response), 'hello\nworld')
+
+ def test_dict_list_response(self):
+ """content list 中是 dict 格式。"""
+ response = type('Response', (), {'content': [
+ {'type': 'text', 'text': 'hello'},
+ {'type': 'text', 'text': 'world'},
+ ]})()
+ self.assertEqual(_extract_text_from_response(response), 'hello\nworld')
+
+
+class TestIsLowValue(unittest.TestCase):
+ """测试低价值状态更新过滤。"""
+
+ def test_short_message(self):
+ """短消息被认为是低价值。"""
+ self.assertTrue(_is_low_value('short'))
+ self.assertTrue(_is_low_value(''))
+
+ def test_subagent_status(self):
+ """Subagent 状态更新被过滤。"""
+ texts = [
+ 'Subagent 正在工作(Turn 1 已完成环境探测)',
+ 'Subagent 已到 Turn 4,继续观察完成状态',
+ 'Subagent 在工作完成中',
+ ]
+ for t in texts:
+ self.assertTrue(_is_low_value(t), f"未过滤: {t}")
+
+ def test_wait_patterns(self):
+ """等待/观察模式被过滤。"""
+ texts = [
+ '接近完成,再等一会儿收结果',
+ '等待子任务完成',
+ '继续观察完成状态',
+ ]
+ for t in texts:
+ self.assertTrue(_is_low_value(t), f"未过滤: {t}")
+
+ def test_read_complete(self):
+ """已读取完毕被过滤。"""
+ self.assertTrue(_is_low_value('已读取完毕。以下是内容...'))
+
+ def test_meaningful_text_not_filtered(self):
+ """有意义的文本不应被过滤。"""
+ texts = [
+ '确认执行方案B,开始实现代码吧', # 15 chars, >= 10 threshold
+ '我决定采用Plan Mode来规划任务',
+ '重要发现:模型在10轮后开始过拟合',
+ '总结一下:这个方案有三个优点',
+ ]
+ for t in texts:
+ self.assertFalse(_is_low_value(t), f"被误过滤: {t}")
+
+
+class TestDetectTriggers(unittest.TestCase):
+ """测试触发条件检测。"""
+
+ def test_detect_decision(self):
+ """检测方案选择。"""
+ texts = [
+ '我选方案A',
+ '采用方案B来处理',
+ '用方案一二三',
+ '就按你说的方案来',
+ '走方案C路线',
+ ]
+ for t in texts:
+ tags = _detect_triggers(t)
+ self.assertIn('决策:方案选择', tags, f"未检测到决策: {t}")
+
+ def test_detect_confirm(self):
+ """检测确认。"""
+ texts = [
+ '可以,开始执行',
+ '确认执行计划',
+ '同意,开始实施',
+ '就这么办',
+ '好,开始吧',
+ ]
+ for t in texts:
+ tags = _detect_triggers(t)
+ self.assertIn('决策:确认', tags, f"未检测到确认: {t}")
+
+ def test_detect_completion(self):
+ """检测阶段完成。"""
+ texts = [
+ '任务完成',
+ '第一阶段结束',
+ '收工',
+ '搞定',
+ '总结一下方案',
+ ]
+ for t in texts:
+ tags = _detect_triggers(t)
+ self.assertIn('阶段完成', tags, f"未检测到完成: {t}")
+
+ def test_detect_fact(self):
+ """检测重要事实。"""
+ texts = [
+ '重要发现:性能提升了50%',
+ '关键结论是这个方案可行',
+ '教训是不要过早优化',
+ '记一下这个参数很重要',
+ ]
+ for t in texts:
+ tags = _detect_triggers(t)
+ self.assertIn('重要事实', tags, f"未检测到事实: {t}")
+
+ def test_no_trigger(self):
+ """普通文本不应触发。"""
+ self.assertEqual(_detect_triggers('今天天气不错'), [])
+ self.assertEqual(_detect_triggers('让我先查一下文档'), [])
+ self.assertEqual(_detect_triggers('今天天气挺不错的'), [])
+
+
+class TestExtractTopic(unittest.TestCase):
+ """测试话题提取。"""
+
+ def test_from_user_text(self):
+ """从用户文本提取话题。"""
+ topic = _extract_topic(
+ '帮我写一个 Python 脚本处理数据',
+ '好的,我来写这个脚本。'
+ )
+ self.assertIn('Python', topic)
+
+ def test_from_agent_text(self):
+ """从 Agent 文本提取话题。"""
+ topic = _extract_topic(
+ '',
+ '已经完成数据分析。'
+ )
+ self.assertEqual(topic, '已经完成数据分析。')
+
+ def test_fallback(self):
+ """无有效话题时返回占位。"""
+ topic = _extract_topic('', '', '')
+ self.assertEqual(topic, '(未能提取话题)')
+
+
+class TestExtractNeed(unittest.TestCase):
+ """测试用户需求提取。"""
+
+ def test_simple_need(self):
+ """提取前两句。"""
+ need = _extract_need('帮我写个脚本。需要处理 CSV 文件。')
+ self.assertIn('帮我写个脚本', need)
+
+ def test_ignore_blocks(self):
+ """忽略 JSON 块和摘要标签。"""
+ need = _extract_need('{}\n[1,2,3]\n实际需求\n补充说明')
+ self.assertIn('实际需求', need)
+ self.assertIn('补充说明', need)
+ self.assertNotIn('{', need)
+
+
+class TestExtractDecision(unittest.TestCase):
+ """测试决策内容提取。"""
+
+ def test_extract_decision_pattern(self):
+ """从方案选择模式提取。"""
+ decision = _extract_decision(
+ '我选方案B。',
+ '好的,开始执行方案B。'
+ )
+ self.assertTrue(len(decision) > 0)
+
+ def test_extract_confirm(self):
+ """从确认模式提取。"""
+ decision = _extract_decision(
+ '确认执行。',
+ '收到,开始执行计划。'
+ )
+ self.assertIn('确认', decision)
+
+ def test_no_filepath_noise(self):
+ """不提取含文件路径的决策。"""
+ decision = _extract_decision(
+ '读一下 projects/session_index_pr/HANDOFF.md',
+ '用户要求读取一个文件。'
+ )
+ # "用户"中的"用"不应触发决策提取
+ self.assertEqual(decision, '')
+
+ def test_no_false_positive(self):
+ """普通对话不应提取出虚假决策。"""
+ decision = _extract_decision(
+ '继续推进项目',
+ '了解,我来看一下具体方案。'
+ )
+ self.assertEqual(decision, '')
+
+
+class TestGenerateTags(unittest.TestCase):
+ """测试标签生成。"""
+
+ def test_keyword_tags(self):
+ """关键词标签自动追加。"""
+ tags = _generate_tags(
+ '帮我写一个 Python 脚本处理数据',
+ '好的,使用pandas处理',
+ ['阶段完成']
+ )
+ self.assertIn('#数据', tags)
+
+ def test_pr_tag(self):
+ """PR 关键词触发 #PR 标签。"""
+ tags = _generate_tags(
+ '提交一个 PR',
+ '',
+ []
+ )
+ self.assertIn('#PR', tags)
+
+
+class TestFormatEntry(unittest.TestCase):
+ """测试 Markdown 格式化。"""
+
+ def test_format_simple(self):
+ """基本格式检查。"""
+ entry = _format_entry(
+ '2026-05-30 12:00',
+ '话题',
+ '需求',
+ '讨论内容',
+ '决策内容',
+ ['阶段完成', '#代码']
+ )
+ self.assertIn('2026-05-30 12:00', entry)
+ self.assertIn('话题: 话题', entry)
+ self.assertIn('用户需求: 需求', entry)
+ self.assertIn('讨论内容: 讨论内容', entry)
+ self.assertIn('决策: 决策内容', entry)
+ self.assertIn('标签: 阶段完成 #代码', entry)
+
+ def test_truncated_discussion(self):
+ """讨论内容超长截断。"""
+ long_text = 'A' * 250
+ entry = _format_entry(
+ '2026-05-30 12:00', '话题', '', long_text, '', []
+ )
+ self.assertIn('...', entry)
+ self.assertLess(len(entry), 400)
+
+
+class TestLastEntryHash(unittest.TestCase):
+ """测试去重哈希。"""
+
+ def test_empty_file(self):
+ """空文件返回空。"""
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
+ f.write('')
+ path = f.name
+ try:
+ self.assertEqual(_last_entry_hash(path), '')
+ finally:
+ os.unlink(path)
+
+ def test_last_entry(self):
+ """返回最后一条内容的前 80 字符。"""
+ content = '---\n2026-05-30 12:00\n 话题: test\n 标签: done\n\n'
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
+ f.write(content)
+ path = f.name
+ try:
+ h = _last_entry_hash(path)
+ self.assertTrue(h.startswith('---'), msg=f"Hash starts with: {h[:20]!r}")
+ finally:
+ os.unlink(path)
+
+
+class TestOnline(unittest.TestCase):
+ """测试在线模式。"""
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.log_path = os.path.join(self.tmpdir, 'discussion_log.md')
+
+ def tearDown(self):
+ if os.path.isfile(self.log_path):
+ os.unlink(self.log_path)
+ os.rmdir(self.tmpdir)
+
+ def test_decision_triggers_write(self):
+ """方案选择触发写入。"""
+ result = online(
+ user_message='我选方案B,开始实现吧',
+ agent_response='好的,开始执行方案B。',
+ log_path=self.log_path,
+ )
+ self.assertTrue(result['written'])
+ self.assertIn('决策:方案选择', result['tags'])
+
+ def test_confirm_triggers_write(self):
+ """确认触发写入。"""
+ result = online(
+ user_message='确认执行',
+ agent_response='收到,开始执行。',
+ log_path=self.log_path,
+ )
+ self.assertTrue(result['written'])
+ self.assertIn('决策:确认', result['tags'])
+
+ def test_completion_triggers_write(self):
+ """阶段完成触发写入。"""
+ result = online(
+ user_message='第一阶段完成',
+ agent_response='总结一下成果。',
+ log_path=self.log_path,
+ )
+ self.assertTrue(result['written'])
+ self.assertIn('阶段完成', result['tags'])
+
+ def test_low_value_skipped(self):
+ """低价值状态更新跳过。"""
+ result = online(
+ user_message='',
+ agent_response='Subagent 正在工作(Turn 1 已完成环境探测)',
+ log_path=self.log_path,
+ )
+ self.assertFalse(result['written'])
+
+ def test_no_trigger_skipped(self):
+ """无触发条件时跳过。"""
+ result = online(
+ user_message='今天天气不错',
+ agent_response='是的,适合户外活动。',
+ log_path=self.log_path,
+ )
+ self.assertFalse(result['written'])
+
+ def test_dedup(self):
+ """连续相同条目去重。"""
+ result1 = online(
+ user_message='确认执行方案',
+ agent_response='收到。',
+ log_path=self.log_path,
+ )
+ if result1['written']:
+ result2 = online(
+ user_message='确认执行方案',
+ agent_response='收到。',
+ log_path=self.log_path,
+ )
+ self.assertFalse(result2.get('written', False),
+ msg="应检测到重复条目")
+
+
+class TestParseLogFile(unittest.TestCase):
+ """测试日志文件解析。"""
+
+ def _make_log(self, lines: list) -> str:
+ """Helper: 创建临时日志文件。"""
+ fd, path = tempfile.mkstemp(suffix='.txt', prefix='model_responses_')
+ with os.fdopen(fd, 'w') as f:
+ f.write('\n'.join(lines))
+ return path
+
+ def test_simple_turn(self):
+ """解析一个完整的轮次。"""
+ prompt = json.dumps({
+ 'content': [{'type': 'text', 'text': '用户消息'}]
+ })
+ response = json.dumps([
+ {'type': 'text', 'text': 'Agent回复'}
+ ])
+ log_lines = [
+ f'=== Prompt === 2026-05-30 12:00:00',
+ prompt,
+ '',
+ f'=== Response === 2026-05-30 12:00:05',
+ response,
+ ]
+ path = self._make_log(log_lines)
+ try:
+ turns = _parse_log_file(path)
+ self.assertEqual(len(turns), 1)
+ self.assertEqual(turns[0]['user_msg'], '用户消息')
+ self.assertEqual(turns[0]['agent_msg'], 'Agent回复')
+ finally:
+ os.unlink(path)
+
+ def test_multiple_turns(self):
+ """解析多个轮次。"""
+ prompt1 = json.dumps({'content': [{'type': 'text', 'text': '第一轮'}]})
+ resp1 = json.dumps([{'type': 'text', 'text': '第一轮回复'}])
+ prompt2 = json.dumps({'content': [{'type': 'text', 'text': '第二轮'}]})
+ resp2 = json.dumps([{'type': 'text', 'text': '第二轮回复'}])
+ log_lines = [
+ f'=== Prompt === 2026-05-30 12:00:00',
+ prompt1, '',
+ f'=== Response === 2026-05-30 12:00:05',
+ resp1, '',
+ f'=== Prompt === 2026-05-30 12:01:00',
+ prompt2, '',
+ f'=== Response === 2026-05-30 12:01:05',
+ resp2,
+ ]
+ path = self._make_log(log_lines)
+ try:
+ turns = _parse_log_file(path)
+ self.assertEqual(len(turns), 2)
+ finally:
+ os.unlink(path)
+
+
+class TestOfflineIntegration(unittest.TestCase):
+ """集成测试:离线模式处理真实日志。"""
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.output = os.path.join(self.tmpdir, 'discussion_log.md')
+
+ def tearDown(self):
+ if os.path.isfile(self.output):
+ os.unlink(self.output)
+ os.rmdir(self.tmpdir)
+
+ def test_process_real_log(self):
+ """处理真实日志文件。"""
+ # 使用项目目录下的真实日志
+ log_dir = os.path.join(
+ os.path.dirname(__file__), '..', 'temp', 'model_responses'
+ )
+ logs = sorted([
+ os.path.join(log_dir, f)
+ for f in os.listdir(log_dir)
+ if f.startswith('model_responses_') and f.endswith('.txt')
+ ])
+ if not logs:
+ self.skipTest("无真实日志文件可用")
+
+ result = offline(
+ log_path=logs[0],
+ output_path=self.output,
+ )
+ self.assertGreaterEqual(result['turns_analyzed'], 0)
+ self.assertGreaterEqual(result['files_scanned'], 1)
+ # 可能是0,如果日志中没有触发条件
+ self.assertGreaterEqual(result['entries_written'], 0)
+
+ def test_no_logs(self):
+ """日志目录不存在时优雅降级。"""
+ result = offline(
+ log_path='/tmp/nonexistent_log.txt',
+ output_path=self.output,
+ )
+ self.assertEqual(result['files_scanned'], 1)
+ self.assertEqual(result['turns_analyzed'], 0)
+ self.assertEqual(result['entries_written'], 0)
+
+
+if __name__ == '__main__':
+ unittest.main()