Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

### Fixed

- `zeph-llm`, `zeph-mcp`: embed calls inside fire-and-forget tasks (`RouterProvider::spawn_asi_update`
and `EmbeddingAnomalyGuard::check_async`) are now bounded by `embed_timeout_ms` (default 5 s) via
`tokio::time::timeout`. On timeout the task returns early — same as the existing embed-error path —
preventing indefinite task accumulation when the embedding provider stalls (closes #4566).
`EmbeddingAnomalyGuard` gains an `embed_timeout_ms` field and `with_embed_timeout()` builder.
- `zeph-core`: `ShadowSentinel` no longer drops `JoinHandle`s from its two fire-and-forget persist
tasks. Both spawn sites now route through a bounded `Mutex<JoinSet<()>>` (capacity 32). Completed
handles are reaped non-blockingly with `try_join_next()` before each spawn; if the set is still at
capacity the new task is skipped with a debug log. `drain_pending()` awaits all remaining writes at
session shutdown via `Agent::shutdown()` (closes #4570).
- `zeph-core`: `select_messages_for_compression` now sorts indices before building `to_compress`,
ensuring messages are passed to the compression LLM in chronological (ascending index) order.
Previously, iterating a raw `HashSet<usize>` produced non-deterministic ordering (closes #4558).
Expand Down
5 changes: 5 additions & 0 deletions crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,11 @@ impl<C: Channel> Agent<C> {
h.abort();
}

// Drain pending shadow sentinel DB writes before final teardown.
if let Some(ref sentinel) = self.services.security.shadow_sentinel {
sentinel.drain_pending().await;
}

// Allow cancelled tasks to release their HTTP connections before the summary LLM call.
// abort_all() posts cancellation signals but does not drain tasks; aborted futures only
// observe cancellation at their next .await point. Without yielding here the summary
Expand Down
122 changes: 117 additions & 5 deletions crates/zeph-core/src/agent/shadow_sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use tokio::sync::Mutex;
use tokio::task::JoinSet;

use serde_json::Value as JsonValue;
use tracing::{Instrument as _, info_span};
Expand Down Expand Up @@ -436,6 +438,13 @@ impl From<ShadowEventRow> for SentinelEvent {

// ── ShadowSentinel ────────────────────────────────────────────────────────────

/// Maximum number of concurrent fire-and-forget persist tasks tracked in `pending_writes`.
///
/// When the set is at capacity the oldest completed tasks are reaped before spawning a new one.
/// If the set is still full after reaping (all tasks are still running), the new spawn is skipped
/// with a debug log — persistence is best-effort and the sentinel must never block tool dispatch.
const MAX_PENDING_WRITES: usize = 32;

/// Orchestrates the persistent safety stream and LLM pre-execution probe.
///
/// `ShadowSentinel` is wrapped in `Arc` and shared between `ShadowProbeExecutor` instances
Expand All @@ -448,6 +457,7 @@ impl From<ShadowEventRow> for SentinelEvent {
/// probe counter.
/// - `check_tool_call()` — call before each tool execution to probe high-risk calls.
/// - `record_tool_event()` — call after tool execution to persist the event.
/// - `drain_pending()` — call at session shutdown to await all queued persist writes.
///
/// # NEVER
///
Expand All @@ -460,6 +470,9 @@ pub struct ShadowSentinel {
/// probe-checking methods can take `&self` even under parallel tool execution.
probes_this_turn: AtomicU32,
session_id: SessionId,
/// Bounded set of fire-and-forget DB persist tasks. Prevents unbounded task accumulation
/// and ensures panics surface at `drain_pending()` instead of being silently swallowed.
pending_writes: Mutex<JoinSet<()>>,
}

impl ShadowSentinel {
Expand All @@ -484,6 +497,7 @@ impl ShadowSentinel {
config,
probes_this_turn: AtomicU32::new(0),
session_id: session_id.into(),
pending_writes: Mutex::new(JoinSet::new()),
}
}

Expand Down Expand Up @@ -627,19 +641,20 @@ impl ShadowSentinel {
created_at: unix_now(),
};
let store = self.store.clone();
tokio::spawn(async move {
self.spawn_persist(async move {
if let Err(e) = store.record(&event).await {
tracing::warn!(error = %e, "ShadowSentinel: failed to persist probe result");
}
});
})
.await;

verdict
}

/// Persist a tool execution event in the shadow stream (fire-and-forget).
///
/// Called after a tool finishes execution to maintain the trajectory for future probes.
pub fn record_tool_event(
pub async fn record_tool_event(
&self,
qualified_tool_id: &str,
turn_number: u64,
Expand All @@ -662,11 +677,44 @@ impl ShadowSentinel {
created_at: unix_now(),
};
let store = self.store.clone();
tokio::spawn(async move {
self.spawn_persist(async move {
if let Err(e) = store.record(&event).await {
tracing::warn!(error = %e, "ShadowSentinel: failed to persist tool event");
}
});
})
.await;
}

/// Await all queued fire-and-forget persist tasks.
///
/// Call once at session shutdown to ensure no DB writes are silently dropped.
/// All errors have already been logged inside each task; this method only joins the handles.
pub async fn drain_pending(&self) {
let mut set = self.pending_writes.lock().await;
while set.join_next().await.is_some() {}
}

/// Spawn a background persist task into the bounded `JoinSet`.
///
/// Reaps completed handles before spawning to stay within `MAX_PENDING_WRITES`. If the set
/// is still at capacity after reaping (all tasks still running), the new task is dropped and
/// a debug message is emitted — persistence is best-effort and must never block the tool path.
async fn spawn_persist<F>(&self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
let mut set = self.pending_writes.lock().await;
// Reap only already-finished handles — never block waiting for a running task.
// try_join_next() returns immediately if no task has completed yet.
while set.try_join_next().is_some() {}
if set.len() < MAX_PENDING_WRITES {
set.spawn(fut);
} else {
tracing::debug!(
max = MAX_PENDING_WRITES,
"ShadowSentinel: pending_writes at capacity, skipping persist"
);
}
}

/// Reset the per-turn probe counter.
Expand Down Expand Up @@ -887,6 +935,70 @@ mod tests {
);
}

// ── JoinSet regression tests (#4570) ─────────────────────────────────────

/// `drain_pending` awaits all spawned persist tasks and returns when the set is empty.
#[tokio::test]
async fn drain_pending_awaits_all_tasks() {
use std::sync::atomic::{AtomicU32, Ordering};

let config = zeph_config::ShadowSentinelConfig::default();
let sentinel = make_test_sentinel(config).await;

let counter = Arc::new(AtomicU32::new(0));
for _ in 0..5 {
let c = Arc::clone(&counter);
sentinel
.spawn_persist(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
c.fetch_add(1, Ordering::Relaxed);
})
.await;
}

sentinel.drain_pending().await;

assert_eq!(
counter.load(Ordering::Relaxed),
5,
"drain_pending must join all 5 tasks before returning"
);
}

/// When the pending set is at capacity and all running tasks complete before the next
/// `spawn_persist`, the new task IS accepted (the set has room after reaping).
/// Conversely, if we fill the set, drain it, then overfill past capacity while tasks are
/// still running — the implementation drops extras. We verify the simpler property:
/// `spawn_persist` never panics when called repeatedly beyond `MAX_PENDING_WRITES`.
#[tokio::test]
async fn spawn_persist_beyond_capacity_does_not_panic() {
use std::sync::atomic::{AtomicU32, Ordering};

let config = zeph_config::ShadowSentinelConfig::default();
let sentinel = make_test_sentinel(config).await;
let counter = Arc::new(AtomicU32::new(0));

// Spawn twice the capacity; each task completes instantly.
// spawn_persist will reap completed tasks between spawns, so most will be accepted.
for _ in 0..(MAX_PENDING_WRITES * 2) {
let c = Arc::clone(&counter);
sentinel
.spawn_persist(async move {
c.fetch_add(1, Ordering::Relaxed);
})
.await;
}

sentinel.drain_pending().await;

// All tasks (or at least MAX_PENDING_WRITES of them) must have run; none panicked.
let ran = counter.load(Ordering::Relaxed);
assert!(
ran >= u32::try_from(MAX_PENDING_WRITES).unwrap(),
"at least MAX_PENDING_WRITES tasks must complete; ran={ran}"
);
}

// Build a minimal ShadowSentinel with a no-op probe for unit tests.
//
// Opens an in-memory SQLite pool. Store methods are never called in these unit
Expand Down
75 changes: 69 additions & 6 deletions crates/zeph-llm/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1446,20 +1446,38 @@ impl RouterProvider {
let router = self.clone();
let window_size = asi_cfg.window;
let provider_name = provider.to_owned();
let embed_timeout_ms = self.embed_timeout_ms;
tokio::spawn(async move {
let emb = match precomputed_embedding {
Some(e) => e,
None => match router.embed(&response).await {
let emb = if let Some(e) = precomputed_embedding {
e
} else {
let embed_fut = router.embed(&response);
let embed_result = if embed_timeout_ms > 0 {
let timeout = std::time::Duration::from_millis(embed_timeout_ms);
if let Ok(r) = tokio::time::timeout(timeout, embed_fut).await {
r
} else {
tracing::debug!(
provider = provider_name,
timeout_ms = embed_timeout_ms,
"asi: embed timed out, skipping coherence update"
);
return;
}
} else {
embed_fut.await
};
match embed_result {
Ok(e) => e,
Err(e) => {
Err(err) => {
tracing::debug!(
provider = provider_name,
error = %e,
error = %err,
"asi: embed failed, skipping coherence update"
);
return;
}
},
}
};
let mut state = asi.lock();
state.push_embedding(&provider_name, emb, window_size);
Expand Down Expand Up @@ -3936,4 +3954,49 @@ mod tests {
let result = super::blocking_load(|| 42_u32);
assert_eq!(result, 42, "blocking_load must return the closure result");
}

// ── spawn_asi_update timeout regression (#4566) ───────────────────────────

/// Regression for #4566: when `embed()` inside `spawn_asi_update` exceeds `embed_timeout_ms`,
/// the ASI coherence window must NOT be updated (task returns early without pushing embedding).
#[tokio::test]
async fn spawn_asi_update_embed_timeout_does_not_update_asi() {
use crate::mock::MockProvider;
use std::sync::atomic::Ordering;

// Provider that takes 200 ms to embed — well above the 10 ms timeout.
let mut m = MockProvider::with_responses(vec!["ok".to_owned()]);
m.supports_embeddings = true;
m.embedding = vec![1.0, 0.0];
m.embed_delay_ms = 200;
let provider_embed_calls = Arc::clone(&m.embed_call_count);

let r = RouterProvider::new(vec![AnyProvider::Mock(m)])
.with_asi(AsiRouterConfig::default())
.with_embed_timeout(10);

// Inject a sentinel turn id so the debounce does not fire.
r.state.asi_last_turn.store(u64::MAX, Ordering::SeqCst);

// No precomputed embedding → router will attempt to call embed().
r.spawn_asi_update("p1", "response".to_owned(), 1u64, None);

// Wait long enough for the spawned task to reach its timeout and return.
tokio::time::sleep(std::time::Duration::from_millis(150)).await;

// embed() was called (the call was made before the timeout fired).
assert!(
provider_embed_calls.load(Ordering::Relaxed) >= 1,
"embed() must have been attempted"
);

// ASI window must be empty — timeout fired before push_embedding could run.
let asi = r.asi.as_ref().unwrap().lock();
let coherence = asi.coherence("p1");
// coherence() returns 1.0 when the provider is unknown (no entries in the window).
assert!(
(coherence - 1.0).abs() < f32::EPSILON,
"ASI window must be empty after embed timeout; coherence={coherence}"
);
}
}
Loading
Loading