From f1a9801f2c27d1d7b61ebc6dad61922fcb0ac9a6 Mon Sep 17 00:00:00 2001 From: pablof7z Date: Sat, 30 May 2026 17:20:28 +0300 Subject: [PATCH] feat(agent): track fs_write file modifications across runs (#113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a TENEX agent writes a file via `fs_write`, the EmitHook now snapshots the written content (SHA-256 + up to 50 KB of bytes) into the conversation DB, keyed by (conversation_id, agent_pubkey, file_path) with last-write-wins upsert. Capture is gated on the `fs_write` success result so blocked and failed writes never create a bogus baseline. On a later run of the same agent in the same conversation, bootstrap re-reads each snapshot from the absolute path the writing run resolved (working dir is not stable across runs — the supervisor moves a conversation into a git worktree on first file mutation) and, when the content changed externally, appends a `file-modifications` system-reminder block to the system prompt. UTF-8 files produce an inline unified diff (via the `similar` crate) when <= 8 KB; binary, oversized, or large-diff cases fall back to a size/line summary. - tenex-conversations: schema v3 `agent_file_snapshots` table, `FileSnapshot`/`NewFileSnapshot` models, `record_file_snapshot` / `get_file_snapshots_for_agent` store methods, upsert integration test. - tenex-agent: `file_modifications` module (FileSnapshotWriter + reminder renderer), hook capture point, bootstrap reminder injection, `similar` dependency. - probe: `file-modification-tracking` e2e scenario + verdicts (two runs in one conversation; second run's system prompt must carry the diff). 
Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 7 + crates/tenex-agent/Cargo.toml | 1 + .../src/agent_bootstrap/assembly.rs | 4 + crates/tenex-agent/src/agent_bootstrap/mod.rs | 17 + crates/tenex-agent/src/file_modifications.rs | 418 ++++++++++++++++++ crates/tenex-agent/src/hook.rs | 17 +- crates/tenex-agent/src/main.rs | 1 + crates/tenex-conversations/AGENTS.md | 8 +- crates/tenex-conversations/src/lib.rs | 5 +- crates/tenex-conversations/src/model.rs | 32 ++ crates/tenex-conversations/src/schema.rs | 25 +- crates/tenex-conversations/src/store.rs | 81 +++- .../tenex-conversations/tests/integration.rs | 92 +++- scripts/tenex-runtime-probe-scenarios.ts | 132 +++++- scripts/tenex-runtime-probe-verdicts.ts | 54 +++ 15 files changed, 881 insertions(+), 13 deletions(-) create mode 100644 crates/tenex-agent/src/file_modifications.rs diff --git a/Cargo.lock b/Cargo.lock index 002ad9929..874c82b34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3078,6 +3078,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "slab" version = "0.4.12" @@ -3277,6 +3283,7 @@ dependencies = [ "serde_json", "serde_yml", "sha2", + "similar", "tempfile", "tenex-accounting", "tenex-agent-registry", diff --git a/crates/tenex-agent/Cargo.toml b/crates/tenex-agent/Cargo.toml index 6189a3cac..55240077e 100644 --- a/crates/tenex-agent/Cargo.toml +++ b/crates/tenex-agent/Cargo.toml @@ -59,6 +59,7 @@ tracing = "0.1" tracing-opentelemetry = "0.32" sha2 = "0.10" hex = "0.4" +similar = "2" zip = { version = "2", default-features = false, features = ["deflate"] } [dev-dependencies] diff --git a/crates/tenex-agent/src/agent_bootstrap/assembly.rs 
b/crates/tenex-agent/src/agent_bootstrap/assembly.rs index 3fbcdac45..7ea554e5e 100644 --- a/crates/tenex-agent/src/agent_bootstrap/assembly.rs +++ b/crates/tenex-agent/src/agent_bootstrap/assembly.rs @@ -11,6 +11,7 @@ use tenex_supervision::{heuristics::default_supervisor, supervisor::Supervisor, use crate::config::AgentConfig; use crate::emit::{EmitState, EmitStateArgs}; +use crate::file_modifications::FileSnapshotWriter; use crate::hook::EmitHook; use crate::runtime_state::RuntimeStateHandle; use crate::tools::{DelegateTool, TodoItem}; @@ -147,6 +148,7 @@ pub(super) fn init_supervisor_and_hook( teams: Arc>, project_root: std::path::PathBuf, conv_db_path: std::path::PathBuf, + snapshot_writer: Option>, ) -> SupervisorComponents { let supervisor = Arc::new(Mutex::new(default_supervisor())); let supervisor_ref = supervisor.clone(); @@ -156,6 +158,7 @@ pub(super) fn init_supervisor_and_hook( todos, agent_category, runtime_state, + snapshot_writer, ); let allows_delegation = agent_category .map(|c| c.allows_delegation()) @@ -242,6 +245,7 @@ mod tests { Arc::new(Vec::new()), std::path::PathBuf::from("/tmp"), std::path::PathBuf::from("/tmp/conv.db"), + None, ) } diff --git a/crates/tenex-agent/src/agent_bootstrap/mod.rs b/crates/tenex-agent/src/agent_bootstrap/mod.rs index 1e5cf5f3a..0b6ab858a 100644 --- a/crates/tenex-agent/src/agent_bootstrap/mod.rs +++ b/crates/tenex-agent/src/agent_bootstrap/mod.rs @@ -26,6 +26,7 @@ use tenex_supervision::supervisor::Supervisor; use crate::cassette::CassetteRecorder; use crate::config::{self, ResolvedModel}; use crate::emit::EmitState; +use crate::file_modifications::{self, FileSnapshotWriter}; use crate::hook::EmitHook; use crate::injections::MessageInjectionTracker; use crate::runtime_state::RuntimeStateHandle; @@ -419,6 +420,13 @@ pub(crate) async fn build( teams.clone(), project_root.clone(), conv_db_path.clone(), + Some(Arc::new(FileSnapshotWriter::new( + conv_db_path.clone(), + conversation_id.clone(), + pubkey_hex.clone(), + 
execution_id.clone(), + working_dir.clone(), + ))), ); let skill_list_tool = SkillListTool::new(skill_ctx.clone()); @@ -475,6 +483,15 @@ pub(crate) async fn build( system_prompt.push_str("\n\n"); system_prompt.push_str(&active_shell_tasks); } + if let Some(file_modifications) = file_modifications::render_reminder( + &conv_db_path, + &conversation_id, + &pubkey_hex, + &working_dir, + ) { + system_prompt.push_str("\n\n"); + system_prompt.push_str(&file_modifications); + } let escalation_pubkey = escalation::resolve_escalation_pubkey(&base_dir, &project_agents) .filter(|pk| pk != &pubkey_hex); diff --git a/crates/tenex-agent/src/file_modifications.rs b/crates/tenex-agent/src/file_modifications.rs new file mode 100644 index 000000000..797d0097c --- /dev/null +++ b/crates/tenex-agent/src/file_modifications.rs @@ -0,0 +1,418 @@ +//! File-modification tracking for `fs_write`. +//! +//! When an agent writes a file via `fs_write`, [`FileSnapshotWriter::capture`] +//! snapshots the written content (up to [`MAX_SNAPSHOT_BYTES`]) into the +//! conversation DB, keyed by `(conversation_id, agent_pubkey, file_path)`. +//! +//! On a later run of the same agent in the same conversation, +//! [`render_reminder`] diffs each snapshot against the current on-disk state +//! and produces a `file-modifications` system-reminder block listing +//! every file that changed externally since the agent last wrote it. +//! +//! Both sides resolve `file_path` identically (env-var expansion + join against +//! `working_dir`, matching `tools::fs::resolve_path`) and hash the *bytes read +//! back from disk* with SHA-256, so capture and compare stay symmetric — a file +//! the agent wrote and nobody else touched never reports as modified. 
+ +use std::path::{Path, PathBuf}; + +use sha2::{Digest, Sha256}; +use tenex_conversations::{ConversationStore, FileSnapshot, NewFileSnapshot}; + +/// Files larger than this are snapshotted by hash + size only (`content_bytes` +/// is `None`); they can report "modified" but cannot produce an inline diff. +const MAX_SNAPSHOT_BYTES: u64 = 50 * 1024; + +/// Maximum rendered unified-diff size to inline. Larger diffs collapse to a +/// summary. Independent of [`MAX_SNAPSHOT_BYTES`]. +const MAX_INLINE_DIFF_BYTES: usize = 8 * 1024; + +/// The prefix `fs_write` emits on success. Capture is gated on this so blocked +/// (`skip` reason) and failed (`Error writing …`) tool results never produce a +/// bogus baseline. +const FS_WRITE_SUCCESS_PREFIX: &str = "Successfully wrote"; + +/// Captures `fs_write` snapshots into the conversation DB. Holds the addressing +/// needed to resolve a written path and persist it under the right conversation +/// + agent. Stateless beyond its configuration; opens the store per capture +/// (writes are serialized upstream by RAL). +pub struct FileSnapshotWriter { + db_path: PathBuf, + conversation_id: String, + agent_pubkey: String, + execution_id: String, + working_dir: String, +} + +impl FileSnapshotWriter { + pub fn new( + db_path: PathBuf, + conversation_id: String, + agent_pubkey: String, + execution_id: String, + working_dir: String, + ) -> Self { + Self { + db_path, + conversation_id, + agent_pubkey, + execution_id, + working_dir, + } + } + + /// Snapshot the file an `fs_write` call just wrote. No-op unless `result` + /// indicates the write succeeded. `args` is the raw `fs_write` argument + /// JSON; its `path` field is resolved against `working_dir` and the file is + /// re-read from disk to hash it. 
+ pub fn capture(&self, args: &str, result: &str) { + if !result.starts_with(FS_WRITE_SUCCESS_PREFIX) { + return; + } + let Some(rel_path) = parse_write_path(args) else { + return; + }; + let resolved = resolve_path(&self.working_dir, &rel_path); + let bytes = match std::fs::read(&resolved) { + Ok(b) => b, + Err(e) => { + tracing::warn!( + path = %resolved.display(), + error = %e, + "fs_write snapshot: could not read written file" + ); + return; + } + }; + let size_bytes = bytes.len() as i64; + let content_hash = hash_bytes(&bytes); + let content_bytes = if bytes.len() as u64 <= MAX_SNAPSHOT_BYTES { + Some(bytes) + } else { + None + }; + + let store = match ConversationStore::open(&self.db_path) { + Ok(store) => store, + Err(e) => { + tracing::warn!(error = %e, "fs_write snapshot: conversation store unavailable"); + return; + } + }; + if let Err(e) = store.record_file_snapshot( + &self.conversation_id, + &NewFileSnapshot { + agent_pubkey: self.agent_pubkey.clone(), + execution_id: self.execution_id.clone(), + file_path: rel_path, + content_hash, + content_bytes, + size_bytes, + }, + ) { + tracing::warn!(error = %e, "fs_write snapshot: failed to record"); + } + } +} + +/// Build the `file-modifications` system-reminder block for this +/// agent + conversation, or `None` when no tracked file changed externally. +/// +/// Opens the conversation DB at `db_path`, reads every snapshot this agent +/// wrote in this conversation, re-reads each file from disk (resolved against +/// `working_dir`), and includes only those whose content now differs from the +/// snapshot hash. 
+pub fn render_reminder( + db_path: &Path, + conversation_id: &str, + agent_pubkey: &str, + working_dir: &str, +) -> Option { + let store = ConversationStore::open(db_path).ok()?; + let snapshots = store + .get_file_snapshots_for_agent(conversation_id, agent_pubkey) + .ok()?; + if snapshots.is_empty() { + return None; + } + + let mut blocks = Vec::new(); + for snapshot in &snapshots { + if let Some(block) = render_file_block(snapshot, working_dir) { + blocks.push(block); + } + } + if blocks.is_empty() { + return None; + } + + let mut out = String::from( + "\nThe following files you wrote in this conversation have been modified externally since your last run:\n", + ); + for block in blocks { + out.push('\n'); + out.push_str(&block); + } + out.push_str(""); + Some(out) +} + +/// Render a single `` block for a snapshot whose on-disk +/// content differs from the recorded hash. Returns `None` when the file is +/// unchanged or can no longer be read. +fn render_file_block(snapshot: &FileSnapshot, working_dir: &str) -> Option { + let resolved = resolve_path(working_dir, &snapshot.file_path); + let current = std::fs::read(&resolved).ok()?; + if hash_bytes(¤t) == snapshot.content_hash { + return None; + } + + let path = &snapshot.file_path; + let body = match snapshot.content_bytes.as_deref() { + Some(old_bytes) => render_change(old_bytes, ¤t), + None => summarize_change(snapshot.size_bytes, current.len() as i64, None), + }; + Some(format!( + "\n{body}\n\n" + )) +} + +/// Render the change between the snapshot bytes and current bytes: a unified +/// text diff when both are UTF-8 and the diff is small, otherwise a summary. +fn render_change(old_bytes: &[u8], new_bytes: &[u8]) -> String { + let (Ok(old_text), Ok(new_text)) = ( + std::str::from_utf8(old_bytes), + std::str::from_utf8(new_bytes), + ) else { + // Binary on either side: TextDiff is text-oriented, fall back to a + // size + line-count summary (line count only when both are text). 
+ return summarize_change(old_bytes.len() as i64, new_bytes.len() as i64, None); + }; + + let diff = similar::TextDiff::from_lines(old_text, new_text); + let rendered = diff + .unified_diff() + .context_radius(3) + .header("your version", "current") + .to_string(); + + if rendered.len() <= MAX_INLINE_DIFF_BYTES { + return rendered.trim_end().to_string(); + } + + let (mut added, mut removed) = (0i64, 0i64); + for change in diff.iter_all_changes() { + match change.tag() { + similar::ChangeTag::Insert => added += 1, + similar::ChangeTag::Delete => removed += 1, + similar::ChangeTag::Equal => {} + } + } + summarize_change( + old_bytes.len() as i64, + new_bytes.len() as i64, + Some((added, removed)), + ) +} + +/// A compact size/line-count summary used when an inline diff is unavailable +/// (binary content, snapshot stored without bytes, or oversized diff). +fn summarize_change(old_size: i64, new_size: i64, lines: Option<(i64, i64)>) -> String { + let byte_delta = new_size - old_size; + let mut summary = match lines { + Some((added, removed)) => { + format!("{added} line(s) added, {removed} line(s) removed") + } + None => "content changed".to_string(), + }; + summary.push_str(&format!( + " ({old_size} → {new_size} bytes, {byte_delta:+} bytes)" + )); + summary +} + +/// Parse the `path` field from raw `fs_write` argument JSON. Returns the path +/// exactly as the agent supplied it (relative to the working directory). +fn parse_write_path(args: &str) -> Option { + let value: serde_json::Value = serde_json::from_str(args).ok()?; + value + .get("path") + .and_then(|p| p.as_str()) + .map(str::to_string) +} + +/// SHA-256 hex of `bytes`. +fn hash_bytes(bytes: &[u8]) -> String { + let mut hasher = Sha256::new(); + hasher.update(bytes); + hex::encode(hasher.finalize()) +} + +/// Resolve a `fs_write` path against `working_dir`, mirroring +/// `tools::fs::resolve_path`: expand `$VAR` / `${VAR}`, then join relative +/// paths onto `working_dir`. 
Kept in lockstep with the tool so capture and +/// compare resolve to the same file. +fn resolve_path(working_dir: &str, path: &str) -> PathBuf { + let expanded = expand_env_vars(path); + let p = Path::new(&expanded); + if p.is_absolute() { + p.to_path_buf() + } else { + Path::new(working_dir).join(p) + } +} + +/// Expand `$VAR` and `${VAR}` using `std::env::var`. Unknown variables are left +/// verbatim. Mirrors `tools::fs::expand_env_vars` so resolution matches the +/// `fs_write` tool exactly. +fn expand_env_vars(input: &str) -> String { + let bytes = input.as_bytes(); + let mut out = String::with_capacity(input.len()); + let mut i = 0; + while i < bytes.len() { + if bytes[i] == b'$' && i + 1 < bytes.len() { + if bytes[i + 1] == b'{' { + if let Some(end_rel) = bytes[i + 2..].iter().position(|&b| b == b'}') { + let name = std::str::from_utf8(&bytes[i + 2..i + 2 + end_rel]).unwrap_or(""); + if !name.is_empty() { + match std::env::var(name) { + Ok(v) => out.push_str(&v), + Err(_) => out.push_str(&input[i..i + 2 + end_rel + 1]), + } + i += 2 + end_rel + 1; + continue; + } + } + } + let start = i + 1; + let mut end = start; + while end < bytes.len() && (bytes[end].is_ascii_alphanumeric() || bytes[end] == b'_') { + end += 1; + } + if end > start { + let name = std::str::from_utf8(&bytes[start..end]).unwrap_or(""); + match std::env::var(name) { + Ok(v) => out.push_str(&v), + Err(_) => out.push_str(&input[i..end]), + } + i = end; + continue; + } + } + out.push(bytes[i] as char); + i += 1; + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn writer(dir: &TempDir, db: &std::path::Path) -> FileSnapshotWriter { + FileSnapshotWriter::new( + db.to_path_buf(), + "conv-1".into(), + "agent-1".into(), + "exec-1".into(), + dir.path().display().to_string(), + ) + } + + fn open_db(dir: &TempDir) -> (std::path::PathBuf, ConversationStore) { + let db_path = dir.path().join("conversation.db"); + let store = 
ConversationStore::open(&db_path).unwrap(); + store.ensure_conversation("conv-1").unwrap(); + (db_path, store) + } + + #[test] + fn capture_skips_failed_write() { + let dir = TempDir::new().unwrap(); + let (db_path, store) = open_db(&dir); + let w = writer(&dir, &db_path); + // No file on disk; result is an error string → must not record. + w.capture( + r#"{"path":"foo.txt","content":"x"}"#, + "Error writing foo.txt: permission denied", + ); + assert!(store + .get_file_snapshots_for_agent("conv-1", "agent-1") + .unwrap() + .is_empty()); + } + + #[test] + fn capture_then_no_reminder_when_unchanged() { + let dir = TempDir::new().unwrap(); + let (db_path, _store) = open_db(&dir); + std::fs::write(dir.path().join("foo.txt"), b"original\n").unwrap(); + let w = writer(&dir, &db_path); + w.capture( + r#"{"path":"foo.txt","content":"original\n"}"#, + "Successfully wrote 9 bytes to /x/foo.txt", + ); + let reminder = render_reminder( + &db_path, + "conv-1", + "agent-1", + &dir.path().display().to_string(), + ); + assert!(reminder.is_none(), "unchanged file must not warn"); + } + + #[test] + fn external_modification_produces_diff_reminder() { + let dir = TempDir::new().unwrap(); + let (db_path, _store) = open_db(&dir); + let file = dir.path().join("foo.txt"); + std::fs::write(&file, b"original\n").unwrap(); + let w = writer(&dir, &db_path); + w.capture( + r#"{"path":"foo.txt","content":"original\n"}"#, + "Successfully wrote 9 bytes to /x/foo.txt", + ); + // External overwrite. 
+ std::fs::write(&file, b"modified\n").unwrap(); + + let reminder = render_reminder( + &db_path, + "conv-1", + "agent-1", + &dir.path().display().to_string(), + ) + .expect("modified file must warn"); + assert!(reminder.contains("file-modifications")); + assert!(reminder.contains("foo.txt")); + assert!(reminder.contains("-original")); + assert!(reminder.contains("+modified")); + } + + #[test] + fn oversized_file_summarizes_instead_of_diff() { + let dir = TempDir::new().unwrap(); + let (db_path, _store) = open_db(&dir); + let file = dir.path().join("big.txt"); + let big = vec![b'a'; (MAX_SNAPSHOT_BYTES + 10) as usize]; + std::fs::write(&file, &big).unwrap(); + let w = writer(&dir, &db_path); + w.capture( + r#"{"path":"big.txt","content":"..."}"#, + "Successfully wrote 51210 bytes to /x/big.txt", + ); + std::fs::write(&file, b"small\n").unwrap(); + + let reminder = render_reminder( + &db_path, + "conv-1", + "agent-1", + &dir.path().display().to_string(), + ) + .expect("oversized changed file must warn"); + assert!(reminder.contains("big.txt")); + assert!(reminder.contains("bytes")); + // No inline diff for content stored without bytes. + assert!(!reminder.contains("+small")); + } +} diff --git a/crates/tenex-agent/src/hook.rs b/crates/tenex-agent/src/hook.rs index 8463040ab..7ae38adc9 100644 --- a/crates/tenex-agent/src/hook.rs +++ b/crates/tenex-agent/src/hook.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use crate::emit::EmitState; +use crate::file_modifications::FileSnapshotWriter; use crate::runtime_state::RuntimeStateHandle; use crate::tools::{TodoItem, TodoStatus}; use rig_core::agent::{HookAction, ToolCallHookAction}; @@ -54,6 +55,9 @@ pub struct EmitHook { /// is emitted by main.rs (with usage from FinalResponse). pending: Arc>>, runtime_state: Option, + /// Snapshots successful `fs_write` results into the conversation DB so a + /// later run of this agent can detect external file modifications. 
+ snapshot_writer: Option>, } impl EmitHook { @@ -63,6 +67,7 @@ impl EmitHook { todos: Arc>>, agent_category: Option, runtime_state: Option, + snapshot_writer: Option>, ) -> Self { let (delta_tx, delta_rx) = mpsc::unbounded_channel(); tokio::spawn(run_delta_buffer(state.clone(), delta_rx)); @@ -75,6 +80,7 @@ impl EmitHook { delta_tx, pending: Arc::new(Mutex::new(None)), runtime_state, + snapshot_writer, } } @@ -221,9 +227,18 @@ impl EmitHook { tool_name: &str, _tool_call_id: Option, _internal_call_id: &str, - _args: &str, + args: &str, result: &str, ) -> HookAction { + // Snapshot successful `fs_write` results so a later run of this agent + // can detect external modifications. `capture` self-gates on the write + // success prefix, so blocked/failed writes are ignored. + if tool_name == "fs_write" { + if let Some(writer) = &self.snapshot_writer { + writer.capture(args, result); + } + } + let is_mcp_error = tool_name.starts_with("mcp__") && result.starts_with("Error: "); if !is_mcp_error { return HookAction::cont(); diff --git a/crates/tenex-agent/src/main.rs b/crates/tenex-agent/src/main.rs index 7e93bbf5f..7a5726c3b 100644 --- a/crates/tenex-agent/src/main.rs +++ b/crates/tenex-agent/src/main.rs @@ -10,6 +10,7 @@ mod context_discovery; mod context_rig; mod emit; mod escalation; +mod file_modifications; mod home; mod hook; mod identity_resolver; diff --git a/crates/tenex-conversations/AGENTS.md b/crates/tenex-conversations/AGENTS.md index 26742cbb3..6e4c2b11b 100644 --- a/crates/tenex-conversations/AGENTS.md +++ b/crates/tenex-conversations/AGENTS.md @@ -26,10 +26,12 @@ Opened via `ConversationStore::open(path)` or `ConversationStore::open_in_memory ## Public API `ConversationStore` — the single open handle per project: -- Read: `list_recent`, `get_conversation`, `root_author_pubkey`, `get_messages`, `get_tool_messages`, `get_prompt_history`, `get_context_state`, `list_completions` -- Write: `ensure_conversation`, `upsert_conversation`, `update_metadata`, 
`append_message`, `append_tool_message`, `append_prompt_history_entry`, `upsert_context_state`, `record_completion` +- Read: `list_recent`, `get_conversation`, `root_author_pubkey`, `get_messages`, `get_tool_messages`, `get_prompt_history`, `get_context_state`, `list_completions`, `get_file_snapshots_for_agent` +- Write: `ensure_conversation`, `upsert_conversation`, `update_metadata`, `append_message`, `append_tool_message`, `append_prompt_history_entry`, `upsert_context_state`, `record_completion`, `record_file_snapshot` -Key types re-exported from `lib.rs`: `MessageRecord`, `NewMessage`, `NewToolMessage`, `NewPromptHistoryEntry`, `FrozenPromptMessage`, `AgentContextState`, `Completion`, `NewCompletion`, `ConversationListFilter`, `MessageQuery`. +`record_file_snapshot` upserts on `(conversation_id, agent_pubkey, file_path)` (last write wins): the `agent_file_snapshots` table (v3) captures the content of files an agent wrote via `fs_write`, so a later run of the same agent in the same conversation can diff against the current on-disk state. `file_path` is stored exactly as passed to `fs_write` (relative to the working dir); the reader re-resolves it the same way. + +Key types re-exported from `lib.rs`: `MessageRecord`, `NewMessage`, `NewToolMessage`, `NewPromptHistoryEntry`, `FrozenPromptMessage`, `AgentContextState`, `Completion`, `NewCompletion`, `FileSnapshot`, `NewFileSnapshot`, `ConversationListFilter`, `MessageQuery`. 
## How to approach changes diff --git a/crates/tenex-conversations/src/lib.rs b/crates/tenex-conversations/src/lib.rs index e148d68ba..d4727363b 100644 --- a/crates/tenex-conversations/src/lib.rs +++ b/crates/tenex-conversations/src/lib.rs @@ -26,8 +26,9 @@ pub use error::{ConversationsError, Result}; pub use ids::normalize_project_id; pub use model::{ AgentContextState, AttachmentRecord, Completion, CompletionStatus, DelegationMarker, - DelegationStatus, FrozenPromptMessage, MessageRecord, NewCompletion, NewMessage, - NewPromptHistoryEntry, NewToolMessage, PromptHistoryEntry, ToolMessage, + DelegationStatus, FileSnapshot, FrozenPromptMessage, MessageRecord, NewCompletion, + NewFileSnapshot, NewMessage, NewPromptHistoryEntry, NewToolMessage, PromptHistoryEntry, + ToolMessage, }; pub use project::Project; pub use store::{ConversationListFilter, ConversationStore, MessageQuery}; diff --git a/crates/tenex-conversations/src/model.rs b/crates/tenex-conversations/src/model.rs index 567fe635b..260f38a3c 100644 --- a/crates/tenex-conversations/src/model.rs +++ b/crates/tenex-conversations/src/model.rs @@ -293,6 +293,38 @@ pub struct Completion { pub metadata: Option, } +/// One snapshot of a file an agent wrote via `fs_write`, captured at write +/// time so a later run of the same agent in the same conversation can detect +/// external modifications. Keyed by `(conversation_id, agent_pubkey, +/// file_path)` (upsert; last write wins). `file_path` is stored exactly as the +/// agent passed it to `fs_write` (relative to the working directory), so the +/// reader re-resolves it against the same working directory it would for a +/// write. `content_bytes` is the verbatim written bytes when ≤ 50 KB, else +/// `None`; `content_hash` is always the SHA-256 hex of the full content. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileSnapshot { + pub id: i64, + pub conversation_id: String, + pub agent_pubkey: String, + pub execution_id: String, + pub file_path: String, + pub content_hash: String, + pub content_bytes: Option>, + pub size_bytes: i64, + pub recorded_at: i64, +} + +/// Input shape for [`crate::store::ConversationStore::record_file_snapshot`]. +#[derive(Debug, Clone)] +pub struct NewFileSnapshot { + pub agent_pubkey: String, + pub execution_id: String, + pub file_path: String, + pub content_hash: String, + pub content_bytes: Option>, + pub size_bytes: i64, +} + #[derive(Debug, Clone)] pub struct NewCompletion { pub root_event_id: Option, diff --git a/crates/tenex-conversations/src/schema.rs b/crates/tenex-conversations/src/schema.rs index a46bd730e..727167b1d 100644 --- a/crates/tenex-conversations/src/schema.rs +++ b/crates/tenex-conversations/src/schema.rs @@ -21,7 +21,7 @@ use rusqlite::Connection; use crate::error::{ConversationsError, Result}; -pub const EXPECTED_SCHEMA_VERSION: i64 = 2; +pub const EXPECTED_SCHEMA_VERSION: i64 = 3; const MIGRATION_V1: &str = r#" CREATE TABLE conversations ( @@ -194,9 +194,30 @@ CREATE INDEX idx_message_attachments_message ON message_attachments(message_id); "#; +const MIGRATION_V3: &str = r#" +CREATE TABLE agent_file_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id TEXT NOT NULL, + agent_pubkey TEXT NOT NULL, + execution_id TEXT NOT NULL, + file_path TEXT NOT NULL, + content_hash TEXT NOT NULL, + content_bytes BLOB, + size_bytes INTEGER NOT NULL, + recorded_at INTEGER NOT NULL, + UNIQUE(conversation_id, agent_pubkey, file_path), + FOREIGN KEY (conversation_id) + REFERENCES conversations(id) + ON DELETE CASCADE +); + +CREATE INDEX agent_file_snapshots_lookup + ON agent_file_snapshots(conversation_id, agent_pubkey); +"#; + /// Migrations indexed by target version. 
fn migrations() -> &'static [(i64, &'static str)] { - &[(1, MIGRATION_V1), (2, MIGRATION_V2)] + &[(1, MIGRATION_V1), (2, MIGRATION_V2), (3, MIGRATION_V3)] } /// Configure pragmas required by the crate. Must run on every connection. diff --git a/crates/tenex-conversations/src/store.rs b/crates/tenex-conversations/src/store.rs index 1af16d6a9..366a2b220 100644 --- a/crates/tenex-conversations/src/store.rs +++ b/crates/tenex-conversations/src/store.rs @@ -15,8 +15,8 @@ use rusqlite::{params, Connection, OptionalExtension, Row, Transaction, Transact use crate::error::{ConversationsError, Result}; use crate::model::{ AgentContextState, AttachmentRecord, Completion, CompletionStatus, ConversationRow, - DelegationMarker, DelegationStatus, MessageRecord, NewCompletion, NewMessage, - NewPromptHistoryEntry, NewToolMessage, PromptHistoryEntry, ToolMessage, + DelegationMarker, DelegationStatus, FileSnapshot, MessageRecord, NewCompletion, NewFileSnapshot, + NewMessage, NewPromptHistoryEntry, NewToolMessage, PromptHistoryEntry, ToolMessage, }; use crate::schema; @@ -694,6 +694,69 @@ impl ConversationStore { Ok(out) } + // ========================================================================== + // Agent file snapshots + // + // Capture the content of a file an agent wrote via `fs_write`, so a later + // run of the same agent in the same conversation can diff it against the + // current on-disk state and report external modifications. Upsert on + // `(conversation_id, agent_pubkey, file_path)` — last write wins — to bound + // table growth and ensure the baseline is always the agent's most recent + // write of that file. + // ========================================================================== + + /// Record (or replace) the snapshot for a file the agent wrote. Upserts on + /// `(conversation_id, agent_pubkey, file_path)`. 
+ pub fn record_file_snapshot( + &self, + conversation_id: &str, + snapshot: &NewFileSnapshot, + ) -> Result<()> { + self.conn.execute( + "INSERT INTO agent_file_snapshots ( + conversation_id, agent_pubkey, execution_id, file_path, + content_hash, content_bytes, size_bytes, recorded_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) + ON CONFLICT(conversation_id, agent_pubkey, file_path) DO UPDATE SET + execution_id = excluded.execution_id, + content_hash = excluded.content_hash, + content_bytes = excluded.content_bytes, + size_bytes = excluded.size_bytes, + recorded_at = excluded.recorded_at", + params![ + conversation_id, + snapshot.agent_pubkey, + snapshot.execution_id, + snapshot.file_path, + snapshot.content_hash, + snapshot.content_bytes, + snapshot.size_bytes, + now_ms(), + ], + )?; + Ok(()) + } + + /// All file snapshots this agent has written in this conversation, ordered + /// by `file_path` for stable rendering. + pub fn get_file_snapshots_for_agent( + &self, + conversation_id: &str, + agent_pubkey: &str, + ) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT id, conversation_id, agent_pubkey, execution_id, file_path, + content_hash, content_bytes, size_bytes, recorded_at + FROM agent_file_snapshots + WHERE conversation_id = ?1 AND agent_pubkey = ?2 + ORDER BY file_path ASC", + )?; + let rows = stmt + .query_map(params![conversation_id, agent_pubkey], row_to_file_snapshot)? 
+ .collect::, _>>()?; + Ok(rows) + } + // ========================================================================== // Delegation markers // @@ -1360,6 +1423,20 @@ fn row_to_completion(row: &Row<'_>) -> rusqlite::Result { }) } +fn row_to_file_snapshot(row: &Row<'_>) -> rusqlite::Result { + Ok(FileSnapshot { + id: row.get(0)?, + conversation_id: row.get(1)?, + agent_pubkey: row.get(2)?, + execution_id: row.get(3)?, + file_path: row.get(4)?, + content_hash: row.get(5)?, + content_bytes: row.get(6)?, + size_bytes: row.get(7)?, + recorded_at: row.get(8)?, + }) +} + fn now_ms() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/tenex-conversations/tests/integration.rs b/crates/tenex-conversations/tests/integration.rs index 861e65d78..13309e92b 100644 --- a/crates/tenex-conversations/tests/integration.rs +++ b/crates/tenex-conversations/tests/integration.rs @@ -6,8 +6,8 @@ use serde_json::json; use tempfile::TempDir; use tenex_conversations::model::{AgentContextState, CompletionStatus, ConversationRow}; use tenex_conversations::{ - ConversationListFilter, ConversationStore, MessageQuery, NewCompletion, NewMessage, - NewPromptHistoryEntry, NewToolMessage, Project, + ConversationListFilter, ConversationStore, MessageQuery, NewCompletion, NewFileSnapshot, + NewMessage, NewPromptHistoryEntry, NewToolMessage, Project, }; fn make_conversation(store: &ConversationStore, id: &str, last_activity: i64) { @@ -1251,3 +1251,91 @@ fn append_message_serializes_under_concurrent_writers() { "all sequences must be distinct" ); } + +#[test] +fn file_snapshot_upsert_is_last_write_wins() { + let store = ConversationStore::open_in_memory().unwrap(); + store.ensure_conversation("conv-snap").unwrap(); + + store + .record_file_snapshot( + "conv-snap", + &NewFileSnapshot { + agent_pubkey: "agent-1".into(), + execution_id: "exec-1".into(), + file_path: "src/foo.rs".into(), + content_hash: "hash-v1".into(), + content_bytes: Some(b"original\n".to_vec()), + size_bytes: 9, + 
}, + ) + .unwrap(); + + // Second write of the same (conversation, agent, path) replaces the row. + store + .record_file_snapshot( + "conv-snap", + &NewFileSnapshot { + agent_pubkey: "agent-1".into(), + execution_id: "exec-2".into(), + file_path: "src/foo.rs".into(), + content_hash: "hash-v2".into(), + content_bytes: Some(b"updated\n".to_vec()), + size_bytes: 8, + }, + ) + .unwrap(); + + let snaps = store + .get_file_snapshots_for_agent("conv-snap", "agent-1") + .unwrap(); + assert_eq!(snaps.len(), 1, "upsert must not accumulate duplicate rows"); + let snap = &snaps[0]; + assert_eq!(snap.content_hash, "hash-v2"); + assert_eq!(snap.execution_id, "exec-2"); + assert_eq!(snap.size_bytes, 8); + assert_eq!(snap.content_bytes.as_deref(), Some(b"updated\n".as_slice())); + + // A different agent / different path are independent rows. + store + .record_file_snapshot( + "conv-snap", + &NewFileSnapshot { + agent_pubkey: "agent-1".into(), + execution_id: "exec-3".into(), + file_path: "src/bar.rs".into(), + content_hash: "hash-bar".into(), + content_bytes: None, + size_bytes: 70_000, + }, + ) + .unwrap(); + store + .record_file_snapshot( + "conv-snap", + &NewFileSnapshot { + agent_pubkey: "agent-2".into(), + execution_id: "exec-4".into(), + file_path: "src/foo.rs".into(), + content_hash: "hash-other".into(), + content_bytes: Some(b"other\n".to_vec()), + size_bytes: 6, + }, + ) + .unwrap(); + + let agent1 = store + .get_file_snapshots_for_agent("conv-snap", "agent-1") + .unwrap(); + assert_eq!(agent1.len(), 2, "agent-1 has two distinct files"); + // Ordered by file_path: bar.rs before foo.rs. 
+ assert_eq!(agent1[0].file_path, "src/bar.rs"); + assert_eq!(agent1[0].content_bytes, None, "oversized stores NULL bytes"); + assert_eq!(agent1[1].file_path, "src/foo.rs"); + + let agent2 = store + .get_file_snapshots_for_agent("conv-snap", "agent-2") + .unwrap(); + assert_eq!(agent2.len(), 1); + assert_eq!(agent2[0].content_hash, "hash-other"); +} diff --git a/scripts/tenex-runtime-probe-scenarios.ts b/scripts/tenex-runtime-probe-scenarios.ts index 834a4da9a..f9af79b32 100644 --- a/scripts/tenex-runtime-probe-scenarios.ts +++ b/scripts/tenex-runtime-probe-scenarios.ts @@ -1,4 +1,4 @@ -import { readFileSync } from "node:fs"; +import { readFileSync, writeFileSync } from "node:fs"; import path from "node:path"; import type { Event, EventTemplate, SimplePool } from "nostr-tools"; import { @@ -49,6 +49,7 @@ export const availableScenarios = [ "delegation-crossproject", "same-agent-concurrency", "fs-read-adjustment", + "file-modification-tracking", "mcp-tool-basic", "mcp-resource-basic", "acp-worker-basic", @@ -122,6 +123,16 @@ export const signAsUserExplanation = export const signAsUserCompletionText = "sign_as_user probe complete"; export const backendKind1RoutingRequest = "BACKEND_KIND1_ROUTING_PROBE"; export const backendKind1RoutingCompletionText = "backend kind1 routed"; +export const fileModificationProbeFileName = "probe-file.txt"; +export const fileModificationOriginalContent = "original\n"; +export const fileModificationModifiedContent = "modified\n"; +export const fileModificationFirstRequest = + "FILE_MOD_PROBE: write probe-file.txt with the word original."; +export const fileModificationSecondRequest = + "FILE_MOD_PROBE: did anything change in probe-file.txt?"; +export const fileModificationFirstCompletionText = "wrote probe-file.txt"; +export const fileModificationSecondCompletionText = + "probe-file.txt was modified externally."; const colorWords = [ "red", "blue", "green", "yellow", "purple", "orange", "pink", "black", @@ -199,6 +210,9 @@ export 
function scenarioProjectDtag(name: ScenarioName): string { if (name === "same-agent-concurrency") { return "probe-concurrency"; } + if (name === "file-modification-tracking") { + return "probe-file-modification-tracking"; + } if (name === "mcp-tool-basic") { return "probe-mcp-tool"; } @@ -323,6 +337,9 @@ export function pmInstructions(name: ScenarioName): string { if (name === "backend-kind1-routing") { return "This scenario verifies backend-signed relay routing. Do not call tools. Reply exactly: backend kind1 routed."; } + if (name === "file-modification-tracking") { + return "This scenario verifies file-modification tracking. On the first request, use fs_write to create probe-file.txt with content 'original' and a trailing newline, then confirm you wrote it. On the second request, you will receive a system-reminder of type file-modifications describing external changes; acknowledge that probe-file.txt was modified externally."; + } return "Use fs_read one file at a time. If the user corrects the requested total, follow the latest total before finishing."; } @@ -372,6 +389,46 @@ export function mockScenario(name: ScenarioName): unknown { }; } + if (name === "file-modification-tracking") { + return { + responses: [ + // First run (turn 1 of its own process): write the file. + { + agent: "pm", + turn: 1, + contains: fileModificationFirstRequest, + toolCalls: [ + { + name: "fs_write", + args: { + path: fileModificationProbeFileName, + content: fileModificationOriginalContent, + description: "write probe file for file-modification tracking", + }, + }, + ], + }, + { + agent: "pm", + turn: 2, + contains: "Successfully wrote", + content: fileModificationFirstCompletionText, + }, + // Second run (a fresh process, so turn resets to 1): the + // file-modifications reminder must already be in the system + // prompt. Match on it to prove the reminder was injected. 
+ { + agent: "pm", + turn: 1, + containsAll: ["file-modifications", fileModificationProbeFileName], + content: fileModificationSecondCompletionText, + }, + ], + defaultContent: + "File-modification tracking probe did not match expected runtime state.", + }; + } + if (name === "same-agent-concurrency") { return { responses: [ @@ -729,6 +786,8 @@ export async function runScenario(name: ScenarioName, context: ScenarioContext): await runCrossProjectDelegationProbe(context); } else if (name === "same-agent-concurrency") { await runSameAgentConcurrencyProbe(context); + } else if (name === "file-modification-tracking") { + await runFileModificationTrackingProbe(context); } else if (name === "mcp-tool-basic") { await runMcpToolProbe(context); } else if (name === "mcp-resource-basic") { @@ -1308,6 +1367,77 @@ async function runFsReadAdjustmentProbe(context: ScenarioContext): Promise await context.delay(Number(process.env.TENEX_PROBE_WAIT_MS ?? 8_000)); } +async function runFileModificationTrackingProbe(context: ScenarioContext): Promise { + const timeoutMs = Number(process.env.TENEX_PROBE_WAIT_MS ?? 15_000); + + // First run: ask the PM to write probe-file.txt. The mock cassette turn 1 + // issues the fs_write; the EmitHook snapshots the written bytes into the + // conversation DB. + const firstEvent = context.sign( + { + kind: 1, + created_at: context.now(), + content: fileModificationFirstRequest, + tags: [["a", context.projectRef]], + }, + context.userSecret + ); + await Promise.all(context.pool.publish([context.relayUrl], firstEvent)); + + // Wait for the fs_write tool event so we know the file exists on disk and + // the snapshot was captured before we overwrite it. + await context.waitForObservedEvent( + context.events, + (event) => + event.pubkey === context.pmPubkey && + event.tags.some((tag) => tag[0] === "tool" && tag[1] === "fs_write"), + timeoutMs, + "fs_write tool event (first run)" + ); + // Wait for the first run's PM completion so the run has fully finished. 
+ await waitForStoredMessage( + context.conversationDbPath, + firstEvent.id, + (message) => + message.authorPubkey === context.pmPubkey && + message.content.includes(fileModificationFirstCompletionText), + timeoutMs, + "PM completion (first run)", + context.delay + ); + + // External modification: overwrite the file the agent wrote. + const probeFilePath = path.join( + context.workspaceDir, + fileModificationProbeFileName + ); + writeFileSync(probeFilePath, fileModificationModifiedContent); + + // Second run, same conversation (threaded via the root e-tag). On bootstrap + // the agent must see a file-modifications system-reminder for probe-file.txt. + const secondEvent = context.sign( + { + kind: 1, + created_at: context.now(), + content: fileModificationSecondRequest, + tags: [ + ["e", firstEvent.id, "", "root"], + ["p", context.pmPubkey], + ], + }, + context.userSecret + ); + await Promise.all(context.pool.publish([context.relayUrl], secondEvent)); + await waitForStoredMessage( + context.conversationDbPath, + secondEvent.id, + (message) => message.authorPubkey === context.pmPubkey, + timeoutMs, + "PM completion (second run)", + context.delay + ); +} + async function runMcpToolProbe(context: ScenarioContext): Promise { const userEvent = context.sign( { diff --git a/scripts/tenex-runtime-probe-verdicts.ts b/scripts/tenex-runtime-probe-verdicts.ts index 2857bcbdf..94637c5d7 100644 --- a/scripts/tenex-runtime-probe-verdicts.ts +++ b/scripts/tenex-runtime-probe-verdicts.ts @@ -16,6 +16,8 @@ import { convReminderCompletionText, convReminderProbeMessage, crossProjectDelegationUserRequest, + fileModificationProbeFileName, + fileModificationSecondRequest, delegationUserRequest, delegationWorkerCompletionText, extractColorChoice, @@ -104,6 +106,12 @@ export function evaluate( if (name === "same-agent-concurrency") { return [...commonVerdicts, ...evaluateSameAgentConcurrency(events, context)]; } + if (name === "file-modification-tracking") { + return [ + ...commonVerdicts, 
+ ...evaluateFileModificationTracking(events, requestRecords, context), + ]; + } if (name === "mcp-tool-basic") { return [...commonVerdicts, ...evaluateMcpTool(events, requestRecords, context)]; } @@ -398,6 +406,52 @@ function evaluateRootAgentsMd( ]; } +function evaluateFileModificationTracking( + events: Event[], + requestRecords: MockRequestRecord[], + context: EvaluateContext +): Verdict[] { + // The first run must have actually written the file (fs_write tool event). + const wroteFile = events.some( + (event) => + event.pubkey === context.pmPubkey && + hasTag(event, "tool", "fs_write") && + (tagValue(event, "tool-args") ?? "").includes(fileModificationProbeFileName) + ); + + // The second run's model request must carry the file-modifications reminder + // for probe-file.txt in its system prompt (visible in requestDebug). + const secondRunRequest = requestRecords.find( + (record) => + record.agent === "pm" && + record.requestDebug.includes(fileModificationSecondRequest) && + record.requestDebug.includes('type="file-modifications"') && + record.requestDebug.includes(fileModificationProbeFileName) + ); + const requestDebug = secondRunRequest?.requestDebug ?? ""; + + return [ + { + name: "First run wrote probe-file.txt via fs_write", + ok: wroteFile, + detail: "Expected an fs_write tool event for probe-file.txt on the first run.", + }, + { + name: "Second run prompt contains file-modifications reminder", + ok: Boolean(secondRunRequest), + detail: "Expected the second PM request system prompt to contain a file-modifications reminder for probe-file.txt.", + }, + { + name: "Reminder diff shows original → modified", + ok: + Boolean(secondRunRequest) && + requestDebug.includes("-original") && + requestDebug.includes("+modified"), + detail: "Expected the file-modifications diff to show the original line removed and the modified line added.", + }, + ]; +} + function evaluateNestedAgentsMd( events: Event[], requestRecords: MockRequestRecord[],