From f0fb57e571d995612612549fd27b2b473a9f6483 Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Fri, 29 May 2026 00:21:26 +0200 Subject: [PATCH 1/2] fix(llm,mcp,core): add embed timeout and track JoinHandles in ShadowSentinel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #4566 — RouterProvider::spawn_asi_update and EmbeddingAnomalyGuard::check_async now wrap embed() calls with tokio::time::timeout(embed_timeout_ms). On Elapsed the task returns early (same as embed error path); no ASI window update or anomaly event is emitted. EmbeddingAnomalyGuard gains an embed_timeout_ms field (default 5000 ms) with a with_embed_timeout() builder. Closes #4570 — ShadowSentinel::check_tool_call and record_tool_event no longer drop JoinHandles. A bounded Mutex<JoinSet<()>> (cap MAX_PENDING_WRITES = 32) replaces the two let _ = tokio::spawn(...) sites. spawn_persist() reaps completed handles with try_join_next (non-blocking) before spawning; if still at capacity the new task is skipped with a debug log. drain_pending() (blocking) is called in Agent::shutdown() to flush remaining writes. --- crates/zeph-core/src/agent/mod.rs | 5 + crates/zeph-core/src/agent/shadow_sentinel.rs | 122 +++++++++++++++++- crates/zeph-llm/src/router/mod.rs | 75 ++++++++++- crates/zeph-mcp/src/embedding_guard.rs | 87 ++++++++++++- 4 files changed, 277 insertions(+), 12 deletions(-) diff --git a/crates/zeph-core/src/agent/mod.rs b/crates/zeph-core/src/agent/mod.rs index 18322db41..9d13cbaec 100644 --- a/crates/zeph-core/src/agent/mod.rs +++ b/crates/zeph-core/src/agent/mod.rs @@ -777,6 +777,11 @@ impl Agent { h.abort(); } + // Drain pending shadow sentinel DB writes before final teardown. + if let Some(ref sentinel) = self.services.security.shadow_sentinel { + sentinel.drain_pending().await; + } + // Allow cancelled tasks to release their HTTP connections before the summary LLM call. 
// abort_all() posts cancellation signals but does not drain tasks; aborted futures only // observe cancellation at their next .await point. Without yielding here the summary diff --git a/crates/zeph-core/src/agent/shadow_sentinel.rs b/crates/zeph-core/src/agent/shadow_sentinel.rs index 351c7e8da..6ab5e3988 100644 --- a/crates/zeph-core/src/agent/shadow_sentinel.rs +++ b/crates/zeph-core/src/agent/shadow_sentinel.rs @@ -33,6 +33,8 @@ use std::sync::{ Arc, atomic::{AtomicU32, Ordering}, }; +use tokio::sync::Mutex; +use tokio::task::JoinSet; use serde_json::Value as JsonValue; use tracing::{Instrument as _, info_span}; @@ -436,6 +438,13 @@ impl From for SentinelEvent { // ── ShadowSentinel ──────────────────────────────────────────────────────────── +/// Maximum number of concurrent fire-and-forget persist tasks tracked in `pending_writes`. +/// +/// When the set is at capacity the oldest completed tasks are reaped before spawning a new one. +/// If the set is still full after reaping (all tasks are still running), the new spawn is skipped +/// with a debug log — persistence is best-effort and the sentinel must never block tool dispatch. +const MAX_PENDING_WRITES: usize = 32; + /// Orchestrates the persistent safety stream and LLM pre-execution probe. /// /// `ShadowSentinel` is wrapped in `Arc` and shared between `ShadowProbeExecutor` instances @@ -448,6 +457,7 @@ impl From for SentinelEvent { /// probe counter. /// - `check_tool_call()` — call before each tool execution to probe high-risk calls. /// - `record_tool_event()` — call after tool execution to persist the event. +/// - `drain_pending()` — call at session shutdown to await all queued persist writes. /// /// # NEVER /// @@ -460,6 +470,9 @@ pub struct ShadowSentinel { /// probe-checking methods can take `&self` even under parallel tool execution. probes_this_turn: AtomicU32, session_id: SessionId, + /// Bounded set of fire-and-forget DB persist tasks. 
Prevents unbounded task accumulation + /// and lets `drain_pending()` await outstanding writes at shutdown; join results are discarded. + pending_writes: Mutex<JoinSet<()>>, } impl ShadowSentinel { @@ -484,6 +497,7 @@ impl ShadowSentinel { config, probes_this_turn: AtomicU32::new(0), session_id: session_id.into(), + pending_writes: Mutex::new(JoinSet::new()), } } @@ -627,11 +641,12 @@ impl ShadowSentinel { created_at: unix_now(), }; let store = self.store.clone(); - tokio::spawn(async move { + self.spawn_persist(async move { if let Err(e) = store.record(&event).await { tracing::warn!(error = %e, "ShadowSentinel: failed to persist probe result"); } - }); + }) + .await; verdict } @@ -639,7 +654,7 @@ impl ShadowSentinel { /// Persist a tool execution event in the shadow stream (fire-and-forget). /// /// Called after a tool finishes execution to maintain the trajectory for future probes. - pub fn record_tool_event( + pub async fn record_tool_event( &self, qualified_tool_id: &str, turn_number: u64, @@ -662,11 +677,44 @@ impl ShadowSentinel { created_at: unix_now(), }; let store = self.store.clone(); - tokio::spawn(async move { + self.spawn_persist(async move { if let Err(e) = store.record(&event).await { tracing::warn!(error = %e, "ShadowSentinel: failed to persist tool event"); } - }); + }) + .await; + } + + /// Await all queued fire-and-forget persist tasks. + /// + /// Call once at session shutdown to ensure no DB writes are silently dropped. + /// All errors have already been logged inside each task; this method only joins the handles. + pub async fn drain_pending(&self) { + let mut set = self.pending_writes.lock().await; + while set.join_next().await.is_some() {} + } + + /// Spawn a background persist task into the bounded `JoinSet`. + /// + /// Reaps completed handles before spawning to stay within `MAX_PENDING_WRITES`. 
If the set + /// is still at capacity after reaping (all tasks still running), the new task is dropped and + /// a debug message is emitted — persistence is best-effort and must never block the tool path. + async fn spawn_persist<F>(&self, fut: F) + where + F: std::future::Future<Output = ()> + Send + 'static, + { + let mut set = self.pending_writes.lock().await; + // Reap only already-finished handles — never block waiting for a running task. + // try_join_next() returns immediately if no task has completed yet. + while set.try_join_next().is_some() {} + if set.len() < MAX_PENDING_WRITES { + set.spawn(fut); + } else { + tracing::debug!( + max = MAX_PENDING_WRITES, + "ShadowSentinel: pending_writes at capacity, skipping persist" + ); + } } /// Reset the per-turn probe counter. @@ -887,6 +935,70 @@ mod tests { ); } + // ── JoinSet regression tests (#4570) ───────────────────────────────────── + + /// `drain_pending` awaits all spawned persist tasks and returns when the set is empty. + #[tokio::test] + async fn drain_pending_awaits_all_tasks() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let config = zeph_config::ShadowSentinelConfig::default(); + let sentinel = make_test_sentinel(config).await; + + let counter = Arc::new(AtomicU32::new(0)); + for _ in 0..5 { + let c = Arc::clone(&counter); + sentinel + .spawn_persist(async move { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + c.fetch_add(1, Ordering::Relaxed); + }) + .await; + } + + sentinel.drain_pending().await; + + assert_eq!( + counter.load(Ordering::Relaxed), + 5, + "drain_pending must join all 5 tasks before returning" + ); + } + + /// When the pending set is at capacity and all running tasks complete before the next + /// `spawn_persist`, the new task IS accepted (the set has room after reaping). + /// Conversely, if we fill the set, drain it, then overfill past capacity while tasks are + /// still running — the implementation drops extras. 
We verify the simpler property: + /// `spawn_persist` never panics when called repeatedly beyond `MAX_PENDING_WRITES`. + #[tokio::test] + async fn spawn_persist_beyond_capacity_does_not_panic() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let config = zeph_config::ShadowSentinelConfig::default(); + let sentinel = make_test_sentinel(config).await; + let counter = Arc::new(AtomicU32::new(0)); + + // Spawn twice the capacity; each task completes instantly. + // spawn_persist will reap completed tasks between spawns, so most will be accepted. + for _ in 0..(MAX_PENDING_WRITES * 2) { + let c = Arc::clone(&counter); + sentinel + .spawn_persist(async move { + c.fetch_add(1, Ordering::Relaxed); + }) + .await; + } + + sentinel.drain_pending().await; + + // All tasks (or at least MAX_PENDING_WRITES of them) must have run; none panicked. + let ran = counter.load(Ordering::Relaxed); + assert!( + ran >= u32::try_from(MAX_PENDING_WRITES).unwrap(), + "at least MAX_PENDING_WRITES tasks must complete; ran={ran}" + ); + } + // Build a minimal ShadowSentinel with a no-op probe for unit tests. // // Opens an in-memory SQLite pool. 
Store methods are never called in these unit diff --git a/crates/zeph-llm/src/router/mod.rs b/crates/zeph-llm/src/router/mod.rs index be639e0da..9b49139b2 100644 --- a/crates/zeph-llm/src/router/mod.rs +++ b/crates/zeph-llm/src/router/mod.rs @@ -1446,20 +1446,38 @@ impl RouterProvider { let router = self.clone(); let window_size = asi_cfg.window; let provider_name = provider.to_owned(); + let embed_timeout_ms = self.embed_timeout_ms; tokio::spawn(async move { - let emb = match precomputed_embedding { - Some(e) => e, - None => match router.embed(&response).await { + let emb = if let Some(e) = precomputed_embedding { + e + } else { + let embed_fut = router.embed(&response); + let embed_result = if embed_timeout_ms > 0 { + let timeout = std::time::Duration::from_millis(embed_timeout_ms); + if let Ok(r) = tokio::time::timeout(timeout, embed_fut).await { + r + } else { + tracing::debug!( + provider = provider_name, + timeout_ms = embed_timeout_ms, + "asi: embed timed out, skipping coherence update" + ); + return; + } + } else { + embed_fut.await + }; + match embed_result { Ok(e) => e, - Err(e) => { + Err(err) => { tracing::debug!( provider = provider_name, - error = %e, + error = %err, "asi: embed failed, skipping coherence update" ); return; } - }, + } }; let mut state = asi.lock(); state.push_embedding(&provider_name, emb, window_size); @@ -3936,4 +3954,49 @@ mod tests { let result = super::blocking_load(|| 42_u32); assert_eq!(result, 42, "blocking_load must return the closure result"); } + + // ── spawn_asi_update timeout regression (#4566) ─────────────────────────── + + /// Regression for #4566: when `embed()` inside `spawn_asi_update` exceeds `embed_timeout_ms`, + /// the ASI coherence window must NOT be updated (task returns early without pushing embedding). 
+ #[tokio::test] + async fn spawn_asi_update_embed_timeout_does_not_update_asi() { + use crate::mock::MockProvider; + use std::sync::atomic::Ordering; + + // Provider that takes 200 ms to embed — well above the 10 ms timeout. + let mut m = MockProvider::with_responses(vec!["ok".to_owned()]); + m.supports_embeddings = true; + m.embedding = vec![1.0, 0.0]; + m.embed_delay_ms = 200; + let provider_embed_calls = Arc::clone(&m.embed_call_count); + + let r = RouterProvider::new(vec![AnyProvider::Mock(m)]) + .with_asi(AsiRouterConfig::default()) + .with_embed_timeout(10); + + // Inject a sentinel turn id so the debounce does not fire. + r.state.asi_last_turn.store(u64::MAX, Ordering::SeqCst); + + // No precomputed embedding → router will attempt to call embed(). + r.spawn_asi_update("p1", "response".to_owned(), 1u64, None); + + // Wait long enough for the spawned task to reach its timeout and return. + tokio::time::sleep(std::time::Duration::from_millis(150)).await; + + // embed() was called (the call was made before the timeout fired). + assert!( + provider_embed_calls.load(Ordering::Relaxed) >= 1, + "embed() must have been attempted" + ); + + // ASI window must be empty — timeout fired before push_embedding could run. + let asi = r.asi.as_ref().unwrap().lock(); + let coherence = asi.coherence("p1"); + // coherence() returns 1.0 when the provider is unknown (no entries in the window). + assert!( + (coherence - 1.0).abs() < f32::EPSILON, + "ASI window must be empty after embed timeout; coherence={coherence}" + ); + } } diff --git a/crates/zeph-mcp/src/embedding_guard.rs b/crates/zeph-mcp/src/embedding_guard.rs index 73df316a0..0d40096fb 100644 --- a/crates/zeph-mcp/src/embedding_guard.rs +++ b/crates/zeph-mcp/src/embedding_guard.rs @@ -13,6 +13,7 @@ //! first-line defense (regex patterns in `sanitize.rs` cover that case). 
use std::sync::{Arc, LazyLock}; +use std::time::Duration; use zeph_common::ToolName; @@ -56,6 +57,9 @@ struct CentroidState { sample_count: usize, } +/// Default timeout for embedding computation inside the background anomaly-check task. +const DEFAULT_EMBED_TIMEOUT_MS: u64 = 5000; + /// Detects anomalous MCP tool output via embedding distance from a per-server centroid. /// /// `check_async()` is fire-and-forget: it returns immediately and sends results via @@ -70,6 +74,8 @@ pub struct EmbeddingAnomalyGuard { /// Caps the per-sample update rate once the centroid is established, preventing /// slow boiling-frog drift attacks. Default: 0.01 (1% shift per clean sample max). ema_floor: f32, + /// Maximum milliseconds to wait for the embedding computation. `0` means no timeout. + embed_timeout_ms: u64, result_tx: mpsc::UnboundedSender, } @@ -79,6 +85,7 @@ impl std::fmt::Debug for EmbeddingAnomalyGuard { .field("threshold", &self.threshold) .field("min_samples", &self.min_samples) .field("ema_floor", &self.ema_floor) + .field("embed_timeout_ms", &self.embed_timeout_ms) .finish_non_exhaustive() } } @@ -106,11 +113,19 @@ impl EmbeddingAnomalyGuard { threshold, min_samples, ema_floor, + embed_timeout_ms: DEFAULT_EMBED_TIMEOUT_MS, result_tx: tx, }; (guard, rx) } + /// Override the embedding timeout. `0` disables the timeout. + #[must_use] + pub fn with_embed_timeout(mut self, timeout_ms: u64) -> Self { + self.embed_timeout_ms = timeout_ms; + self + } + /// Fire-and-forget anomaly check. /// /// Returns immediately. Results are delivered via the `mpsc` channel returned by `new()`. 
@@ -148,13 +163,31 @@ impl EmbeddingAnomalyGuard { let embed_fn = Arc::clone(&self.embed_fn); let threshold = self.threshold; + let embed_timeout_ms = self.embed_timeout_ms; let tx = self.result_tx.clone(); let server_id = server_id.to_owned(); let tool_name: ToolName = tool_name.into(); let output = tool_output.to_owned(); tokio::spawn(async move { - match (embed_fn)(&output).await { + let embed_result = if embed_timeout_ms > 0 { + let timeout = Duration::from_millis(embed_timeout_ms); + if let Ok(r) = tokio::time::timeout(timeout, (embed_fn)(&output)).await { + r + } else { + // Fail-open: timeout does not block the tool output path. + tracing::debug!( + server_id, + tool_name = %tool_name, + timeout_ms = embed_timeout_ms, + "embedding guard: computation timed out" + ); + return; + } + } else { + (embed_fn)(&output).await + }; + match embed_result { Ok(embedding) => { let distance = cosine_distance(&embedding, &centroid); let result = if distance > threshold { @@ -455,4 +488,56 @@ mod tests { state.centroid[1] ); } + + /// Regression for #4566: when embed computation exceeds `embed_timeout_ms`, the task + /// returns early (fail-open) and no event is sent to the result channel. + #[tokio::test] + async fn check_async_embed_timeout_is_fail_open() { + // embed_fn sleeps 200 ms — well above the 10 ms timeout. + let embed_fn: Arc<dyn Fn(&str) -> EmbedFuture + Send + Sync> = Arc::new(|_| { + Box::pin(async { + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + Ok(vec![1.0f32, 0.0]) + }) + }); + let (guard, mut rx) = EmbeddingAnomalyGuard::new(embed_fn, 0.35, 2, 0.01); + // Set a 10 ms timeout so the 200 ms embed always expires. + let guard = guard.with_embed_timeout(10); + + // Warm up centroid so the guard takes the embedding path (not regex fallback). + guard.record_clean("srv", &[1.0f32, 0.0]); + guard.record_clean("srv", &[1.0f32, 0.0]); + + guard.check_async("srv", "tool", "some output"); + + // Wait long enough for the task to run and hit the timeout. 
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Fail-open: no event should arrive when the embed computation timed out. + assert!( + rx.try_recv().is_err(), + "embed timeout must not produce an event — fail-open behaviour expected" + ); + } + + /// `with_embed_timeout(0)` disables the timeout: embed runs to completion. + #[tokio::test] + async fn check_async_zero_timeout_disables_timeout() { + let embed_fn: Arc<dyn Fn(&str) -> EmbedFuture + Send + Sync> = + Arc::new(|_| Box::pin(async { Ok(vec![1.0f32, 0.0]) })); + let (guard, mut rx) = EmbeddingAnomalyGuard::new(embed_fn, 0.35, 2, 0.01); + let guard = guard.with_embed_timeout(0); // disabled + + guard.record_clean("srv", &[1.0f32, 0.0]); + guard.record_clean("srv", &[1.0f32, 0.0]); + + guard.check_async("srv", "tool", "output"); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert!( + rx.try_recv().is_ok(), + "timeout=0 must allow embed to complete and produce a result" + ); + } } From 52bbfbbf31d07df0a7b87899771cb7bdd01612b1 Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Fri, 29 May 2026 00:28:58 +0200 Subject: [PATCH 2/2] chore(changelog): document #4566 and #4570 fixes in [Unreleased] --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8c637edc..7ea1fd090 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Fixed +- `zeph-llm`, `zeph-mcp`: embed calls inside fire-and-forget tasks (`RouterProvider::spawn_asi_update` + and `EmbeddingAnomalyGuard::check_async`) are now bounded by `embed_timeout_ms` (default 5 s) via + `tokio::time::timeout`. On timeout the task returns early — same as the existing embed-error path — + preventing indefinite task accumulation when the embedding provider stalls (closes #4566). + `EmbeddingAnomalyGuard` gains an `embed_timeout_ms` field and `with_embed_timeout()` builder. 
+- `zeph-core`: `ShadowSentinel` no longer drops `JoinHandle`s from its two fire-and-forget persist + tasks. Both spawn sites now route through a bounded `Mutex<JoinSet<()>>` (capacity 32). Completed + handles are reaped non-blockingly with `try_join_next()` before each spawn; if the set is still at + capacity the new task is skipped with a debug log. `drain_pending()` awaits all remaining writes at + session shutdown via `Agent::shutdown()` (closes #4570). - `zeph-core`: `select_messages_for_compression` now sorts indices before building `to_compress`, ensuring messages are passed to the compression LLM in chronological (ascending index) order. Previously, iterating a raw `HashSet` produced non-deterministic ordering (closes #4558).