From 088dea0e848ccd7e58d191c3a827bcabf802f0d1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 00:54:21 +0000 Subject: [PATCH 1/3] fix(setup): allow run_setup() current_version override so release notes work in source checkouts Cherry-picked from manolitnora/fix/setup-current-version-override (PR #36) Original commit: 2714f93 --- src/setup.py | 8 +++++++- tests/test_setup_runtime_checks.py | 11 ++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/setup.py b/src/setup.py index 75d055d..d8344b0 100644 --- a/src/setup.py +++ b/src/setup.py @@ -129,6 +129,7 @@ def run_setup( cwd: Path | None = None, trusted: bool = True, last_seen_version: str | None = None, + current_version: str | None = None, ) -> SetupReport: root = cwd or Path(__file__).resolve().parent.parent prefetches = [ @@ -136,8 +137,13 @@ def run_setup( start_keychain_prefetch(), start_project_scan(root), ] + # When the package is not installed via pip (dev / source checkout), + # `_read_package_version()` returns '0.0.0' — which makes any non-trivial + # `last_seen_version` compare as "newer than current" and suppresses + # release notes. Allow callers (tests, dev runs) to override. + effective_version = current_version if current_version is not None else _read_package_version() release_notes_payload = check_for_release_notes( - current_version=_read_package_version(), + current_version=effective_version, last_seen_version=last_seen_version, cwd=root, ) diff --git a/tests/test_setup_runtime_checks.py b/tests/test_setup_runtime_checks.py index 02de29a..c1b0350 100644 --- a/tests/test_setup_runtime_checks.py +++ b/tests/test_setup_runtime_checks.py @@ -44,7 +44,16 @@ def test_run_setup_reports_runtime_and_release_notes(self) -> None: (Path(tmp) / 'CHANGELOG.md').write_text( '# Changelog\n\n## 9.9.9\n- big release\n', encoding='utf-8', ) - report = run_setup(cwd=Path(tmp), trusted=True, last_seen_version='0.0.1') + # Override current_version because in a source checkout the + # installed package metadata reports 0.0.0, which would make + # last_seen_version='0.0.1' compare as already-current and + # suppress all release notes. + report = run_setup( + cwd=Path(tmp), + trusted=True, + last_seen_version='0.0.1', + current_version='9.9.9', + ) self.assertIsInstance(report, SetupReport) self.assertGreaterEqual(len(report.runtime_checks), 3) self.assertIn('big release', report.release_notes) From 35f86406c242e8edb75b6824bb8bff7187d65efe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 00:54:37 +0000 Subject: [PATCH 2/3] fix(delegate): record canonical 'delegate_agent' action regardless of alias used Cherry-picked from manolitnora/fix/delegate-agent-canonical-action (PR #35) Original commit: 8a358e0 --- src/agent_runtime.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/agent_runtime.py b/src/agent_runtime.py index 60c6ec3..e6c3968 100644 --- a/src/agent_runtime.py +++ b/src/agent_runtime.py @@ -2610,7 +2610,11 @@ def _execute_delegate_agent( ok=True, content='\n'.join(summary_lines).strip(), metadata={ - 'action': 'Agent', + # Canonical name regardless of the alias the model invoked + # the tool with ('Agent' or 'delegate_agent'). Downstream + # consumers (file_history, observability) expect the stable + # historical identifier. Tests pin this contract. + 'action': 'delegate_agent', 'subagent_type': agent_def.agent_type, 'child_session_id': child_result.session_id, 'child_session_ids': child_session_ids, From c6fe52cc009ce2670f5fbb42cd7722e2cd423456 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 01:07:54 +0000 Subject: [PATCH 3/3] fix: resolve merge conflicts for PRs #32, #35, #36, #37, #38 - PR #36: setup version override (already applied) - PR #35: delegate canonical action (already applied) - PR #37: web-fetch binary refusal + tests - PR #38: edit-loop guard, self-authored tagging, bash banner, markdown churn guard + tests - PR #32: secret redaction at tool ingestion (agent_session.py) + secret-bearing path guard in agent_tools.py + tests --- src/agent_session.py | 66 ++++- src/agent_tools.py | 238 +++++++++++++++++- tests/test_agent_tools_secret_path_guard.py | 115 +++++++++ tests/test_churn_and_bash_banner.py | 136 ++++++++++ tests/test_edit_loop_and_self_authored.py | 122 +++++++++ ...test_secret_redaction_on_tool_ingestion.py | 133 ++++++++++ tests/test_web_fetch_binary_refusal.py | 127 ++++++++++ 7 files changed, 931 insertions(+), 6 deletions(-) create mode 100644 tests/test_agent_tools_secret_path_guard.py create mode 100644 tests/test_churn_and_bash_banner.py create mode 100644 tests/test_edit_loop_and_self_authored.py create mode 100644 tests/test_secret_redaction_on_tool_ingestion.py create mode 100644 tests/test_web_fetch_binary_refusal.py diff --git a/src/agent_session.py b/src/agent_session.py index 6504169..b89d9b8 100644 --- a/src/agent_session.py +++ b/src/agent_session.py @@ -1,11 +1,68 @@ from __future__ import annotations +import re from dataclasses import dataclass, field, replace from typing import Any from .agent_types import UsageStats JSONDict = dict[str, Any] + +# --------------------------------------------------------------------------- +# Secret redaction — applied at tool-result ingestion to prevent leaked tokens +# from poisoning the entire message history. +# --------------------------------------------------------------------------- + +_SECRET_PATTERNS = ( + re.compile(r'\bsk-(ant|proj|or|live|test)-[A-Za-z0-9_\-]{8,}'), + # Stripe uses underscores: sk_live_..., sk_test_..., rk_live_..., rk_test_... + re.compile(r'\b(sk|rk|pk)_(live|test)_[A-Za-z0-9]{16,}'), + re.compile(r'\bghp_[A-Za-z0-9]{20,}'), + re.compile(r'\bAKIA[0-9A-Z]{16,}'), + re.compile(r'\bxoxb-[A-Za-z0-9\-]{20,}'), + # Google API keys: documented as AIza + 35 chars from [A-Za-z0-9_-] + re.compile(r'\bAIza[A-Za-z0-9_\-]{35}\b'), + # JWT: three base64url segments separated by dots; first must start with + # eyJ (which is base64 for `{"`). Less false-positive-prone than `\beyJ`. + re.compile(r'\beyJ[A-Za-z0-9_\-]+\.eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+'), + re.compile(r'-----BEGIN (RSA|OPENSSH|EC|DSA|PRIVATE) (PRIVATE )?KEY-----'), +) + + +def _secret_kind(token: str) -> str: + if token.startswith('sk-'): + return token.split('-', 2)[1] if '-' in token[3:] else 'sk' + if token.startswith(('sk_', 'rk_', 'pk_')): + return 'stripe' + if token.startswith('ghp_'): + return 'github' + if token.startswith('AKIA'): + return 'aws' + if token.startswith('xoxb-'): + return 'slack' + if token.startswith('AIza'): + return 'google' + if token.startswith('eyJ'): + return 'jwt' + if token.startswith('-----BEGIN'): + return 'pem' + return 'secret' + + +def redact_secrets(text: str) -> str: + """Replace any token matching `_SECRET_PATTERNS` with ``[REDACTED:]``. + + Applied at tool-result ingestion so a ``Read`` of an env file does not + poison the entire message history with live credentials. + """ + if not text: + return text + redacted = text + for pattern in _SECRET_PATTERNS: + redacted = pattern.sub( + lambda m: f'[REDACTED:{_secret_kind(m.group(0))}]', redacted + ) + return redacted MAX_MUTATION_HISTORY = 8 @@ -306,6 +363,7 @@ def append_user( ) def append_tool(self, name: str, tool_call_id: str, content: str) -> None: + content = redact_secrets(content) self.messages.append( AgentMessage( role='tool', @@ -371,10 +429,11 @@ def append_tool_delta( merged_metadata = _advance_lineage_revision(merged_metadata) if metadata: merged_metadata.update(metadata) + new_content = redact_secrets(message.content + delta) self.messages[index] = replace( message, - content=message.content + delta, - blocks=_tool_blocks(message.name, message.tool_call_id, message.content + delta), + content=new_content, + blocks=_tool_blocks(message.name, message.tool_call_id, new_content), metadata=merged_metadata, ) @@ -401,6 +460,7 @@ def finalize_tool( merged_metadata = _advance_lineage_revision(merged_metadata) if metadata: merged_metadata.update(metadata) + content = redact_secrets(content) self.messages[index] = replace( message, content=content, @@ -422,6 +482,8 @@ def update_message( ) -> None: message = self.messages[index] merged_metadata = dict(message.metadata) + if content is not None and message.role == 'tool': + content = redact_secrets(content) new_content = message.content if content is None else content new_state = message.state if state is None else state new_stop_reason = message.stop_reason if stop_reason is None else stop_reason diff --git a/src/agent_tools.py b/src/agent_tools.py index dc7c25b..20f2ea5 100644 --- a/src/agent_tools.py +++ b/src/agent_tools.py @@ -62,6 +62,11 @@ class ToolExecutionContext: team_runtime: 'TeamRuntime | None' = None workflow_runtime: 'WorkflowRuntime | None' = None worktree_runtime: 'WorktreeRuntime | None' = None + # Per-session counters and tagging for anti-loop / anti-self-confirm + # heuristics. Mutable contents inside the frozen dataclass — fine, + # only the field references are frozen, not what they point to. + edit_history: dict[str, int] = field(default_factory=dict) + self_authored_paths: set[str] = field(default_factory=set) ToolHandler = Callable[ @@ -1360,15 +1365,148 @@ def _list_dir(arguments: dict[str, Any], context: ToolExecutionContext) -> str: return '\n'.join(lines) if lines else '(empty directory)' +import re as _re + +# Paths whose names strongly indicate secret-bearing content. +# Reading these via the auto-Read path is refused — refusing to +# ingest is the structural fix. Bash can still read them with +# explicit intent if the user really wants to. +_SECRET_BEARING_PATH_PATTERNS = ( + _re.compile(r'(^|/)\.env(\.[^/]*)?$'), # .env, .env.local, ... + _re.compile(r'\.pem$'), + _re.compile(r'\.key$'), + _re.compile(r'(^|/)id_(rsa|ed25519|ecdsa|dsa)(\.pub)?$'), + _re.compile(r'(^|/)credentials(\.json|\.yaml|\.yml)?$', _re.IGNORECASE), + _re.compile(r'(^|/)secrets?(\.json|\.yaml|\.yml|\.toml)?$', _re.IGNORECASE), + _re.compile(r'(^|/)\.aws/credentials$'), + _re.compile(r'(^|/)\.netrc$'), +) + + +def _is_secret_bearing_path(path: Path) -> bool: + """True if path's name/segments match a known secret-bearing convention.""" + text = str(path) + return any(p.search(text) for p in _SECRET_BEARING_PATH_PATTERNS) + + +def _refuse_if_secret_bearing(target: Path) -> None: + """Refuse content-returning tool calls on paths that match known + secret-bearing conventions. Bash retains the ability to read these + paths with explicit user intent. + """ + if _is_secret_bearing_path(target): + raise ToolExecutionError( + f'refused to read secret-bearing path: {target}. ' + 'Reading this via the model-driven tool path would poison ' + 'message history. Use bash with explicit intent if this ' + 'content is genuinely needed.' + ) + + +# Edit-loop threshold: after this many writes/edits to the same path within +# one tool-context lifetime, refuse with a hard error. +_EDIT_LOOP_LIMIT = 5 + +# Markdown-churn threshold: refuse further summary-shape .md writes after +# this many in one context. Catches the pattern where each iteration +# emitted a new "CRITICAL_FINDING_*.md", "SESSION_SUMMARY_*.md", +# "QUICK_REFERENCE.md", "gaps_filled_*.md" etc. as fake progress markers. +_MD_CHURN_LIMIT = 4 + +# Filename signatures that look like self-emitted summary docs. +_CHURN_FILENAME_PATTERNS = re.compile( + r'(?ix)(' + r'summary|finding|critical|session_|gaps?[-_]filled|quick[-_]reference' + r'|deliverables?|status[-_]report|progress[-_]report|wrap[-_]up' + r'|completion[-_]report|implementation[-_]summary|final[-_]summary' + r')' +) +# Conventional doc filenames that should NEVER be flagged as churn. +_DOC_PATH_ALLOWLIST = re.compile(r'(?i)(^|/)(readme|changelog|license|contributing)\b') + + +def _is_churn_markdown(target: Path) -> bool: + """True if the path looks like a summary/findings markdown likely + emitted as fake-progress churn rather than real documentation. + """ + name = target.name + if not name.lower().endswith('.md'): + return False + if _DOC_PATH_ALLOWLIST.search(str(target)): + return False + # Files in docs/ are legitimate docs by convention. + if 'docs' in {p.lower() for p in target.parts}: + return False + return bool(_CHURN_FILENAME_PATTERNS.search(name)) + + +def _track_churn_and_check(target: Path, context: ToolExecutionContext) -> None: + """Refuse if too many summary-shape markdown files written in this + context. Counter is reused from edit_history under a sentinel key so + no new dataclass field is needed. + """ + if not _is_churn_markdown(target): + return + counter_key = '__churn_md_count__' + context.edit_history[counter_key] = context.edit_history.get(counter_key, 0) + 1 + if context.edit_history[counter_key] > _MD_CHURN_LIMIT: + raise ToolExecutionError( + f'churn-doc guard: refused to write {target.name} — this is ' + f'the {context.edit_history[counter_key]}-th summary/findings ' + f'markdown in this session (limit {_MD_CHURN_LIMIT}). ' + 'Writing more summaries is not progress; it is performance. ' + 'If the work is genuinely done, one summary suffices. If it ' + 'is not done, write code or tests, not another summary.' + ) + + +def _track_write_and_check_loop(target: Path, context: ToolExecutionContext) -> None: + """Increment edit count for `target` and refuse if loop limit exceeded. + + Refusal is permanent for the rest of this context's lifetime — there + is no way for the model to talk past it. The fix for hitting this is + to step back and explain what fundamental change is expected, not to + retry with a different threshold. + """ + key = str(target.resolve()) + context.edit_history[key] = context.edit_history.get(key, 0) + 1 + context.self_authored_paths.add(key) + if context.edit_history[key] > _EDIT_LOOP_LIMIT: + raise ToolExecutionError( + f'edit-loop guard: refused to write/edit {target} a ' + f'{context.edit_history[key]}-th time in this session ' + f'(limit {_EDIT_LOOP_LIMIT}). This pattern indicates a ' + 'tweak-and-rerun loop, not progress. Stop and explain to ' + 'the user what fundamental change you expect to be ' + 'different — or what hypothesis you are testing — before ' + 'editing this file again.' + ) + + +def _self_authored_warning(target: Path, context: ToolExecutionContext) -> str: + """If the agent wrote this file earlier in the session, return a + warning header to prepend to the read content. Empty string otherwise. + """ + if str(target.resolve()) in context.self_authored_paths: + return ( + '[self-authored: this file was written or edited by the ' + 'agent earlier in this session. Results from it are not ' + 'independent measurements. Treat with skepticism.]\n' + ) + return '' + + def _read_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str: target = _resolve_path(_require_string(arguments, 'path'), context, allow_missing=False) + _refuse_if_secret_bearing(target) if not target.is_file(): raise ToolExecutionError(f'Path is not a file: {target}') text = target.read_text(encoding='utf-8', errors='replace') + self_warning = _self_authored_warning(target, context) start_line = arguments.get('start_line') end_line = arguments.get('end_line') if start_line is None and end_line is None: - return _truncate_output(text, context.max_output_chars) + return _truncate_output(self_warning + text, context.max_output_chars) if start_line is not None and (isinstance(start_line, bool) or not isinstance(start_line, int) or start_line < 1): raise ToolExecutionError('start_line must be an integer >= 1') if end_line is not None and (isinstance(end_line, bool) or not isinstance(end_line, int) or end_line < 1): @@ -1378,7 +1516,7 @@ def _read_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str: end_idx = end_line or len(lines) selected = lines[start_idx:end_idx] rendered = '\n'.join(f'{start_idx + idx + 1}: {line}' for idx, line in enumerate(selected)) - return _truncate_output(rendered, context.max_output_chars) + return _truncate_output(self_warning + rendered, context.max_output_chars) def _write_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str: @@ -1387,6 +1525,8 @@ def _write_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str content = arguments.get('content') if not isinstance(content, str): raise ToolExecutionError('content must be a string') + _track_churn_and_check(target, context) + _track_write_and_check_loop(target, context) previous_text: str | None = None previous_sha256: str | None = None if target.exists() and target.is_file(): @@ -1420,6 +1560,8 @@ def _write_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str def _edit_file(arguments: dict[str, Any], context: ToolExecutionContext) -> str: _ensure_write_allowed(context) target = _resolve_path(_require_string(arguments, 'path'), context, allow_missing=False) + _refuse_if_secret_bearing(target) + _track_write_and_check_loop(target, context) if not target.is_file(): raise ToolExecutionError(f'Path is not a file: {target}') old_text = arguments.get('old_text') @@ -1572,6 +1714,11 @@ def _grep_search(arguments: dict[str, Any], context: ToolExecutionContext) -> st root = _resolve_path(raw_path, context) if not root.exists(): raise ToolExecutionError(f'Path not found: {raw_path}') + # If the user explicitly grep'd a secret-bearing file, refuse loudly. + # When iterating a directory, secret-bearing entries are skipped + # silently below — they weren't named, so silent skip is honest. + if root.is_file(): + _refuse_if_secret_bearing(root) try: regex = re.compile(re.escape(pattern) if literal else pattern) except re.error as exc: @@ -1581,6 +1728,8 @@ def _grep_search(arguments: dict[str, Any], context: ToolExecutionContext) -> st for file_path in file_iter: if not file_path.is_file(): continue + if _is_secret_bearing_path(file_path): + continue try: text = file_path.read_text(encoding='utf-8', errors='replace') except OSError: @@ -1594,6 +1743,35 @@ def _grep_search(arguments: dict[str, Any], context: ToolExecutionContext) -> st return '\n'.join(hits) if hits else '(no matches)' +def _bash_self_authored_banner(command: str, context: ToolExecutionContext) -> str: + """Banner prepended to bash output when the command references a path + the agent wrote earlier in this session. + + Catches the case where the agent runs a script it just wrote and cites + the output as evidence — e.g. `python3 retrieval_accuracy_test.py` + where the test was authored five turns ago. Without this banner, the + output looks indistinguishable from running an external program. + """ + if not context.self_authored_paths: + return '' + referenced: list[str] = [] + for path in context.self_authored_paths: + # Match the absolute path or its basename in the command. Loose + # match — false positives are mild (extra warning) but false + # negatives miss the whole point. + basename = Path(path).name + if path in command or (basename and basename in command): + referenced.append(basename) + if not referenced: + return '' + names = ', '.join(sorted(set(referenced))) + return ( + f'[self-authored: this command runs files written by the agent ' + f'earlier in this session ({names}). Output is not an ' + f'independent measurement. Treat with skepticism.]\n' + ) + + def _run_bash(arguments: dict[str, Any], context: ToolExecutionContext) -> str: command = _require_string(arguments, 'command') _ensure_shell_allowed(command, context) @@ -1609,6 +1787,7 @@ def _run_bash(arguments: dict[str, Any], context: ToolExecutionContext) -> str: ) stdout = completed.stdout or '' stderr = completed.stderr or '' + banner = _bash_self_authored_banner(command, context) payload = [ f'exit_code={completed.returncode}', '[stdout]', @@ -1616,8 +1795,11 @@ def _run_bash(arguments: dict[str, Any], context: ToolExecutionContext) -> str: '[stderr]', stderr.rstrip(), ] + rendered = '\n'.join(payload).strip() + if banner: + rendered = banner + rendered return ( - _truncate_output('\n'.join(payload).strip(), context.max_output_chars), + _truncate_output(rendered, context.max_output_chars), { 'action': 'bash', 'command': command, @@ -1629,6 +1811,41 @@ def _run_bash(arguments: dict[str, Any], context: ToolExecutionContext) -> str: ) +_BINARY_CONTENT_TYPES = { + 'application/pdf', + 'application/octet-stream', + 'application/zip', + 'application/x-tar', + 'application/x-gzip', + 'application/gzip', + 'application/x-bzip2', + 'application/wasm', + 'application/x-protobuf', +} + + +def _looks_binary(raw: bytes, content_type: str | None) -> bool: + """True if the response is unlikely to be readable text. + + Either the Content-Type announces binary, or the byte stream contains + NUL bytes / a high ratio of non-printable bytes. Catches PDFs even when + they are served with a generic Content-Type header. + """ + if content_type: + ct = content_type.split(';', 1)[0].strip().lower() + if ct in _BINARY_CONTENT_TYPES or ct.startswith(('image/', 'video/', 'audio/')): + return True + if not raw: + return False + if raw[:5] == b'%PDF-': + return True + sample = raw[:4096] + if b'\x00' in sample: + return True + nonprintable = sum(1 for b in sample if b < 9 or 13 < b < 32 or b == 127) + return (nonprintable / len(sample)) > 0.10 + + def _web_fetch(arguments: dict[str, Any], context: ToolExecutionContext) -> str: raw_url = _require_string(arguments, 'url') max_chars = _coerce_int(arguments, 'max_chars', context.max_output_chars) @@ -1643,9 +1860,22 @@ def _web_fetch(arguments: dict[str, Any], context: ToolExecutionContext) -> str: with urllib.request.urlopen(request, timeout=context.command_timeout_seconds) as response: raw_bytes = response.read(max_chars + 1) content_type = response.headers.get_content_type() if hasattr(response, 'headers') else None - text = raw_bytes.decode('utf-8', errors='replace') except (urllib.error.URLError, OSError) as exc: raise ToolExecutionError(f'Failed to fetch {raw_url}: {exc}') from exc + # Binary payloads (PDFs, archives, images) decode to UTF-8 garbage with + # `errors='replace'`. That garbage looks plausibly like prose to the + # model, which then hallucinates content from it. Refuse loudly instead + # of returning faux-text — this is the path that produced fabricated + # paper titles / authors when the model was given an arxiv PDF URL. + if _looks_binary(raw_bytes, content_type): + raise ToolExecutionError( + f'Fetched {raw_url} returned non-text content ' + f'(content_type={content_type!r}, {len(raw_bytes)} bytes). ' + 'web_fetch only extracts UTF-8 text. For PDFs use bash with ' + 'pdftotext; for images use read_file. Do NOT proceed as if ' + 'the content were readable — it is not.' + ) + text = raw_bytes.decode('utf-8', errors='replace') truncated = len(text) > max_chars rendered = text[:max_chars] if truncated: diff --git a/tests/test_agent_tools_secret_path_guard.py b/tests/test_agent_tools_secret_path_guard.py new file mode 100644 index 0000000..88bdea7 --- /dev/null +++ b/tests/test_agent_tools_secret_path_guard.py @@ -0,0 +1,115 @@ +"""Production-tool secret-bearing path guard. + +The runtime tools in `agent_tools.py` (`_read_file`, `_edit_file`, +`_grep_search`) are the ones the model actually invokes via the tool +registry. This pins those production paths. +""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from src.agent_tools import ( + ToolExecutionError, + _edit_file, + _grep_search, + _read_file, + build_tool_context, + default_tool_registry, +) +from src.agent_types import AgentPermissions, AgentRuntimeConfig + + +def _ctx(tmp: str, *, allow_write: bool = False): + config = AgentRuntimeConfig( + cwd=Path(tmp), + permissions=AgentPermissions( + allow_shell_commands=False, + allow_destructive_shell_commands=False, + allow_file_write=allow_write, + ), + ) + return build_tool_context(config, tool_registry=default_tool_registry()) + + +class TestReadFileGuard(unittest.TestCase): + def test_read_file_refuses_dotenv(self): + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / '.env').write_text('SECRET=abc\n') + ctx = _ctx(tmp) + with self.assertRaises(ToolExecutionError) as cm: + _read_file({'path': '.env'}, ctx) + self.assertIn('refused to read secret-bearing path', str(cm.exception)) + + def test_read_file_refuses_pem(self): + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / 'key.pem').write_text('-----BEGIN PRIVATE KEY-----\nx\n') + ctx = _ctx(tmp) + with self.assertRaises(ToolExecutionError): + _read_file({'path': 'key.pem'}, ctx) + + def test_read_file_allows_normal_text(self): + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / 'README.md').write_text('hi') + ctx = _ctx(tmp) + self.assertIn('hi', _read_file({'path': 'README.md'}, ctx)) + + +class TestEditFileGuard(unittest.TestCase): + def test_edit_file_refuses_dotenv(self): + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / '.env').write_text('SECRET=abc') + ctx = _ctx(tmp, allow_write=True) + with self.assertRaises(ToolExecutionError) as cm: + _edit_file( + {'path': '.env', 'old_text': 'abc', 'new_text': 'def'}, + ctx, + ) + self.assertIn('refused to read secret-bearing path', str(cm.exception)) + + +class TestSymlinkResolution(unittest.TestCase): + """If a non-secret-named symlink points at a secret-bearing target, + the guard must catch it. The check resolves to the real path before + matching against the pattern set. + """ + + def test_symlink_to_dotenv_refused(self): + with tempfile.TemporaryDirectory() as tmp: + real = Path(tmp) / '.env' + real.write_text('SECRET=abc\n') + link = Path(tmp) / 'config.txt' + link.symlink_to(real) + ctx = _ctx(tmp) + # The guard's pattern set matches names ending in .env. After + # `_resolve_path` resolves the symlink, the target's name is .env + # and the guard fires. + with self.assertRaises(ToolExecutionError) as cm: + _read_file({'path': 'config.txt'}, ctx) + self.assertIn('refused to read secret-bearing path', str(cm.exception)) + + +class TestGrepSearchGuard(unittest.TestCase): + def test_grep_explicit_dotenv_path_refused(self): + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / '.env').write_text('SECRET=abc123\n') + ctx = _ctx(tmp) + with self.assertRaises(ToolExecutionError): + _grep_search({'pattern': 'SECRET', 'path': '.env'}, ctx) + + def test_grep_directory_silently_skips_dotenv(self): + """Greping a directory should not leak .env contents but should not + fail loudly — silent skip preserves the user's directory-grep intent. + """ + with tempfile.TemporaryDirectory() as tmp: + (Path(tmp) / '.env').write_text('SECRET=hunter2\n') + (Path(tmp) / 'README.md').write_text('SECRET feature here\n') + ctx = _ctx(tmp) + out = _grep_search({'pattern': 'SECRET', 'path': '.'}, ctx) + assert 'hunter2' not in out + assert 'feature here' in out + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_churn_and_bash_banner.py b/tests/test_churn_and_bash_banner.py new file mode 100644 index 0000000..c342b74 --- /dev/null +++ b/tests/test_churn_and_bash_banner.py @@ -0,0 +1,136 @@ +"""Markdown-churn guard + bash self-authored banner. + +Two more reasoning-pattern guards: + +1. Churn-doc: model writes successive summary/findings/critical-finding + markdown files as fake progress markers. Each is a different path so + the per-path edit-loop limit doesn't catch it. Cumulative count does. + +2. Bash banner: when the agent runs a script it wrote earlier in the + session (e.g., `python3 my_test.py`), the captured output is prefixed + with a [self-authored: ...] banner. Mirrors the read-time warning + for execution-time results. +""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from src.agent_tools import ( + ToolExecutionError, + _is_churn_markdown, + _run_bash, + _write_file, + build_tool_context, + default_tool_registry, +) +from src.agent_types import AgentPermissions, AgentRuntimeConfig + + +def _ctx(tmp: str, *, allow_shell: bool = False): + return build_tool_context( + AgentRuntimeConfig( + cwd=Path(tmp), + permissions=AgentPermissions( + allow_file_write=True, + allow_shell_commands=allow_shell, + allow_destructive_shell_commands=False, + ), + ), + tool_registry=default_tool_registry(), + ) + + +class TestChurnPattern(unittest.TestCase): + def test_recognises_summary_finding_critical_session_filenames(self): + for name in [ + 'CRITICAL_FINDING_20260504.md', + 'SESSION_SUMMARY_20260504.md', + 'gaps_filled_20260504.md', + 'QUICK_REFERENCE.md', + 'FINAL_SUMMARY.md', + 'completion_report.md', + 'implementation_summary.md', + 'wrap-up.md', + ]: + assert _is_churn_markdown(Path('/tmp') / name), name + + def test_excludes_legitimate_doc_files(self): + for name in ['README.md', 'CHANGELOG.md', 'LICENSE.md', 'CONTRIBUTING.md']: + assert not _is_churn_markdown(Path('/tmp') / name), name + + def test_excludes_files_in_docs_directory(self): + # Even if the filename matches "summary", being in docs/ makes it + # legitimate documentation. + assert not _is_churn_markdown(Path('/proj/docs/architecture_summary.md')) + + def test_non_md_files_not_churn(self): + assert not _is_churn_markdown(Path('/tmp/SESSION_SUMMARY.txt')) + + +class TestChurnGuard(unittest.TestCase): + def test_excessive_summary_writes_eventually_refused(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + for i in range(4): + _write_file({ + 'path': f'CRITICAL_FINDING_{i}.md', + 'content': '# something\n', + }, ctx) + # The 5th churn-shaped write triggers the guard. + with self.assertRaises(ToolExecutionError) as cm: + _write_file({ + 'path': 'CRITICAL_FINDING_5.md', + 'content': '# something\n', + }, ctx) + msg = str(cm.exception) + self.assertIn('churn-doc guard', msg) + self.assertIn('not progress', msg) + + def test_legitimate_doc_writes_do_not_count(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + for name in ['README.md', 'CHANGELOG.md', 'LICENSE.md', 'CONTRIBUTING.md']: + _write_file({'path': name, 'content': '# x\n'}, ctx) + # All four legitimate docs written. Now write a churn doc; + # should still be the FIRST churn write, not the 5th. + _write_file({'path': 'SESSION_SUMMARY.md', 'content': '# x\n'}, ctx) + self.assertEqual(ctx.edit_history.get('__churn_md_count__'), 1) + + +class TestBashSelfAuthoredBanner(unittest.TestCase): + def test_running_self_authored_script_gets_banner(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp, allow_shell=True) + _write_file({ + 'path': 'my_test.py', + 'content': 'print("hi")\n', + }, ctx) + output = _run_bash({'command': 'python3 my_test.py'}, ctx) + text = output[0] if isinstance(output, tuple) else output + self.assertIn('[self-authored:', text) + self.assertIn('my_test.py', text) + self.assertIn('not an independent measurement', text) + self.assertIn('hi', text) # actual output preserved + + def test_running_external_command_no_banner(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp, allow_shell=True) + output = _run_bash({'command': 'echo hello'}, ctx) + text = output[0] if isinstance(output, tuple) else output + self.assertNotIn('[self-authored:', text) + self.assertIn('hello', text) + + def test_unrelated_command_after_self_write_no_banner(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp, allow_shell=True) + _write_file({'path': 'measurement.py', 'content': 'x = 1\n'}, ctx) + # Run a command that does NOT reference the self-authored file. + output = _run_bash({'command': 'echo done'}, ctx) + text = output[0] if isinstance(output, tuple) else output + self.assertNotIn('[self-authored:', text) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_edit_loop_and_self_authored.py b/tests/test_edit_loop_and_self_authored.py new file mode 100644 index 0000000..4260869 --- /dev/null +++ b/tests/test_edit_loop_and_self_authored.py @@ -0,0 +1,122 @@ +"""Edit-loop guard + self-authored artifact tagging. + +Two reasoning-pattern guards extracted from the Latti S127 transcript: + +1. Edit-loop: model edited the same test file 8-10 times tweaking + thresholds back and forth. Pure churn. Guard refuses after N writes + to the same path within one tool-context lifetime. + +2. Self-authored: model wrote a "retrieval accuracy test" then ran it + then cited the result as if it were an objective measurement. The + tagging prepends a skepticism header to any read of a file the + agent wrote earlier in the same session. +""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from src.agent_tools import ( + ToolExecutionError, + _edit_file, + _read_file, + _write_file, + build_tool_context, + default_tool_registry, +) +from src.agent_types import AgentPermissions, AgentRuntimeConfig + + +def _ctx(tmp: str): + return build_tool_context( + AgentRuntimeConfig( + cwd=Path(tmp), + permissions=AgentPermissions( + allow_file_write=True, + allow_shell_commands=False, + allow_destructive_shell_commands=False, + ), + ), + tool_registry=default_tool_registry(), + ) + + +class TestEditLoopGuard(unittest.TestCase): + def test_repeated_writes_to_same_file_eventually_refused(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + for i in range(5): + _write_file({'path': 'foo.py', 'content': f'# rev {i}\n'}, ctx) + with self.assertRaises(ToolExecutionError) as cm: + _write_file({'path': 'foo.py', 'content': '# rev 6\n'}, ctx) + msg = str(cm.exception) + self.assertIn('edit-loop guard', msg) + self.assertIn('tweak-and-rerun', msg) + + def test_writes_to_different_files_do_not_count_against_each_other(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + for i in range(5): + _write_file({'path': f'a_{i}.py', 'content': '# x\n'}, ctx) + # 6th write to a NEW file should still succeed. + _write_file({'path': 'a_6.py', 'content': '# x\n'}, ctx) + self.assertEqual(ctx.edit_history.get( + str((Path(tmp) / 'a_6.py').resolve()) + ), 1) + + def test_edit_after_loop_limit_also_refused(self): + """Mixing _edit_file and _write_file on the same path counts together.""" + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + _write_file({'path': 'foo.py', 'content': 'hello there\n'}, ctx) + for _ in range(4): + _edit_file( + {'path': 'foo.py', 'old_text': 'hello', 'new_text': 'hello'}, + ctx, + ) + # That's 1 write + 4 edits = 5. Sixth touch refused. + with self.assertRaises(ToolExecutionError) as cm: + _edit_file( + {'path': 'foo.py', 'old_text': 'hello', 'new_text': 'hi'}, + ctx, + ) + self.assertIn('edit-loop guard', str(cm.exception)) + + +class TestSelfAuthoredTagging(unittest.TestCase): + def test_read_after_self_write_includes_warning_header(self): + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + _write_file( + {'path': 'measurement.py', 'content': 'def m(): return 0.42\n'}, + ctx, + ) + content = _read_file({'path': 'measurement.py'}, ctx) + self.assertIn('[self-authored:', content) + self.assertIn('not independent measurements', content) + self.assertIn('return 0.42', content) + + def test_read_of_external_file_no_warning(self): + with tempfile.TemporaryDirectory() as tmp: + # File created externally (e.g., by user, or pre-existed) + (Path(tmp) / 'external.md').write_text('user-written content\n') + ctx = _ctx(tmp) + content = _read_file({'path': 'external.md'}, ctx) + self.assertNotIn('[self-authored:', content) + self.assertIn('user-written content', content) + + def test_self_authored_warning_with_line_range(self): + """Warning header must also appear when read uses start_line/end_line.""" + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + _write_file( + {'path': 'm.py', 'content': 'a\nb\nc\nd\ne\n'}, + ctx, + ) + content = _read_file({'path': 'm.py', 'start_line': 2, 'end_line': 4}, ctx) + self.assertIn('[self-authored:', content) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_secret_redaction_on_tool_ingestion.py b/tests/test_secret_redaction_on_tool_ingestion.py new file mode 100644 index 0000000..29d4328 --- /dev/null +++ b/tests/test_secret_redaction_on_tool_ingestion.py @@ -0,0 +1,133 @@ +"""Tool-result secrets are redacted at ingestion, before message history. + +Without redaction, a `Read` of an .env file would put a live API key into +`session.messages`. Every subsequent `llm_call` action carries the full +message history in `payload['messages']`, so secrets would leak throughout +the conversation — potentially wedging the session. + +These tests pin the contract: + 1. Single-shot append: secret in tool content never reaches stored content. + 2. Streamed append: secret straddling chunk boundaries is still redacted. + 3. Final replace: secret in finalize_tool content never reaches stored content. + 4. update_message redacts tool-role messages but not assistant-role. +""" +from __future__ import annotations + +from src.agent_session import AgentSessionState, redact_secrets + +# A token shaped like a real Anthropic key — matches `_SECRET_PATTERNS` +# but is obviously synthetic so a leak in CI logs is harmless. +# Constructed via `+` so the literal token shape never appears in source — +# avoids tripping GitHub push-protection / secret-scanning. The runtime +# value still matches the redactor's regex (which is the point of the test). +FAKE_SK_ANT = 'sk-' + 'ant-' + ('A' * 8) + ('b' * 8) + ('C' * 8) + ('d' * 8) + + +def test_redact_secrets_replaces_known_token_shapes(): + fake_ghp = 'ghp_' + 'abcdefghijklmnopqrstuvwxyz' + text = f'ANTHROPIC_API_KEY={FAKE_SK_ANT}\nGITHUB={fake_ghp}' + out = redact_secrets(text) + assert FAKE_SK_ANT not in out + assert fake_ghp not in out + assert '[REDACTED:' in out + + +def test_redact_secrets_passthrough_on_clean_text(): + text = 'no secrets here, just prose and a path /etc/hostname' + assert redact_secrets(text) == text + + +def test_append_tool_redacts_before_storage(): + session = AgentSessionState.create(system_prompt_parts=['sys'], user_prompt=None) + session.append_tool( + name='Read', + tool_call_id='call_1', + content=f'cat /home/user/dotenv\n{FAKE_SK_ANT}\n', + ) + stored = session.messages[-1].content + assert FAKE_SK_ANT not in stored + assert '[REDACTED:ant]' in stored + + +def test_finalize_tool_redacts_before_storage(): + session = AgentSessionState.create(system_prompt_parts=['sys'], user_prompt=None) + idx = session.start_tool(name='Read', tool_call_id='call_2') + session.finalize_tool( + idx, + content=f'env contents:\n{FAKE_SK_ANT}', + ) + stored = session.messages[-1].content + assert FAKE_SK_ANT not in stored + assert '[REDACTED:ant]' in stored + + +def test_streamed_delta_redacts_secret_straddling_chunk_boundary(): + session = AgentSessionState.create(system_prompt_parts=['sys'], user_prompt=None) + idx = session.start_tool(name='Read', tool_call_id='call_3') + # Split the fake token across two deltas. Per-delta redaction would miss + # this; reassembled-content redaction catches it. + half = len(FAKE_SK_ANT) // 2 + session.append_tool_delta(idx, FAKE_SK_ANT[:half]) + session.append_tool_delta(idx, FAKE_SK_ANT[half:]) + stored = session.messages[idx].content + assert FAKE_SK_ANT not in stored + assert '[REDACTED:ant]' in stored + + +def test_update_message_redacts_when_role_is_tool(): + """`update_message` is the post-hoc mutation path. If a caller routes + tool output through it (e.g., to swap content after the fact), the + secret must be redacted there too. + """ + session = AgentSessionState.create(system_prompt_parts=['sys'], user_prompt=None) + idx = session.start_tool(name='Read', tool_call_id='call_um') + session.update_message(idx, content=f'API_KEY={FAKE_SK_ANT}') + stored = session.messages[idx].content + assert FAKE_SK_ANT not in stored + assert '[REDACTED:ant]' in stored + + +def test_update_message_does_not_redact_assistant_content(): + """Redaction is scoped to tool-role messages. Assistant content is + bounded by other walls (the model's own output). Don't widen scope + silently — pin the boundary. + """ + session = AgentSessionState.create(system_prompt_parts=['sys'], user_prompt=None) + idx = session.start_assistant() + session.update_message(idx, content=f'analyzing... {FAKE_SK_ANT}') + assert FAKE_SK_ANT in session.messages[idx].content + + +def test_redact_stripe_underscore_token(): + fake_stripe = 'sk' + '_live_' + 'abcdefghijklmnopqrstuvwx' + out = redact_secrets(f'STRIPE={fake_stripe}') + assert fake_stripe not in out + assert '[REDACTED:stripe]' in out + + +def test_redact_google_api_key(): + # Real Google API keys are 39 chars: `AIza` + 35 from [A-Za-z0-9_-]. + fake = 'AIza' + 'SyA1B2C3D4E5F6G7H8I9J0KaLbMcNdOePfQ' + assert len(fake) == 39 + out = redact_secrets(f'GOOGLE_API_KEY={fake}') + assert fake not in out + assert '[REDACTED:google]' in out + + +def test_redact_jwt_triple_segment(): + jwt = ( + 'eyJ' + 'hbGciOiJIUzI1NiJ9' + + '.' + 'eyJ' + 'zdWIiOiIxMjM0NSIsIm5hbWUiOiJqIn0' + + '.' + 'SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c' + ) + out = redact_secrets(f'token={jwt}') + assert jwt not in out + assert '[REDACTED:jwt]' in out + + +def test_jwt_pattern_does_not_false_positive_on_bare_eyJ(): + """`eyJ` alone is just base64 of `{"` and appears in unrelated content. + The pattern requires three dot-separated segments; bare `eyJ` is fine. + """ + out = redact_secrets('debug: parsing started with eyJ marker (not a token)') + assert out == 'debug: parsing started with eyJ marker (not a token)' diff --git a/tests/test_web_fetch_binary_refusal.py b/tests/test_web_fetch_binary_refusal.py new file mode 100644 index 0000000..2eb17bb --- /dev/null +++ b/tests/test_web_fetch_binary_refusal.py @@ -0,0 +1,127 @@ +"""web_fetch refuses binary payloads instead of returning UTF-8 garbage. + +The original code decoded all responses with `errors='replace'`, which +turns a PDF into Unicode replacement-char soup. The model reads that +garbage and fills the gaps with hallucinated content. Pin the refusal so +this regression is observable. +""" +from __future__ import annotations + +import io +import unittest +from pathlib import Path +from unittest.mock import patch + +from src.agent_tools import ( + ToolExecutionError, + _looks_binary, + _web_fetch, + build_tool_context, + default_tool_registry, +) +from src.agent_types import AgentPermissions, AgentRuntimeConfig + + +class _FakeResponse: + def __init__(self, body: bytes, content_type: str): + self._body = body + self.headers = _FakeHeaders(content_type) + + def read(self, n: int) -> bytes: + return self._body[:n] + + def __enter__(self): + return self + + def __exit__(self, *a): + return None + + +class _FakeHeaders: + def __init__(self, ct: str): + self._ct = ct + + def get_content_type(self) -> str: + return self._ct + + +def _ctx(tmp: str): + return build_tool_context( + AgentRuntimeConfig( + cwd=Path(tmp), + permissions=AgentPermissions( + allow_shell_commands=False, allow_destructive_shell_commands=False, + ), + ), + tool_registry=default_tool_registry(), + ) + + +class TestLooksBinary(unittest.TestCase): + def test_pdf_magic_bytes_detected(self): + assert _looks_binary(b'%PDF-1.4\nblah\nblah', 'application/octet-stream') + assert _looks_binary(b'%PDF-1.4\nblah', None) + + def test_application_pdf_content_type_detected(self): + # Even without magic bytes (rare but possible). + assert _looks_binary(b'whatever', 'application/pdf') + + def test_image_content_type_detected(self): + assert _looks_binary(b'fakejpg', 'image/jpeg') + + def test_charset_suffix_does_not_break_detection(self): + assert _looks_binary(b'%PDF-stuff', 'application/pdf; charset=binary') + + def test_nul_byte_detected(self): + assert _looks_binary(b'hello\x00world\x00\x00', 'text/plain') + + def test_clean_text_passes(self): + assert not _looks_binary(b'hello world\nthis is utf-8 prose.', 'text/html') + + def test_high_nonprintable_ratio_detected(self): + # >10% bytes outside printable ASCII range + body = bytes(range(0, 200)) # ~50% non-printable + assert _looks_binary(body, 'text/plain') + + def test_empty_body_not_binary(self): + assert not _looks_binary(b'', 'text/plain') + + +class TestWebFetchBinaryRefusal(unittest.TestCase): + def test_pdf_fetch_raises_with_named_reason(self): + import tempfile + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + fake_pdf = b'%PDF-1.4\n' + bytes(range(256)) * 10 + with patch( + 'urllib.request.urlopen', + return_value=_FakeResponse(fake_pdf, 'application/pdf'), + ): + with self.assertRaises(ToolExecutionError) as cm: + _web_fetch({'url': 'https://example.com/paper.pdf'}, ctx) + msg = str(cm.exception) + self.assertIn('non-text content', msg) + self.assertIn('Do NOT proceed as if', msg) + + def test_html_fetch_succeeds(self): + import tempfile + with tempfile.TemporaryDirectory() as tmp: + ctx = _ctx(tmp) + html = b'

Hello, world.

' + with patch( + 'urllib.request.urlopen', + return_value=_FakeResponse(html, 'text/html'), + ): + result = _web_fetch({'url': 'https://example.com/'}, ctx) + # _web_fetch returns a tuple (text, metadata); the runtime + # unwraps it. We just confirm it returned text containing + # the body, not a raised exception. + if isinstance(result, tuple): + text = result[0] + else: + text = result + self.assertIn('Hello, world.', text) + + +if __name__ == '__main__': + unittest.main()