31 changes: 31 additions & 0 deletions docs/schedule-followup.md
@@ -126,6 +126,37 @@ followup-driven runs distinctly from organic user input:
the `AppState` reference is held weakly, no circular `Arc` is
formed between `AppState` and `CronService`.

## Boundary handling & retries

When a followup fires, the originating thread or the dispatch itself may be in a
state that prevents delivery. These cases are handled explicitly rather than
dropped silently:

- **Thread deleted.** If the originating thread is no longer in the thread store
by the time the followup fires, the run is recorded as `failed_dropped` with a
`thread not found: <id>` reason. This is non-retryable — retrying cannot bring
the thread back.
- **Transient dispatch failure.** A network/internal error during dispatch is
retried up to `FOLLOWUP_MAX_RETRIES` (3) times with exponential backoff
(≈200ms / 400ms / 800ms). If a retry succeeds, the run is `success` and the
intermediate failures are logged at `warn`. If the budget is exhausted, the
run is recorded as `failed_dropped` with the concrete underlying error.
- **Terminal drops are final.** A `failed_dropped` outcome disables the one-shot
job (and honors `delete_after_run`) so a dropped followup never re-fires on a
later tick.
- **Thread stopped / cancelled.** There is no dedicated dispatch-time signal for
a stopped-but-still-present thread today, so such a thread still receives the
injected turn (which starts a fresh turn). The drop classifier is structured
to accommodate such a signal if one is added later.
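
The retry and drop rules above can be sketched as a small synchronous loop. This is a minimal illustration under stated assumptions, not the gateway's implementation: `AttemptError`, `backoff_for`, and `run_with_retry` are hypothetical names, and a blocking `std::thread::sleep` stands in for the async sleep used by the real service.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the classification of one dispatch attempt.
#[derive(Debug)]
enum AttemptError {
    /// Non-retryable, e.g. the originating thread was deleted.
    Dropped(String),
    /// Retryable, e.g. a network or internal error.
    Transient(String),
}

/// Delay before retry `n` (zero-based): `base * 2^n`.
fn backoff_for(base: Duration, n: u32) -> Duration {
    base * 2u32.pow(n)
}

/// Calls `attempt` until it succeeds, reports a non-retryable drop, or the
/// retry budget (`max_retries` retries after the first attempt) is exhausted.
/// Returns the number of attempts made on success, or the drop reason.
fn run_with_retry<F>(max_retries: u32, base: Duration, mut attempt: F) -> Result<u32, String>
where
    F: FnMut(u32) -> Result<(), AttemptError>,
{
    let mut last_error = String::new();
    for n in 0..=max_retries {
        match attempt(n) {
            Ok(()) => return Ok(n + 1),
            // A non-retryable outcome ends the loop immediately.
            Err(AttemptError::Dropped(reason)) => return Err(reason),
            Err(AttemptError::Transient(error)) => {
                last_error = error;
                // Sleep only if another attempt will follow.
                if n < max_retries {
                    thread::sleep(backoff_for(base, n));
                }
            }
        }
    }
    Err(format!(
        "dispatch failed after {} retries: {}",
        max_retries, last_error
    ))
}
```

With a 200ms base and a budget of 3 retries, the delays are 200ms, 400ms, and 800ms, and a persistently failing dispatch makes four attempts before it is dropped. Passing `Duration::ZERO` as the base keeps the loop fast when exercising it in tests.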

Every drop path emits a `tracing::warn`, and the existing `cron_job_completed`
broadcast event carries the run `status` and reason, so drops are observable and
never silent.

The `RunRecord.status` enum gains a `failed_dropped` value
(`JobRunStatus::FailedDropped`) distinct from `failed`. It is additive: older
persisted run records without it still deserialize.
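
To make the difference between `failed` and `failed_dropped` concrete, the sketch below models the post-run bookkeeping on a trimmed-down job. All types here (`Status`, `Schedule`, `Job`, `settle`) are hypothetical simplifications. In particular, it assumes that a successful one-shot run simply disables the job, whereas the real service delegates that case to its own schedule-advancing logic.

```rust
/// Hypothetical, simplified run outcome.
#[derive(Debug, PartialEq)]
enum Status {
    Success,
    Failed,
    FailedDropped,
}

/// Hypothetical, simplified schedule kind.
#[derive(Debug, PartialEq)]
enum Schedule {
    Once,
    Interval,
}

/// Only the fields the bookkeeping touches are modelled.
#[derive(Debug)]
struct Job {
    schedule: Schedule,
    enabled: bool,
    delete_after_run: bool,
    run_count: u32,
    last_status: Option<Status>,
}

impl Job {
    /// Records the outcome and returns whether the job file should be deleted.
    fn settle(&mut self, status: Status) -> bool {
        self.run_count += 1;
        // `Success` and `FailedDropped` are terminal; plain `Failed` is not.
        let terminal = matches!(status, Status::Success | Status::FailedDropped);
        // A terminal outcome on a one-shot job disables it so it cannot re-fire.
        if terminal && self.schedule == Schedule::Once {
            self.enabled = false;
        }
        self.last_status = Some(status);
        self.delete_after_run && terminal
    }
}
```

In this model a plain `Failed` run leaves a one-shot job enabled and never triggers deletion, while `FailedDropped` disables it and honors `delete_after_run` in the same way as `Success`.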

## Observability

`schedule_followup` jobs are `system: true`, so they do not show up in the
230 changes: 208 additions & 22 deletions garyx-gateway/src/cron.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, OnceLock, Weak};
use std::time::Duration;

use chrono::{DateTime, Local, LocalResult, NaiveDateTime, TimeZone, Utc};
use chrono_tz::Tz;
@@ -46,6 +47,12 @@ const MAX_INTERVAL_SECS: u64 = i64::MAX as u64;
pub enum JobRunStatus {
Success,
Failed,
/// Terminal failure where the run was intentionally dropped rather than
/// retried further: the target thread is gone, or a transient dispatch
/// failure exhausted its retry budget. Distinct from `Failed` so a dropped
/// followup is treated as terminal (one-shot jobs are disabled, see
/// `CronJob::settle_after_run`) and never silently re-fires.
FailedDropped,
Running,
NeverRun,
}
@@ -65,6 +72,28 @@ pub struct RunRecord {
pub error: Option<String>,
}

/// Maximum number of *retries* (i.e. attempts after the first) for a transient
/// internal-dispatch failure before the followup is dropped. Total
/// attempts are `FOLLOWUP_MAX_RETRIES + 1`.
const FOLLOWUP_MAX_RETRIES: u32 = 3;

/// Base delay for exponential backoff between internal-dispatch retries. The
/// nth retry waits `FOLLOWUP_RETRY_BASE_BACKOFF * 2^n` (≈200ms, 400ms, 800ms).
const FOLLOWUP_RETRY_BASE_BACKOFF: Duration = Duration::from_millis(200);

/// Classification of a single internal-dispatch attempt outcome.
///
/// `Dropped` is non-retryable — the target thread is gone (deleted) or the job
/// is structurally unable to dispatch, so retrying cannot help. `Transient` is
/// a network/internal failure worth retrying with backoff.
#[derive(Debug)]
enum FollowupAttemptError {
/// Non-retryable: drop the followup immediately with this reason.
Dropped(String),
/// Retryable transient failure carrying the underlying error text.
Transient(String),
}

/// Persisted state for a single cron job.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CronJob {
@@ -191,6 +220,37 @@ impl CronJob {
}
}

/// Apply post-run bookkeeping after a run produced `status`, returning
/// whether the job file should be deleted (`delete_after_run` on a terminal
/// outcome). Single source of truth for both the `run_now` and `tick` paths.
///
/// `Success` advances/disables the schedule as before. `FailedDropped` is a
/// *terminal* failure: one-shot jobs are disabled so a dropped followup is
/// not re-claimed every tick (`is_due` only exempts past-at-registration
/// jobs, so a fired-but-not-advanced `Once` job would otherwise re-fire
/// indefinitely), and `delete_after_run` is honored just like `Success`.
/// All other statuses keep the prior behavior (bump counters, leave the
/// schedule untouched).
fn settle_after_run(&mut self, status: &JobRunStatus, started_at: DateTime<Utc>) -> bool {
self.last_status = status.clone();
match status {
JobRunStatus::Success => self.advance(),
JobRunStatus::FailedDropped => {
self.last_run_at = Some(started_at);
self.run_count += 1;
if matches!(self.schedule, CronSchedule::Once { .. }) {
self.enabled = false;
}
}
_ => {
self.last_run_at = Some(started_at);
self.run_count += 1;
}
}
self.delete_after_run
&& matches!(status, JobRunStatus::Success | JobRunStatus::FailedDropped)
}

/// Is this job due to run?
fn is_due(&self) -> bool {
if !self.enabled {
@@ -863,14 +923,7 @@ impl CronService {
{
let mut jobs = self.jobs.write().await;
if let Some(j) = jobs.get_mut(id) {
- j.last_status = record.status.clone();
- if record.status == JobRunStatus::Success {
- j.advance();
- } else {
- j.last_run_at = Some(record.started_at);
- j.run_count += 1;
- }
- should_delete = j.delete_after_run && record.status == JobRunStatus::Success;
should_delete = j.settle_after_run(&record.status, record.started_at);
if !should_delete {
let _ = persist_job(&self.data_dir, j).await;
}
@@ -970,6 +1023,7 @@ impl CronService {
match status {
JobRunStatus::Success => "success",
JobRunStatus::Failed => "failed",
JobRunStatus::FailedDropped => "dropped",
JobRunStatus::Running => "running",
JobRunStatus::NeverRun => "unknown",
}
@@ -1229,14 +1283,7 @@ impl CronService {
{
let mut map = jobs.write().await;
if let Some(j) = map.get_mut(&id) {
- j.last_status = record.status.clone();
- if record.status == JobRunStatus::Success {
- j.advance();
- } else {
- j.last_run_at = Some(record.started_at);
- j.run_count += 1;
- }
- should_delete = j.delete_after_run && record.status == JobRunStatus::Success;
should_delete = j.settle_after_run(&record.status, record.started_at);
if !should_delete {
let _ = persist_job(data_dir, j).await;
}
@@ -1289,10 +1336,22 @@ impl CronService {

let (status, error) = match &job.kind {
CronJobKind::InternalDispatch { payload } => {
- match Self::dispatch_internal_followup(job, &run_id, payload, app_state_weak).await
// Boundary fallback: classify drop-vs-transient and
// retry transient dispatch failures with exponential backoff.
// Any terminal failure (thread gone, or retry budget exhausted)
// becomes `FailedDropped` with the reason recorded in the run
// record — never a silent drop.
match Self::dispatch_internal_followup_with_retry(
job,
&run_id,
payload,
app_state_weak,
FOLLOWUP_RETRY_BASE_BACKOFF,
)
.await
{
Ok(()) => (JobRunStatus::Success, None),
- Err(e) => (JobRunStatus::Failed, Some(e)),
Err(reason) => (JobRunStatus::FailedDropped, Some(reason)),
}
}
CronJobKind::AutomationPrompt => match &job.action {
@@ -1376,19 +1435,137 @@ impl CronService {
/// so the resumed agent can correlate the followup with its own earlier
/// `schedule_followup` call (and so telemetry can distinguish followups
/// from organic user input).
- async fn dispatch_internal_followup(
/// Drive [`Self::dispatch_internal_followup_once`] with bounded
/// exponential-backoff retry.
///
/// Returns `Ok(())` on success (possibly after retries) or `Err(reason)`
/// when the followup is dropped — either non-retryably (thread gone) or
/// because the retry budget was exhausted. The reason string is recorded in
/// the run record so a drop is never silent.
async fn dispatch_internal_followup_with_retry(
job: &CronJob,
run_id: &str,
payload: &InternalDispatchJobPayload,
app_state_weak: &Arc<OnceLock<Weak<AppState>>>,
base_backoff: Duration,
) -> Result<(), String> {
- let thread_id = Self::trimmed_non_empty(job.thread_id.as_deref())
- .ok_or_else(|| format!("cron internal-dispatch job {} is missing thread_id", job.id))?;
Self::run_followup_with_retry(
FOLLOWUP_MAX_RETRIES,
base_backoff,
&job.id,
run_id,
|_attempt| Self::dispatch_internal_followup_once(job, run_id, payload, app_state_weak),
)
.await
}

/// Generic retry driver shared by production and tests.
///
/// Calls `attempt` (receiving the zero-based attempt index) until it
/// succeeds, hits a non-retryable `Dropped` outcome, or exhausts
/// `max_retries` transient failures. Every drop path emits a `tracing::warn`
/// so drops are observable; the nth retry sleeps `base_backoff * 2^n`.
async fn run_followup_with_retry<F, Fut>(
max_retries: u32,
base_backoff: Duration,
job_id: &str,
run_id: &str,
mut attempt: F,
) -> Result<(), String>
where
F: FnMut(u32) -> Fut,
Fut: std::future::Future<Output = Result<(), FollowupAttemptError>>,
{
let mut last_error = String::new();
for n in 0..=max_retries {
match attempt(n).await {
Ok(()) => return Ok(()),
Err(FollowupAttemptError::Dropped(reason)) => {
tracing::warn!(
job_id = %job_id,
run_id = %run_id,
reason = %reason,
"schedule_followup dropped (non-retryable)"
);
return Err(reason);
}
Err(FollowupAttemptError::Transient(error)) => {
last_error = error;
if n < max_retries {
let backoff = base_backoff * 2u32.pow(n);
tracing::warn!(
job_id = %job_id,
run_id = %run_id,
attempt = n + 1,
max_attempts = max_retries + 1,
backoff_ms = backoff.as_millis() as u64,
error = %last_error,
"schedule_followup dispatch failed; retrying after backoff"
);
if !backoff.is_zero() {
tokio::time::sleep(backoff).await;
}
}
}
}
}

let reason = format!(
"dispatch failed after {} retries: {}",
max_retries, last_error
);
tracing::warn!(
job_id = %job_id,
run_id = %run_id,
reason = %reason,
"schedule_followup dropped (retry budget exhausted)"
);
Err(reason)
}

/// Perform a single internal-dispatch attempt, classifying the outcome into
/// retryable vs non-retryable.
///
/// Builds a synthetic user-turn body from a `schedule_followup` payload and
/// injects it into the originating thread via
/// [`dispatch_internal_message_to_thread`]. The body is prefixed with a
/// `<garyx_followup_metadata>` block so the resumed agent can correlate the
/// followup with its own earlier `schedule_followup` call (and so telemetry
/// can distinguish followups from organic user input).
///
/// A missing thread_id / app_state back-reference, or a thread that is no
/// longer present in the thread store, yields `Dropped` (retrying cannot
/// help). Any other dispatch error yields `Transient`.
async fn dispatch_internal_followup_once(
job: &CronJob,
run_id: &str,
payload: &InternalDispatchJobPayload,
app_state_weak: &Arc<OnceLock<Weak<AppState>>>,
) -> Result<(), FollowupAttemptError> {
let thread_id = Self::trimmed_non_empty(job.thread_id.as_deref()).ok_or_else(|| {
FollowupAttemptError::Dropped(format!(
"cron internal-dispatch job {} is missing thread_id",
job.id
))
})?;

let app_state = app_state_weak
.get()
.and_then(Weak::upgrade)
- .ok_or_else(|| "cron app_state back-reference is not installed".to_owned())?;
.ok_or_else(|| {
FollowupAttemptError::Dropped(
"cron app_state back-reference is not installed".to_owned(),
)
})?;

// Explicit pre-check: if the originating thread was deleted before the
// followup fired, drop it now rather than relying on string-matching the
// dispatch error.
if app_state.threads.thread_store.get(&thread_id).await.is_none() {
return Err(FollowupAttemptError::Dropped(format!(
"thread not found: {thread_id}"
)));
}

let scheduled_for = job.next_run;
let body = build_followup_body(&job.id, payload, scheduled_for);
@@ -1434,6 +1611,15 @@ impl CronService {
},
)
.await
.map_err(|error| {
// A thread deleted between the pre-check and dispatch surfaces here
// as the dispatch sentinel — still a non-retryable drop.
if error.starts_with("thread not found") {
FollowupAttemptError::Dropped(error)
} else {
FollowupAttemptError::Transient(error)
}
})
}

async fn dispatch_agent_turn(