diff --git a/docs/schedule-followup.md b/docs/schedule-followup.md index a0545f67..5fcefa70 100644 --- a/docs/schedule-followup.md +++ b/docs/schedule-followup.md @@ -126,6 +126,37 @@ followup-driven runs distinctly from organic user input: the `AppState` reference is held weakly, no circular `Arc` is formed between `AppState` and `CronService`. +## Boundary handling & retries + +When a followup fires, the originating thread or the dispatch itself may be in a +state that prevents delivery. These cases are handled explicitly rather than +dropped silently: + +- **Thread deleted.** If the originating thread is no longer in the thread store + by the time the followup fires, the run is recorded as `failed_dropped` with a + `thread not found: ` reason. This is non-retryable — retrying cannot bring + the thread back. +- **Transient dispatch failure.** A network/internal error during dispatch is + retried up to `FOLLOWUP_MAX_RETRIES` (3) times with exponential backoff + (≈200ms / 400ms / 800ms). If a retry succeeds, the run is `success` and the + intermediate failures are logged at `warn`. If the budget is exhausted, the + run is recorded as `failed_dropped` with the concrete underlying error. +- **Terminal drops are final.** A `failed_dropped` outcome disables the one-shot + job (and honors `delete_after_run`) so a dropped followup never re-fires on a + later tick. +- **Thread stopped / cancelled.** There is no dedicated dispatch-time signal for + a stopped-but-still-present thread today, so such a thread still receives the + injected turn (which starts a fresh turn). The drop classifier is structured + to accommodate such a signal if one is added later. + +Every drop path emits a `tracing::warn`, and the existing `cron_job_completed` +broadcast event carries the run `status` and reason, so drops are observable and +never silent. + +The `RunRecord.status` enum gains a `failed_dropped` value +(`JobRunStatus::FailedDropped`) distinct from `failed`. It is additive: older +persisted run records without it still deserialize. + ## Observability `schedule_followup` jobs are `system: true`, so they do not show up in the diff --git a/garyx-gateway/src/cron.rs b/garyx-gateway/src/cron.rs index ba8f8eec..4447ffc2 100644 --- a/garyx-gateway/src/cron.rs +++ b/garyx-gateway/src/cron.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::{Arc, OnceLock, Weak}; +use std::time::Duration; use chrono::{DateTime, Local, LocalResult, NaiveDateTime, TimeZone, Utc}; use chrono_tz::Tz; @@ -46,6 +47,12 @@ const MAX_INTERVAL_SECS: u64 = i64::MAX as u64; pub enum JobRunStatus { Success, Failed, + /// Terminal failure where the run was intentionally dropped rather than + /// retried further: the target thread is gone, or a transient dispatch + /// failure exhausted its retry budget. Distinct from `Failed` so a dropped + /// followup is treated as terminal (one-shot jobs are disabled, see + /// `CronJob::settle_after_run`) and never silently re-fires. + FailedDropped, Running, NeverRun, } @@ -65,6 +72,28 @@ pub struct RunRecord { pub error: Option, } +/// Maximum number of *retries* (i.e. attempts after the first) for a transient +/// internal-dispatch failure before the followup is dropped. Total +/// attempts are `FOLLOWUP_MAX_RETRIES + 1`. +const FOLLOWUP_MAX_RETRIES: u32 = 3; + +/// Base delay for exponential backoff between internal-dispatch retries. The +/// nth retry waits `FOLLOWUP_RETRY_BASE_BACKOFF * 2^n` (≈200ms, 400ms, 800ms). +const FOLLOWUP_RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200); + +/// Classification of a single internal-dispatch attempt outcome. +/// +/// `Dropped` is non-retryable — the target thread is gone (deleted) or the job +/// is structurally unable to dispatch, so retrying cannot help. `Transient` is +/// a network/internal failure worth retrying with backoff. +#[derive(Debug)] +enum FollowupAttemptError { + /// Non-retryable: drop the followup immediately with this reason. + Dropped(String), + /// Retryable transient failure carrying the underlying error text. + Transient(String), +} + /// Persisted state for a single cron job. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct CronJob { @@ -191,6 +220,37 @@ impl CronJob { } } + /// Apply post-run bookkeeping after a run produced `status`, returning + /// whether the job file should be deleted (`delete_after_run` on a terminal + /// outcome). Single source of truth for both the `run_now` and `tick` paths. + /// + /// `Success` advances/disables the schedule as before. `FailedDropped` is a + /// *terminal* failure: one-shot jobs are disabled so a dropped followup is + /// not re-claimed every tick (`is_due` only exempts past-at-registration + /// jobs, so a fired-but-not-advanced `Once` job would otherwise re-fire + /// indefinitely), and `delete_after_run` is honored just like `Success`. + /// All other statuses keep the prior behavior (bump counters, leave the + /// schedule untouched). + fn settle_after_run(&mut self, status: &JobRunStatus, started_at: DateTime) -> bool { + self.last_status = status.clone(); + match status { + JobRunStatus::Success => self.advance(), + JobRunStatus::FailedDropped => { + self.last_run_at = Some(started_at); + self.run_count += 1; + if matches!(self.schedule, CronSchedule::Once { .. }) { + self.enabled = false; + } + } + _ => { + self.last_run_at = Some(started_at); + self.run_count += 1; + } + } + self.delete_after_run + && matches!(status, JobRunStatus::Success | JobRunStatus::FailedDropped) + } + /// Is this job due to run? fn is_due(&self) -> bool { if !self.enabled { @@ -863,14 +923,7 @@ impl CronService { { let mut jobs = self.jobs.write().await; if let Some(j) = jobs.get_mut(id) { - j.last_status = record.status.clone(); - if record.status == JobRunStatus::Success { - j.advance(); - } else { - j.last_run_at = Some(record.started_at); - j.run_count += 1; - } - should_delete = j.delete_after_run && record.status == JobRunStatus::Success; + should_delete = j.settle_after_run(&record.status, record.started_at); if !should_delete { let _ = persist_job(&self.data_dir, j).await; } @@ -970,6 +1023,7 @@ impl CronService { match status { JobRunStatus::Success => "success", JobRunStatus::Failed => "failed", + JobRunStatus::FailedDropped => "dropped", JobRunStatus::Running => "running", JobRunStatus::NeverRun => "unknown", } @@ -1229,14 +1283,7 @@ impl CronService { { let mut map = jobs.write().await; if let Some(j) = map.get_mut(&id) { - j.last_status = record.status.clone(); - if record.status == JobRunStatus::Success { - j.advance(); - } else { - j.last_run_at = Some(record.started_at); - j.run_count += 1; - } - should_delete = j.delete_after_run && record.status == JobRunStatus::Success; + should_delete = j.settle_after_run(&record.status, record.started_at); if !should_delete { let _ = persist_job(data_dir, j).await; } @@ -1289,10 +1336,22 @@ impl CronService { let (status, error) = match &job.kind { CronJobKind::InternalDispatch { payload } => { - match Self::dispatch_internal_followup(job, &run_id, payload, app_state_weak).await + // Boundary fallback: classify drop-vs-transient and + // retry transient dispatch failures with exponential backoff. + // Any terminal failure (thread gone, or retry budget exhausted) + // becomes `FailedDropped` with the reason recorded in the run + // record — never a silent drop. + match Self::dispatch_internal_followup_with_retry( + job, + &run_id, + payload, + app_state_weak, + FOLLOWUP_RETRY_BASE_BACKOFF, + ) + .await { Ok(()) => (JobRunStatus::Success, None), - Err(e) => (JobRunStatus::Failed, Some(e)), + Err(reason) => (JobRunStatus::FailedDropped, Some(reason)), } } CronJobKind::AutomationPrompt => match &job.action { @@ -1376,19 +1435,137 @@ impl CronService { /// so the resumed agent can correlate the followup with its own earlier /// `schedule_followup` call (and so telemetry can distinguish followups /// from organic user input). - async fn dispatch_internal_followup( + /// Drive [`Self::dispatch_internal_followup_once`] with bounded + /// exponential-backoff retry. + /// + /// Returns `Ok(())` on success (possibly after retries) or `Err(reason)` + /// when the followup is dropped — either non-retryably (thread gone) or + /// because the retry budget was exhausted. The reason string is recorded in + /// the run record so a drop is never silent. + async fn dispatch_internal_followup_with_retry( job: &CronJob, run_id: &str, payload: &InternalDispatchJobPayload, app_state_weak: &Arc>>, + base_backoff: Duration, ) -> Result<(), String> { - let thread_id = Self::trimmed_non_empty(job.thread_id.as_deref()) - .ok_or_else(|| format!("cron internal-dispatch job {} is missing thread_id", job.id))?; + Self::run_followup_with_retry( + FOLLOWUP_MAX_RETRIES, + base_backoff, + &job.id, + run_id, + |_attempt| Self::dispatch_internal_followup_once(job, run_id, payload, app_state_weak), + ) + .await + } + + /// Generic retry driver shared by production and tests. + /// + /// Calls `attempt` (receiving the zero-based attempt index) until it + /// succeeds, hits a non-retryable `Dropped` outcome, or exhausts + /// `max_retries` transient failures. Every drop path emits a `tracing::warn` + /// so drops are observable; the nth retry sleeps `base_backoff * 2^n`. + async fn run_followup_with_retry( + max_retries: u32, + base_backoff: Duration, + job_id: &str, + run_id: &str, + mut attempt: F, + ) -> Result<(), String> + where + F: FnMut(u32) -> Fut, + Fut: std::future::Future>, + { + let mut last_error = String::new(); + for n in 0..=max_retries { + match attempt(n).await { + Ok(()) => return Ok(()), + Err(FollowupAttemptError::Dropped(reason)) => { + tracing::warn!( + job_id = %job_id, + run_id = %run_id, + reason = %reason, + "schedule_followup dropped (non-retryable)" + ); + return Err(reason); + } + Err(FollowupAttemptError::Transient(error)) => { + last_error = error; + if n < max_retries { + let backoff = base_backoff * 2u32.pow(n); + tracing::warn!( + job_id = %job_id, + run_id = %run_id, + attempt = n + 1, + max_attempts = max_retries + 1, + backoff_ms = backoff.as_millis() as u64, + error = %last_error, + "schedule_followup dispatch failed; retrying after backoff" + ); + if !backoff.is_zero() { + tokio::time::sleep(backoff).await; + } + } + } + } + } + + let reason = format!( + "dispatch failed after {} retries: {}", + max_retries, last_error + ); + tracing::warn!( + job_id = %job_id, + run_id = %run_id, + reason = %reason, + "schedule_followup dropped (retry budget exhausted)" + ); + Err(reason) + } + + /// Perform a single internal-dispatch attempt, classifying the outcome into + /// retryable vs non-retryable. + /// + /// Builds a synthetic user-turn body from a `schedule_followup` payload and + /// injects it into the originating thread via + /// [`dispatch_internal_message_to_thread`]. The body is prefixed with a + /// `` block so the resumed agent can correlate the + /// followup with its own earlier `schedule_followup` call (and so telemetry + /// can distinguish followups from organic user input). + /// + /// A missing thread_id / app_state back-reference, or a thread that is no + /// longer present in the thread store, yields `Dropped` (retrying cannot + /// help). Any other dispatch error yields `Transient`. + async fn dispatch_internal_followup_once( + job: &CronJob, + run_id: &str, + payload: &InternalDispatchJobPayload, + app_state_weak: &Arc>>, + ) -> Result<(), FollowupAttemptError> { + let thread_id = Self::trimmed_non_empty(job.thread_id.as_deref()).ok_or_else(|| { + FollowupAttemptError::Dropped(format!( + "cron internal-dispatch job {} is missing thread_id", + job.id + )) + })?; let app_state = app_state_weak .get() .and_then(Weak::upgrade) - .ok_or_else(|| "cron app_state back-reference is not installed".to_owned())?; + .ok_or_else(|| { + FollowupAttemptError::Dropped( + "cron app_state back-reference is not installed".to_owned(), + ) + })?; + + // Explicit pre-check: if the originating thread was deleted before the + // followup fired, drop it now rather than relying on string-matching the + // dispatch error. + if app_state.threads.thread_store.get(&thread_id).await.is_none() { + return Err(FollowupAttemptError::Dropped(format!( + "thread not found: {thread_id}" + ))); + } let scheduled_for = job.next_run; let body = build_followup_body(&job.id, payload, scheduled_for); @@ -1434,6 +1611,15 @@ impl CronService { }, ) .await + .map_err(|error| { + // A thread deleted between the pre-check and dispatch surfaces here + // as the dispatch sentinel — still a non-retryable drop. + if error.starts_with("thread not found") { + FollowupAttemptError::Dropped(error) + } else { + FollowupAttemptError::Transient(error) + } + }) } async fn dispatch_agent_turn( diff --git a/garyx-gateway/src/cron/tests.rs b/garyx-gateway/src/cron/tests.rs index 52e14e5a..7e119237 100644 --- a/garyx-gateway/src/cron/tests.rs +++ b/garyx-gateway/src/cron/tests.rs @@ -2052,3 +2052,191 @@ async fn test_internal_dispatch_followup_fires_and_injects_synthetic_user_turn() "verbatim prompt must follow the metadata block" ); } + +// --------------------------------------------------------------------------- +// schedule_followup boundary fallback — drop classification + retry +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_followup_retry_drops_immediately_without_retry() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let calls = AtomicU32::new(0); + let result = CronService::run_followup_with_retry( + FOLLOWUP_MAX_RETRIES, + std::time::Duration::ZERO, + "job-drop", + "run-drop", + |_attempt| { + calls.fetch_add(1, Ordering::SeqCst); + async { Err(FollowupAttemptError::Dropped("thread not found: t1".to_owned())) } + }, + ) + .await; + + assert_eq!( + result.unwrap_err(), + "thread not found: t1", + "non-retryable drop returns its reason verbatim" + ); + assert_eq!( + calls.load(Ordering::SeqCst), + 1, + "a Dropped outcome must not be retried" + ); +} + +#[tokio::test] +async fn test_followup_retry_succeeds_after_transient_failures() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let calls = AtomicU32::new(0); + let result = CronService::run_followup_with_retry( + FOLLOWUP_MAX_RETRIES, + std::time::Duration::ZERO, + "job-ok", + "run-ok", + |_attempt| { + let c = calls.fetch_add(1, Ordering::SeqCst); + async move { + if c < 2 { + Err(FollowupAttemptError::Transient(format!("boom {c}"))) + } else { + Ok(()) + } + } + }, + ) + .await; + + assert!( + result.is_ok(), + "transient failures within budget then success must succeed: {result:?}" + ); + assert_eq!( + calls.load(Ordering::SeqCst), + 3, + "two transient failures then a successful third attempt" + ); +} + +#[tokio::test] +async fn test_followup_retry_exhausts_budget_and_drops() { + use std::sync::atomic::{AtomicU32, Ordering}; + + let calls = AtomicU32::new(0); + let result = CronService::run_followup_with_retry( + FOLLOWUP_MAX_RETRIES, + std::time::Duration::ZERO, + "job-exhaust", + "run-exhaust", + |_attempt| { + calls.fetch_add(1, Ordering::SeqCst); + async { Err(FollowupAttemptError::Transient("network down".to_owned())) } + }, + ) + .await; + + let err = result.unwrap_err(); + assert!( + err.contains(&format!("after {FOLLOWUP_MAX_RETRIES} retries")), + "exhausted error names the retry count, got: {err}" + ); + assert!( + err.contains("network down"), + "exhausted error carries the concrete underlying failure, got: {err}" + ); + assert_eq!( + calls.load(Ordering::SeqCst), + FOLLOWUP_MAX_RETRIES + 1, + "one initial attempt plus FOLLOWUP_MAX_RETRIES retries" + ); +} + +#[tokio::test] +async fn test_internal_dispatch_drops_when_thread_missing() { + use garyx_models::config::GaryxConfig; + + let tmp = TempDir::new().unwrap(); + let cron = Arc::new(CronService::new(tmp.path().to_path_buf())); + let _ = ensure_dirs(tmp.path()).await; + + let bridge = Arc::new(garyx_bridge::MultiProviderBridge::new()); + // No thread is seeded: the originating thread was deleted before the + // followup fired. The pre-check short-circuits before any provider call. + let state = crate::server::AppStateBuilder::new(GaryxConfig::default()) + .with_bridge(bridge.clone()) + .with_cron_service(cron.clone()) + .with_auto_research_store(Arc::new(crate::auto_research::AutoResearchStore::new())) + .with_agent_team_store(Arc::new(crate::agent_teams::AgentTeamStore::new())) + .with_custom_agent_store(Arc::new(crate::custom_agents::CustomAgentStore::new())) + .build(); + // Keep the builder result alive for the duration of the tick so the cron + // service's weak app_state back-reference can upgrade. + let _state = state; + + let thread_id = "thread::followup-deleted-target"; + let run_id = "run-from-test"; + let job_id = crate::mcp::tools::schedule_followup::followup_job_id(thread_id, run_id); + let scheduled_at = Utc::now(); + let scheduled_for = scheduled_at + chrono::Duration::milliseconds(200); + let payload = garyx_models::config::InternalDispatchJobPayload { + prompt: "resume verification".to_owned(), + reason: Some("test reason".to_owned()), + originating_run_id: Some(run_id.to_owned()), + scheduled_at, + delay_seconds_requested: 60, + }; + cron.upsert(CronJobConfig { + id: job_id.clone(), + kind: CronJobKind::InternalDispatch { + payload: payload.clone(), + }, + label: None, + schedule: CronSchedule::Once { + at: scheduled_for.to_rfc3339(), + }, + ui_schedule: None, + action: CronAction::Log, + target: None, + message: None, + workspace_dir: None, + agent_id: None, + thread_id: Some(thread_id.to_owned()), + delete_after_run: true, + enabled: true, + system: true, + }) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(400)).await; + CronService::tick( + &cron.jobs, + &cron.runs, + &cron.active_agent_runs, + tmp.path(), + None, + &cron.dispatch_runtime, + &cron.app_state_weak, + &cron.garyx_db, + ) + .await; + + let runs = cron.list_runs(10, 0).await; + assert_eq!(runs.len(), 1, "exactly one run should be recorded"); + assert_eq!( + runs[0].status, + JobRunStatus::FailedDropped, + "a deleted thread must produce a dropped run, got: {:?}", + runs[0].status + ); + let reason = runs[0].error.as_deref().unwrap_or_default(); + assert!( + reason.contains("thread not found"), + "drop reason must explain the missing thread, got: {reason:?}" + ); + // The persisted (serde) form must be exactly "failed_dropped" per AC. + let serialized = serde_json::to_string(&runs[0].status).unwrap(); + assert_eq!(serialized, "\"failed_dropped\""); +}