diff --git a/README.md b/README.md
index 8f6b55b..6be4388 100644
--- a/README.md
+++ b/README.md
@@ -142,7 +142,7 @@ async fn main() -> Result<(), CanoError> {
.add_exit_state(FlowState::Complete);
// 5. Run.
- let result = workflow.orchestrate(FlowState::Start).await?;
+ let result = workflow.orchestrate(FlowState::Start, CancellationToken::disabled()).await?;
println!("Workflow finished: {:?}", result);
Ok(())
diff --git a/cano-e2e/src/bin/cano_workflow_app.rs b/cano-e2e/src/bin/cano_workflow_app.rs
index 260aad7..3b609ea 100644
--- a/cano-e2e/src/bin/cano_workflow_app.rs
+++ b/cano-e2e/src/bin/cano_workflow_app.rs
@@ -13,6 +13,7 @@
use std::str::FromStr;
use std::sync::Arc;
+use cano::CancellationToken;
use cano_e2e::{Faults, Phase, PostgresCheckpointStore, StdoutTracer, build_workflow};
#[tokio::main]
@@ -62,8 +63,16 @@ async fn main() -> anyhow::Result<()> {
emit(&format!("READY {workflow_id} {mode}"));
let result = match mode.as_str() {
- "resume" => workflow.resume_from(workflow_id.clone()).await,
- "run" => workflow.orchestrate(Phase::Reserve).await,
+ "resume" => {
+ workflow
+ .resume_from(workflow_id.clone(), CancellationToken::disabled())
+ .await
+ }
+ "run" => {
+ workflow
+ .orchestrate(Phase::Reserve, CancellationToken::disabled())
+ .await
+ }
other => anyhow::bail!("unknown mode {other:?}"),
};
match result {
diff --git a/cano-macros/tests/batch_task_impl.rs b/cano-macros/tests/batch_task_impl.rs
index ff59645..9aec98e 100644
--- a/cano-macros/tests/batch_task_impl.rs
+++ b/cano-macros/tests/batch_task_impl.rs
@@ -105,7 +105,10 @@ async fn inherent_inferred_integrates_with_workflow() {
.register(Step::Process, InherentInferred)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Process).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Process, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -197,7 +200,10 @@ async fn inherent_with_key_integrates_with_workflow() {
.register(Step::Process, InherentWithKey)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Process).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Process, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -394,7 +400,10 @@ async fn trait_form_integrates_with_workflow() {
.register(Step::Process, TraitBatch)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Process).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Process, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -534,7 +543,10 @@ async fn end_to_end_workflow_load_process_finish() {
.register(Step::Process, LoadStep)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Process).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Process, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
let output: Vec = store.get("output").unwrap();
diff --git a/cano-macros/tests/compensatable_task_impl.rs b/cano-macros/tests/compensatable_task_impl.rs
index 6f8bde2..3172527 100644
--- a/cano-macros/tests/compensatable_task_impl.rs
+++ b/cano-macros/tests/compensatable_task_impl.rs
@@ -72,7 +72,10 @@ async fn inherent_compensatable_impl_registers_and_compensates() {
.register(Step::Boom, Boom)
.add_exit_state(Step::Done);
- let err = workflow.orchestrate(Step::Reserve).await.unwrap_err();
+ let err = workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ .unwrap_err();
assert_eq!(err.message(), "boom"); // clean rollback -> the original failure is surfaced
assert!(
compensated.load(Ordering::SeqCst),
@@ -85,7 +88,10 @@ async fn inherent_compensatable_impl_registers_and_compensates() {
.register_with_compensation(Step::Reserve, ReserveNamed)
.add_exit_state(Step::Done);
assert_eq!(
- workflow.orchestrate(Step::Reserve).await.unwrap(),
+ workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ .unwrap(),
Step::Done
);
}
diff --git a/cano-macros/tests/poll_task_impl.rs b/cano-macros/tests/poll_task_impl.rs
index 7b467da..b27622e 100644
--- a/cano-macros/tests/poll_task_impl.rs
+++ b/cano-macros/tests/poll_task_impl.rs
@@ -219,7 +219,10 @@ async fn inherent_poller_integrates_with_workflow() {
.register(Step::Poll, InherentPoller)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Poll).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Poll, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -229,7 +232,10 @@ async fn trait_poller_integrates_with_workflow() {
.register(Step::Poll, TraitPoller)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Poll).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Poll, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
diff --git a/cano-macros/tests/router_task_impl.rs b/cano-macros/tests/router_task_impl.rs
index 30ae1ff..00b0f17 100644
--- a/cano-macros/tests/router_task_impl.rs
+++ b/cano-macros/tests/router_task_impl.rs
@@ -192,7 +192,10 @@ async fn inherent_router_integrates_with_workflow() {
.register(Step::PathA, PathATask)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Route).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Route, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -203,6 +206,9 @@ async fn trait_router_integrates_with_workflow() {
.register(Step::PathA, PathATask)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Route).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Route, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
diff --git a/cano-macros/tests/stepped_task_impl.rs b/cano-macros/tests/stepped_task_impl.rs
index 3f55a28..f26c827 100644
--- a/cano-macros/tests/stepped_task_impl.rs
+++ b/cano-macros/tests/stepped_task_impl.rs
@@ -380,7 +380,10 @@ async fn stepped_task_in_workflow() {
.register(MyState::Work, stepper)
.add_exit_state(MyState::Done);
- let result = workflow.orchestrate(MyState::Work).await.unwrap();
+ let result = workflow
+ .orchestrate(MyState::Work, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, MyState::Done);
}
diff --git a/cano-macros/tests/task_impl_name.rs b/cano-macros/tests/task_impl_name.rs
index 5a2c747..cc3bf3c 100644
--- a/cano-macros/tests/task_impl_name.rs
+++ b/cano-macros/tests/task_impl_name.rs
@@ -55,5 +55,11 @@ async fn task_still_runs_in_a_workflow() {
let workflow = Workflow::bare()
.register(Step::Start, NamedInherentTask)
.add_exit_state(Step::Done);
- assert_eq!(workflow.orchestrate(Step::Start).await.unwrap(), Step::Done);
+ assert_eq!(
+ workflow
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await
+ .unwrap(),
+ Step::Done
+ );
}
diff --git a/cano-macros/tests/timer_task_impl.rs b/cano-macros/tests/timer_task_impl.rs
index 5ef14f2..d663f77 100644
--- a/cano-macros/tests/timer_task_impl.rs
+++ b/cano-macros/tests/timer_task_impl.rs
@@ -212,7 +212,10 @@ async fn inherent_timer_integrates_with_workflow() {
.register(Step::Wait, TraitTimer)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Wait).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Wait, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
diff --git a/cano/Cargo.toml b/cano/Cargo.toml
index 3ffae3b..13691c3 100644
--- a/cano/Cargo.toml
+++ b/cano/Cargo.toml
@@ -101,6 +101,11 @@ name = "scheduler_graceful_shutdown"
path = "examples/scheduler_graceful_shutdown.rs"
required-features = ["scheduler"]
+[[example]]
+name = "scheduler_cancellation"
+path = "examples/scheduler_cancellation.rs"
+required-features = ["scheduler"]
+
[[example]]
name = "scheduler_mixed_workflows"
path = "examples/scheduler_mixed_workflows.rs"
@@ -201,6 +206,10 @@ path = "examples/saga_payment.rs"
name = "workflow_total_timeout"
path = "examples/workflow_total_timeout.rs"
+[[example]]
+name = "workflow_cancellation"
+path = "examples/workflow_cancellation.rs"
+
[[example]]
name = "router_task"
path = "examples/router_task.rs"
diff --git a/cano/benches/workflow_performance.rs b/cano/benches/workflow_performance.rs
index ee4cbd5..f85e700 100644
--- a/cano/benches/workflow_performance.rs
+++ b/cano/benches/workflow_performance.rs
@@ -577,7 +577,9 @@ fn bench_orchestrate_overhead(c: &mut Criterion) {
b.to_async(&runtime).iter(|| {
let workflow = Arc::clone(&workflow);
async move {
- let _ = workflow.orchestrate(S::Done).await;
+ let _ = workflow
+ .orchestrate(S::Done, CancellationToken::disabled())
+ .await;
}
});
});
@@ -617,7 +619,9 @@ fn bench_large_split_collect(c: &mut Criterion) {
.add_exit_state(S::Done);
b.to_async(&runtime).iter(|| async {
- let _ = workflow.orchestrate(S::Start).await;
+ let _ = workflow
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await;
});
});
}
@@ -659,7 +663,9 @@ fn bench_tracing_overhead(c: &mut Criterion) {
b.to_async(&runtime).iter(|| {
let workflow = Arc::clone(&workflow);
async move {
- let _ = workflow.orchestrate(S::Done).await;
+ let _ = workflow
+ .orchestrate(S::Done, CancellationToken::disabled())
+ .await;
}
});
});
diff --git a/cano/examples/ai_workflow_yes_and.rs b/cano/examples/ai_workflow_yes_and.rs
index 96ba928..3849df7 100644
--- a/cano/examples/ai_workflow_yes_and.rs
+++ b/cano/examples/ai_workflow_yes_and.rs
@@ -318,7 +318,9 @@ async fn main() -> Result<(), CanoError> {
println!("Starting improvised story...\n");
- let final_state = workflow.orchestrate(ConversationState::Start).await?;
+ let final_state = workflow
+ .orchestrate(ConversationState::Start, CancellationToken::disabled())
+ .await?;
println!("\nStory completed with state: {final_state:?}");
diff --git a/cano/examples/batch_task.rs b/cano/examples/batch_task.rs
index a93230b..6726e78 100644
--- a/cano/examples/batch_task.rs
+++ b/cano/examples/batch_task.rs
@@ -240,7 +240,9 @@ async fn main() -> CanoResult<()> {
.register(Step::Summarise, Summarise { url_count })
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::ParseUrls).await?;
+ let result = workflow
+ .orchestrate(Step::ParseUrls, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Step::Done);
println!("\ncompleted at {result:?}");
diff --git a/cano/examples/circuit_breaker.rs b/cano/examples/circuit_breaker.rs
index 5c7ad63..80283bb 100644
--- a/cano/examples/circuit_breaker.rs
+++ b/cano/examples/circuit_breaker.rs
@@ -98,7 +98,9 @@ async fn main() -> Result<(), CanoError> {
println!("Phase 1 — dependency unhealthy, threshold = 3.");
for attempt in 1..=5 {
- let outcome = workflow.orchestrate(Step::Call).await;
+ let outcome = workflow
+ .orchestrate(Step::Call, CancellationToken::disabled())
+ .await;
// `orchestrate` wraps task failures in `WithStateContext`; unwrap one layer
// before pattern-matching on the underlying variant.
let label = match outcome {
@@ -125,7 +127,10 @@ async fn main() -> Result<(), CanoError> {
println!("Phase 3 — half-open trial probes the dependency, then closes the breaker.");
for attempt in 1..=3 {
- match workflow.orchestrate(Step::Call).await {
+ match workflow
+ .orchestrate(Step::Call, CancellationToken::disabled())
+ .await
+ {
Ok(_) => println!(
" recovery call {attempt}: ok | state={:?}",
breaker.state()
diff --git a/cano/examples/circuit_breaker_manual.rs b/cano/examples/circuit_breaker_manual.rs
index e0539b3..902713b 100644
--- a/cano/examples/circuit_breaker_manual.rs
+++ b/cano/examples/circuit_breaker_manual.rs
@@ -151,7 +151,9 @@ async fn main() -> Result<(), CanoError> {
// ------------------------------------------------------------------
println!("Phase 1: dependency unhealthy (threshold = 3 consecutive failures)");
for attempt in 1..=5 {
- let outcome = workflow.orchestrate(Step::Call).await;
+ let outcome = workflow
+ .orchestrate(Step::Call, CancellationToken::disabled())
+ .await;
// `orchestrate` wraps task failures in `WithStateContext`; unwrap one layer
// before pattern-matching on the underlying variant.
let label = match &outcome {
@@ -185,7 +187,10 @@ async fn main() -> Result<(), CanoError> {
// ------------------------------------------------------------------
println!("\nPhase 3: half-open trial — one probe closes the breaker");
for attempt in 1..=3 {
- match workflow.orchestrate(Step::Call).await {
+ match workflow
+ .orchestrate(Step::Call, CancellationToken::disabled())
+ .await
+ {
Ok(_) => println!(" call {attempt}: ok | breaker={:?}", breaker.state()),
Err(e) => println!(" call {attempt}: err: {e} | breaker={:?}", breaker.state()),
}
diff --git a/cano/examples/custom_checkpoint_store.rs b/cano/examples/custom_checkpoint_store.rs
index 7ff97dc..49f0e74 100644
--- a/cano/examples/custom_checkpoint_store.rs
+++ b/cano/examples/custom_checkpoint_store.rs
@@ -173,7 +173,10 @@ async fn main() -> Result<(), Box> {
// --- First run: Process crashes; the checkpoint log is kept. -------
println!("=== run 1: Process will crash ===");
- match workflow.orchestrate(Step::Init).await {
+ match workflow
+ .orchestrate(Step::Init, CancellationToken::disabled())
+ .await
+ {
Ok(s) => println!(" completed at {s:?} (unexpected)"),
Err(e) => println!(" stopped with error: {e}"),
}
@@ -201,7 +204,9 @@ async fn main() -> Result<(), Box> {
// --- Second run: resume from last checkpoint (Process, attempt 2). ---
println!("\n=== run 2: resume_from ===");
- let final_state = workflow.resume_from(run_id).await?;
+ let final_state = workflow
+ .resume_from(run_id, CancellationToken::disabled())
+ .await?;
println!(" reached {final_state:?}");
assert_eq!(final_state, Step::Done);
diff --git a/cano/examples/join_strategies.rs b/cano/examples/join_strategies.rs
index 76cd54e..c550357 100644
--- a/cano/examples/join_strategies.rs
+++ b/cano/examples/join_strategies.rs
@@ -110,7 +110,9 @@ async fn run_strategy(label: &str, strategy: JoinStrategy) -> CanoResult<()> {
.add_exit_state(Step::Done);
let start = Instant::now();
- let result = workflow.orchestrate(Step::Parallel).await?;
+ let result = workflow
+ .orchestrate(Step::Parallel, CancellationToken::disabled())
+ .await?;
let elapsed = start.elapsed();
// Count how many workers managed to log a result before being cancelled.
diff --git a/cano/examples/metrics_demo.rs b/cano/examples/metrics_demo.rs
index e249564..8ba6f01 100644
--- a/cano/examples/metrics_demo.rs
+++ b/cano/examples/metrics_demo.rs
@@ -60,7 +60,7 @@ async fn main() {
// Run the workflow 3 times directly.
for _ in 0..3 {
workflow()
- .orchestrate(Step::Fetch)
+ .orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");
}
diff --git a/cano/examples/metrics_tracing_context.rs b/cano/examples/metrics_tracing_context.rs
index 49c2426..8c55b4a 100644
--- a/cano/examples/metrics_tracing_context.rs
+++ b/cano/examples/metrics_tracing_context.rs
@@ -94,7 +94,7 @@ async fn main() {
// Path 1: Cano's own `workflow_orchestrate` span carries `workflow_id`.
workflow()
.with_workflow_id("demo-run-1")
- .orchestrate(Step::Fetch)
+ .orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");
@@ -103,7 +103,7 @@ async fn main() {
let span = info_span!("api_request", request_id = "abc");
let _enter = span.enter();
workflow()
- .orchestrate(Step::Fetch)
+ .orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");
}
diff --git a/cano/examples/mixed_workflow.rs b/cano/examples/mixed_workflow.rs
index 276b664..77b66c5 100644
--- a/cano/examples/mixed_workflow.rs
+++ b/cano/examples/mixed_workflow.rs
@@ -240,7 +240,10 @@ async fn main() -> CanoResult<()> {
.register(WorkflowState::GenerateReport, ReportTask)
.add_exit_states(vec![WorkflowState::Complete]);
- match workflow.orchestrate(WorkflowState::GenerateData).await {
+ match workflow
+ .orchestrate(WorkflowState::GenerateData, CancellationToken::disabled())
+ .await
+ {
Ok(_final_state) => {
println!("\nWorkflow completed successfully!");
diff --git a/cano/examples/observer_metrics.rs b/cano/examples/observer_metrics.rs
index 04ff78a..6e7d678 100644
--- a/cano/examples/observer_metrics.rs
+++ b/cano/examples/observer_metrics.rs
@@ -141,7 +141,10 @@ async fn main() -> Result<(), Box> {
.with_observer(metrics.clone());
for run in 1..=2 {
- match workflow.orchestrate(Step::Start).await {
+ match workflow
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!("run {run}: reached {state:?}"),
Err(error) => println!("run {run}: stopped — {error}"),
}
diff --git a/cano/examples/panic_safety.rs b/cano/examples/panic_safety.rs
index 9d26a18..6b3359f 100644
--- a/cano/examples/panic_safety.rs
+++ b/cano/examples/panic_safety.rs
@@ -127,7 +127,10 @@ async fn main() -> Result<(), Box> {
.register(Step::PanicTask, Panicker)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::PanicTask).await {
+ match workflow
+ .orchestrate(Step::PanicTask, CancellationToken::disabled())
+ .await
+ {
Ok(s) => println!(" outcome: Ok({s:?}) (unexpected)"),
Err(e) => {
println!(" outcome: Err(\"{e}\")");
@@ -157,7 +160,10 @@ async fn main() -> Result<(), Box> {
.register(Step::PanicTask, PanicAfterReserve)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Reserve).await {
+ match workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ {
Ok(s) => println!(" outcome: Ok({s:?}) (unexpected)"),
Err(e) => {
println!(" outcome: Err(\"{e}\")");
diff --git a/cano/examples/poll_retry_on_error.rs b/cano/examples/poll_retry_on_error.rs
index 7b652e2..1e4263c 100644
--- a/cano/examples/poll_retry_on_error.rs
+++ b/cano/examples/poll_retry_on_error.rs
@@ -115,7 +115,10 @@ async fn main() -> Result<(), Box> {
.register(Step::Poll, poller)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Poll).await {
+ match workflow
+ .orchestrate(Step::Poll, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!(" result: Ok({state:?}) -- loop tolerated the streak\n"),
Err(e) => println!(" result: Err({e}) -- unexpected failure\n"),
}
@@ -135,7 +138,10 @@ async fn main() -> Result<(), Box> {
.register(Step::Poll, poller)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Poll).await {
+ match workflow
+ .orchestrate(Step::Poll, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!(" result: Ok({state:?}) -- unexpected success\n"),
Err(e) => println!(" result: Err(\"{e}\") -- loop aborted after streak > cap\n"),
}
@@ -195,7 +201,10 @@ async fn main() -> Result<(), Box> {
)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Poll).await {
+ match workflow
+ .orchestrate(Step::Poll, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!(" result: Ok({state:?}) -- Pending reset the counter\n"),
Err(e) => println!(" result: Err({e}) -- unexpected failure\n"),
}
diff --git a/cano/examples/poll_task.rs b/cano/examples/poll_task.rs
index 8026ccb..0ede9af 100644
--- a/cano/examples/poll_task.rs
+++ b/cano/examples/poll_task.rs
@@ -160,7 +160,9 @@ async fn main() -> CanoResult<()> {
.register(Step::Process, Process)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::AwaitJob).await?;
+ let result = workflow
+ .orchestrate(Step::AwaitJob, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Step::Done);
println!("\ncompleted at {result:?}");
diff --git a/cano/examples/processing_models_tour.rs b/cano/examples/processing_models_tour.rs
index 457dd21..dc5fb6d 100644
--- a/cano/examples/processing_models_tour.rs
+++ b/cano/examples/processing_models_tour.rs
@@ -242,7 +242,9 @@ async fn main() -> Result<(), Box> {
.with_checkpoint_store(checkpoint_store.clone())
.with_workflow_id(run_id);
- let result = workflow.orchestrate(Stage::Route).await?;
+ let result = workflow
+ .orchestrate(Stage::Route, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Stage::Done);
println!("\ncompleted at {result:?}");
diff --git a/cano/examples/router_task.rs b/cano/examples/router_task.rs
index 1af745e..151999c 100644
--- a/cano/examples/router_task.rs
+++ b/cano/examples/router_task.rs
@@ -123,13 +123,17 @@ fn build_workflow(use_fast_path: bool) -> Workflow {
async fn main() -> CanoResult<()> {
println!("=== fast-path run ===");
let workflow = build_workflow(true);
- let result = workflow.orchestrate(Step::Classify).await?;
+ let result = workflow
+ .orchestrate(Step::Classify, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Step::Done);
println!("completed at {result:?}\n");
println!("=== slow-path run ===");
let workflow = build_workflow(false);
- let result = workflow.orchestrate(Step::Classify).await?;
+ let result = workflow
+ .orchestrate(Step::Classify, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Step::Done);
println!("completed at {result:?}");
diff --git a/cano/examples/saga_payment.rs b/cano/examples/saga_payment.rs
index 445050b..b3bb217 100644
--- a/cano/examples/saga_payment.rs
+++ b/cano/examples/saga_payment.rs
@@ -118,7 +118,10 @@ async fn main() -> Result<(), Box> {
.register(Step::Ship, ShipOrder) // plain — and it fails
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Reserve).await {
+ match workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!("\ncompleted at {state:?}"),
Err(error) => println!("\nfailed, rolled back: {error}"),
}
diff --git a/cano/examples/saga_recovery.rs b/cano/examples/saga_recovery.rs
index 1d0d574..577f50a 100644
--- a/cano/examples/saga_recovery.rs
+++ b/cano/examples/saga_recovery.rs
@@ -161,7 +161,10 @@ async fn main() -> Result<(), Box> {
// --- (a) First run: Charge fails; compensations drain LIFO. ----------
println!("--- run: Reserve → Authorize → Charge (fails) → compensate LIFO ---\n");
- match workflow.orchestrate(Step::Reserve).await {
+ match workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ {
Ok(s) => println!("\ncompleted at {s:?} (unexpected)"),
Err(e) => println!("\nfailed + rolled back: {e}"),
}
diff --git a/cano/examples/scheduler_book_prepositions.rs b/cano/examples/scheduler_book_prepositions.rs
index 9ff257c..8d14917 100644
--- a/cano/examples/scheduler_book_prepositions.rs
+++ b/cano/examples/scheduler_book_prepositions.rs
@@ -273,7 +273,9 @@ async fn main() -> CanoResult<()> {
)
.add_exit_states(vec![WorkflowPhase::Analyze, WorkflowPhase::Complete]);
- let _ = workflow1.orchestrate(WorkflowPhase::Download).await?;
+ let _ = workflow1
+ .orchestrate(WorkflowPhase::Download, CancellationToken::disabled())
+ .await?;
// Book 2: Alice's Adventures in Wonderland
let workflow2 = Workflow::new(Resources::new().insert("store", store.clone()))
@@ -283,7 +285,9 @@ async fn main() -> CanoResult<()> {
)
.add_exit_states(vec![WorkflowPhase::Analyze, WorkflowPhase::Complete]);
- let _ = workflow2.orchestrate(WorkflowPhase::Download).await?;
+ let _ = workflow2
+ .orchestrate(WorkflowPhase::Download, CancellationToken::disabled())
+ .await?;
// Book 3: A Christmas Carol
let workflow3 = Workflow::new(Resources::new().insert("store", store.clone()))
@@ -293,7 +297,9 @@ async fn main() -> CanoResult<()> {
)
.add_exit_states(vec![WorkflowPhase::Analyze, WorkflowPhase::Complete]);
- let _ = workflow3.orchestrate(WorkflowPhase::Download).await?;
+ let _ = workflow3
+ .orchestrate(WorkflowPhase::Download, CancellationToken::disabled())
+ .await?;
// Analyze and rank the downloaded books
println!("\nAnalyzing and ranking books...\n");
@@ -304,7 +310,7 @@ async fn main() -> CanoResult<()> {
.add_exit_state(WorkflowPhase::Complete);
analysis_workflow
- .orchestrate(WorkflowPhase::Analyze)
+ .orchestrate(WorkflowPhase::Analyze, CancellationToken::disabled())
.await?;
println!("\nBook preposition analysis complete!");
diff --git a/cano/examples/scheduler_cancellation.rs b/cano/examples/scheduler_cancellation.rs
new file mode 100644
index 0000000..8a3f3ad
--- /dev/null
+++ b/cano/examples/scheduler_cancellation.rs
@@ -0,0 +1,119 @@
+#![cfg(feature = "scheduler")]
+//! # Scheduler cooperative cancellation
+//!
+//! Demonstrates [`RunningScheduler::cancel_flow`](cano::RunningScheduler::cancel_flow):
+//! a manually-triggered saga `Reserve → Charge → Ship → Done` whose `Ship` step
+//! runs long. A sibling task calls `cancel_flow` once `Ship` is in flight; the
+//! engine aborts it at its next await, the saga compensation stack drains in
+//! reverse (`Charge` then `Reserve`), and the flow returns to `Idle` — a
+//! deliberate cancel is **not** counted as a backoff failure. Graceful `stop()`
+//! cancels in-flight flows the same way.
+//!
+//! Run with:
+//! ```bash
+//! cargo run --example scheduler_cancellation --features scheduler
+//! ```
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+
+use cano::prelude::*;
+use cano::scheduler::Status;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum Step {
+    Reserve, // initial state handed to `scheduler.manual` in `main`
+    Charge, // state `Reserve::run` transitions to
+    Ship, // state `Charge::run` transitions to; the long-running step
+    Done, // registered as the workflow's exit state
+}
+
+struct Reserve; // registered with compensation for `Step::Reserve`
+struct Charge; // registered with compensation for `Step::Charge`
+
+#[saga::task(state = Step)]
+impl Reserve {
+    type Output = u32; // ticket number returned by `run`; `compensate` takes the same type
+    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, u32), CanoError> {
+        println!("reserve : holding inventory (ticket #42)");
+        Ok((TaskResult::Single(Step::Charge), 42))
+    }
+    async fn compensate(&self, _res: &Resources, ticket: u32) -> Result<(), CanoError> {
+        println!("reserve : releasing ticket #{ticket} (rollback)");
+        Ok(())
+    }
+}
+
+#[saga::task(state = Step)]
+impl Charge {
+    type Output = String; // authorization id returned by `run`; `compensate` takes the same type
+    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, String), CanoError> {
+        println!("charge : capturing $42.00 (auth auth-XYZ)");
+        Ok((TaskResult::Single(Step::Ship), "auth-XYZ".to_string()))
+    }
+    async fn compensate(&self, _res: &Resources, auth: String) -> Result<(), CanoError> {
+        println!("charge : refunding auth {auth} (rollback)");
+        Ok(())
+    }
+}
+
+/// Long-running, non-compensatable step. Flips `started` so the sibling
+/// canceller fires deterministically while this task is parked in its sleep.
+struct Ship {
+    started: Arc<AtomicBool>, // shared with `main`, which polls it before calling `cancel_flow`
+}
+#[task(state = Step)]
+impl Ship {
+    fn config(&self) -> TaskConfig {
+        TaskConfig::minimal()
+    }
+    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
+        println!("ship : dispatching shipment… (cancel_flow will stop this)");
+        self.started.store(true, Ordering::SeqCst); // tell `main` that Ship is now in flight
+        tokio::time::sleep(Duration::from_secs(10)).await; // long await during which `main` cancels
+        println!("ship : this line should never print");
+        Ok(TaskResult::Single(Step::Done))
+    }
+}
+
+#[tokio::main]
+async fn main() -> CanoResult<()> {
+    let ship_started = Arc::new(AtomicBool::new(false)); // set by `Ship::run_bare`, polled below
+
+    let workflow = Workflow::bare()
+        .register_with_compensation(Step::Reserve, Reserve)
+        .register_with_compensation(Step::Charge, Charge)
+        .register(
+            Step::Ship,
+            Ship {
+                started: ship_started.clone(),
+            },
+        )
+        .add_exit_state(Step::Done);
+
+    let mut scheduler = Scheduler::new();
+    scheduler.manual("order", workflow, Step::Reserve)?;
+    let running = scheduler.start().await?;
+
+    // Kick off the saga, then poll the shared flag until `Ship` has begun before cancelling.
+    running.trigger("order").await?;
+    while !ship_started.load(Ordering::SeqCst) {
+        tokio::time::sleep(Duration::from_millis(5)).await;
+    }
+    println!("\n>>> cancelling the in-flight flow…\n");
+    running.cancel_flow("order").await?;
+
+    // Wait for the cancelled run to settle; exits on any non-`Running` status (the message assumes Idle).
+    loop {
+        let status = running.status("order").await.map(|i| i.status);
+        if status != Some(Status::Running) {
+            println!("\norder flow status after cancel: {status:?} (Idle — not a failure)");
+            break;
+        }
+        tokio::time::sleep(Duration::from_millis(5)).await;
+    }
+
+    running.stop().await?;
+    Ok(())
+}
diff --git a/cano/examples/split_bulkhead.rs b/cano/examples/split_bulkhead.rs
index 8780062..522b823 100644
--- a/cano/examples/split_bulkhead.rs
+++ b/cano/examples/split_bulkhead.rs
@@ -167,7 +167,9 @@ async fn main() -> Result<(), CanoError> {
.register(Step::Summarize, Summarize)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::ParallelWork).await?;
+ let result = workflow
+ .orchestrate(Step::ParallelWork, CancellationToken::disabled())
+ .await?;
println!("\ncompleted at {result:?}");
println!("\n=== Done ===");
diff --git a/cano/examples/stepped_task.rs b/cano/examples/stepped_task.rs
index d084406..71588cf 100644
--- a/cano/examples/stepped_task.rs
+++ b/cano/examples/stepped_task.rs
@@ -140,7 +140,9 @@ async fn main() -> Result<(), Box> {
.with_checkpoint_store(checkpoint_store.clone())
.with_workflow_id(run_id);
- let result = workflow.orchestrate(Stage::Crunch).await?;
+ let result = workflow
+ .orchestrate(Stage::Crunch, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Stage::Done);
println!("\ncompleted at {result:?}");
diff --git a/cano/examples/store_custom_backend.rs b/cano/examples/store_custom_backend.rs
index 379987d..dae181e 100644
--- a/cano/examples/store_custom_backend.rs
+++ b/cano/examples/store_custom_backend.rs
@@ -221,7 +221,9 @@ async fn main() -> Result<(), CanoError> {
.add_exit_state(Step::Done);
println!("\n-- Part 2: MemoryStore::get_shared (Arc zero-copy sharing) --");
- let result = workflow.orchestrate(Step::WriteA).await?;
+ let result = workflow
+ .orchestrate(Step::WriteA, CancellationToken::disabled())
+ .await?;
println!("\ncompleted at {result:?}");
println!("\n=== Done ===");
diff --git a/cano/examples/task_interface_demo.rs b/cano/examples/task_interface_demo.rs
index 8ac5085..622e676 100644
--- a/cano/examples/task_interface_demo.rs
+++ b/cano/examples/task_interface_demo.rs
@@ -141,7 +141,10 @@ async fn main() -> Result<(), CanoError> {
println!("Executing workflow...");
println!();
- match workflow.orchestrate(TaskState::Start).await {
+ match workflow
+ .orchestrate(TaskState::Start, CancellationToken::disabled())
+ .await
+ {
Ok(final_state) => {
println!();
println!("Workflow completed successfully!");
diff --git a/cano/examples/task_simple.rs b/cano/examples/task_simple.rs
index cfc10a1..273a768 100644
--- a/cano/examples/task_simple.rs
+++ b/cano/examples/task_simple.rs
@@ -82,7 +82,10 @@ async fn main() -> CanoResult<()> {
.register(Action::Count, CounterTask)
.add_exit_states(vec![Action::Complete]);
- match workflow.orchestrate(Action::Generate).await {
+ match workflow
+ .orchestrate(Action::Generate, CancellationToken::disabled())
+ .await
+ {
Ok(_final_state) => {
println!("Workflow completed!");
println!("Final Results:");
diff --git a/cano/examples/testing_helpers.rs b/cano/examples/testing_helpers.rs
index 1f4333e..cdd08c3 100644
--- a/cano/examples/testing_helpers.rs
+++ b/cano/examples/testing_helpers.rs
@@ -117,7 +117,9 @@ async fn main() -> Result<(), Box> {
.with_checkpoint_store(checkpoints.clone())
.with_workflow_id("demo-run");
- let final_state = workflow.orchestrate(Step::Start).await?;
+ let final_state = workflow
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await?;
assert_eq!(final_state, Step::Done);
// The observer captured the whole path and the checkpoint appends along the way.
@@ -135,7 +137,10 @@ async fn main() -> Result<(), Box> {
let panicky = Workflow::bare()
.register(Step::Start, panic_on_attempt(1, Step::Done))
.add_exit_state(Step::Done);
- match panicky.orchestrate(Step::Start).await {
+ match panicky
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await
+ {
Ok(_) => unreachable!("the task panics on its first attempt"),
Err(e) => println!("panic_on_attempt surfaced as error: {e}"),
}
@@ -149,7 +154,9 @@ async fn main() -> Result<(), Box> {
.register_with_compensation(Step::Work, Charge)
.register(Step::Finish, Boom) // fails → drains the compensation stack in reverse
.add_exit_state(Step::Done);
- let _ = saga.orchestrate(Step::Start).await; // expected to fail and roll back
+ let _ = saga
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await; // expected to fail and roll back
let ran = handle.0.lock().unwrap().clone();
// Charge ran last, so it compensates first; then Reserve.
diff --git a/cano/examples/timer_task.rs b/cano/examples/timer_task.rs
index b28e67b..7956ec7 100644
--- a/cano/examples/timer_task.rs
+++ b/cano/examples/timer_task.rs
@@ -100,7 +100,9 @@ async fn main() -> CanoResult<()> {
.register(Step::Process, Process)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::CoolDown).await?;
+ let result = workflow
+ .orchestrate(Step::CoolDown, CancellationToken::disabled())
+ .await?;
assert_eq!(result, Step::Done);
println!("\ncompleted at {result:?}");
diff --git a/cano/examples/tracing_demo.rs b/cano/examples/tracing_demo.rs
index 533042c..49af079 100644
--- a/cano/examples/tracing_demo.rs
+++ b/cano/examples/tracing_demo.rs
@@ -213,7 +213,9 @@ async fn main() -> CanoResult<()> {
.add_exit_states(vec![WorkflowState::Complete, WorkflowState::Error]);
info!("Starting workflow execution...");
- let result = workflow.orchestrate(WorkflowState::Start).await?;
+ let result = workflow
+ .orchestrate(WorkflowState::Start, CancellationToken::disabled())
+ .await?;
info!(final_state = ?result, "Workflow completed");
println!("Basic workflow completed with state: {result:?}\n");
@@ -248,7 +250,9 @@ async fn main() -> CanoResult<()> {
.with_tracing_span(workflow_span);
info!("Starting task-based workflow execution (with custom span)...");
- let result = task_workflow.orchestrate(WorkflowState::Start).await?;
+ let result = task_workflow
+ .orchestrate(WorkflowState::Start, CancellationToken::disabled())
+ .await?;
let math_result: i32 = store.get("math_result").unwrap_or(0);
let completed_by: String = store.get("task_completed_by").unwrap_or_default();
@@ -329,7 +333,9 @@ async fn main() -> CanoResult<()> {
.add_exit_states(vec![WorkflowState::Complete, WorkflowState::Error]);
info!("Starting workflow that will encounter validation failure...");
- let result = error_workflow.orchestrate(WorkflowState::Start).await?;
+ let result = error_workflow
+ .orchestrate(WorkflowState::Start, CancellationToken::disabled())
+ .await?;
println!("Error workflow completed with state: {result:?}");
@@ -363,7 +369,9 @@ async fn main() -> CanoResult<()> {
// One line: re-emit lifecycle/failure events as `tracing` events.
.with_observer(Arc::new(TracingObserver::new()));
- let result = observed_workflow.orchestrate(WorkflowState::Start).await?;
+ let result = observed_workflow
+ .orchestrate(WorkflowState::Start, CancellationToken::disabled())
+ .await?;
println!("Observed workflow completed with state: {result:?}");
println!(
" (look for `task started` / `task succeeded` events; filter with RUST_LOG=cano::observer=debug)\n"
diff --git a/cano/examples/workflow_ad_exchange.rs b/cano/examples/workflow_ad_exchange.rs
index 20a4fd5..d55ab8d 100644
--- a/cano/examples/workflow_ad_exchange.rs
+++ b/cano/examples/workflow_ad_exchange.rs
@@ -596,7 +596,10 @@ async fn main() -> Result<(), CanoError> {
let start = tokio::time::Instant::now();
// Execute workflow - if splits timeout or fail, transition to NoFill
- let result = match workflow.orchestrate(AdExchangeState::Start).await {
+ let result = match workflow
+ .orchestrate(AdExchangeState::Start, CancellationToken::disabled())
+ .await
+ {
Ok(state) => state,
Err(e) => {
// If workflow fails due to split timeout/error, handle as NoFill.
@@ -606,7 +609,12 @@ async fn main() -> Result<(), CanoError> {
store.put("error_reason", e.to_string())?;
println!("\n⚠️ Handling as No Fill due to error\n");
- workflow.orchestrate(AdExchangeState::ErrorTracking).await?
+ workflow
+ .orchestrate(
+ AdExchangeState::ErrorTracking,
+ CancellationToken::disabled(),
+ )
+ .await?
}
};
diff --git a/cano/examples/workflow_bare.rs b/cano/examples/workflow_bare.rs
index b416b81..3f19384 100644
--- a/cano/examples/workflow_bare.rs
+++ b/cano/examples/workflow_bare.rs
@@ -87,7 +87,10 @@ async fn main() -> CanoResult<()> {
.register(Stage::Sanitize, SanitizeTask)
.add_exit_states(vec![Stage::Persist, Stage::Done]);
- match workflow.orchestrate(Stage::Validate).await {
+ match workflow
+ .orchestrate(Stage::Validate, CancellationToken::disabled())
+ .await
+ {
Ok(final_state) => println!("\nBare workflow reached: {final_state:?}\n"),
Err(e) => {
eprintln!("Workflow failed: {e}");
@@ -105,7 +108,10 @@ async fn main() -> CanoResult<()> {
.register(Stage::Persist, PersistTask) // resource task
.add_exit_states(vec![Stage::Done]);
- match workflow.orchestrate(Stage::Validate).await {
+ match workflow
+ .orchestrate(Stage::Validate, CancellationToken::disabled())
+ .await
+ {
Ok(final_state) => {
println!("\nMixed workflow reached: {final_state:?}");
if let Ok(v) = store.get::("sanitized_value") {
diff --git a/cano/examples/workflow_book_prepositions.rs b/cano/examples/workflow_book_prepositions.rs
index 753814a..a79024e 100644
--- a/cano/examples/workflow_book_prepositions.rs
+++ b/cano/examples/workflow_book_prepositions.rs
@@ -507,7 +507,13 @@ async fn run_workflow() -> Result<(), CanoError> {
println!(" BookRankingByPrepositionTask (Ranking phase)");
// Execute the entire workflow using Workflow orchestration
- match workflow.orchestrate(BookPrepositionAction::Download).await {
+ match workflow
+ .orchestrate(
+ BookPrepositionAction::Download,
+ CancellationToken::disabled(),
+ )
+ .await
+ {
Ok(final_state) => {
match final_state {
BookPrepositionAction::Complete => {
diff --git a/cano/examples/workflow_cancellation.rs b/cano/examples/workflow_cancellation.rs
new file mode 100644
index 0000000..2a74277
--- /dev/null
+++ b/cano/examples/workflow_cancellation.rs
@@ -0,0 +1,120 @@
+//! # Cooperative cancellation — saga rollback on cancel
+//!
+//! Demonstrates [`Workflow::orchestrate`](cano::Workflow::orchestrate) with a live token:
+//! a 3-step saga `Reserve → Charge → Ship → Done` where a sibling task fires a
+//! [`CancellationHandle`](cano::CancellationHandle) once `Ship` is in flight. The in-flight
+//! task is aborted at its next await point, the saga compensation stack drains in reverse
+//! (`Charge` then `Reserve`), and the call returns
+//! [`CanoError::Cancelled`](cano::CanoError::Cancelled).
+//!
+//! Run with:
+//! ```bash
+//! cargo run --example workflow_cancellation
+//! ```
+//!
+//! Expected output (timings will vary):
+//! ```text
+//! reserve : holding inventory (ticket #42)
+//! charge : capturing $42.00 (auth auth-XYZ)
+//! ship : dispatching shipment… (a sibling task will cancel this)
+//! charge : refunding auth auth-XYZ (rollback)
+//! reserve : releasing ticket #42 (rollback)
+//! workflow cancelled, rolled back: state=Ship attempt=0 path=[Reserve, Charge, Ship] caused by: Workflow cancelled
+//! ```
+
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+
+use cano::CancellationToken;
+use cano::prelude::*;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum Step {
+ Reserve,
+ Charge,
+ Ship,
+ Done,
+}
+
+struct Reserve;
+struct Charge;
+
+#[saga::task(state = Step)]
+impl Reserve {
+    type Output = u32; // type of the value `run` returns and `compensate` takes (the ticket)
+    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, u32), CanoError> {
+        let ticket = 42;
+        println!("reserve : holding inventory (ticket #{ticket})");
+        Ok((TaskResult::Single(Step::Charge), ticket))
+    }
+    async fn compensate(&self, _res: &Resources, ticket: u32) -> Result<(), CanoError> {
+        println!("reserve : releasing ticket #{ticket} (rollback)");
+        Ok(())
+    }
+}
+
+#[saga::task(state = Step)]
+impl Charge {
+    type Output = String; // type of the value `run` returns and `compensate` takes (the auth id)
+    async fn run(&self, _res: &Resources) -> Result<(TaskResult<Step>, String), CanoError> {
+        let auth = "auth-XYZ".to_string();
+        println!("charge : capturing $42.00 (auth {auth})");
+        Ok((TaskResult::Single(Step::Ship), auth))
+    }
+    async fn compensate(&self, _res: &Resources, auth: String) -> Result<(), CanoError> {
+        println!("charge : refunding auth {auth} (rollback)");
+        Ok(())
+    }
+}
+
+// Plain (non-compensatable) long-running task. It flips `started` so the sibling
+// canceller fires deterministically while this task is parked in its sleep.
+struct Ship {
+    started: Arc<AtomicBool>, // shared with the canceller spawned in `main`
+}
+#[task(state = Step)]
+impl Ship {
+    fn config(&self) -> TaskConfig {
+        TaskConfig::minimal()
+    }
+    async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
+        println!("ship : dispatching shipment… (a sibling task will cancel this)");
+        self.started.store(true, Ordering::SeqCst); // the canceller in `main` polls this flag
+        tokio::time::sleep(Duration::from_secs(2)).await; // parked here when the cancel fires
+        println!("ship : this line should never print");
+        Ok(TaskResult::Single(Step::Done))
+    }
+}
+
+#[tokio::main]
+async fn main() {
+ let ship_started = Arc::new(AtomicBool::new(false));
+ let workflow = Workflow::bare()
+ .register_with_compensation(Step::Reserve, Reserve)
+ .register_with_compensation(Step::Charge, Charge)
+ .register(
+ Step::Ship,
+ Ship {
+ started: ship_started.clone(),
+ },
+ )
+ .add_exit_state(Step::Done);
+
+ let (handle, token) = CancellationToken::new();
+
+ // Sibling task: cancel as soon as `Ship` is in flight.
+ let canceller = tokio::spawn(async move {
+ while !ship_started.load(Ordering::SeqCst) {
+ tokio::time::sleep(Duration::from_millis(5)).await;
+ }
+ handle.cancel();
+ });
+
+ match workflow.orchestrate(Step::Reserve, token).await {
+ Ok(state) => println!("\nworkflow completed at {state:?}"),
+ Err(error) => println!("\nworkflow cancelled, rolled back: {error}"),
+ }
+
+ canceller.await.expect("canceller task panicked");
+}
diff --git a/cano/examples/workflow_negotiation.rs b/cano/examples/workflow_negotiation.rs
index e0c2c14..cd19cf4 100644
--- a/cano/examples/workflow_negotiation.rs
+++ b/cano/examples/workflow_negotiation.rs
@@ -250,7 +250,13 @@ async fn run_negotiation_workflow() -> Result<(), CanoError> {
]);
// Execute the negotiation workflow
- match workflow.orchestrate(NegotiationAction::StartSelling).await {
+ match workflow
+ .orchestrate(
+ NegotiationAction::StartSelling,
+ CancellationToken::disabled(),
+ )
+ .await
+ {
Ok(final_state) => {
println!("{}", "=".repeat(50));
diff --git a/cano/examples/workflow_observer.rs b/cano/examples/workflow_observer.rs
index 42d224e..d8b076e 100644
--- a/cano/examples/workflow_observer.rs
+++ b/cano/examples/workflow_observer.rs
@@ -158,7 +158,9 @@ async fn main() -> Result<(), CanoError> {
)
.add_exit_state(Step::Done)
.with_observer(observer.clone());
- let final_state = workflow.orchestrate(Step::Load).await?;
+ let final_state = workflow
+ .orchestrate(Step::Load, CancellationToken::disabled())
+ .await?;
println!(" → workflow finished in state {final_state:?}\n");
// -- Scenario B -------------------------------------------------------
@@ -181,7 +183,10 @@ async fn main() -> Result<(), CanoError> {
)
.add_exit_state(Step::Done)
.with_observer(observer.clone());
- match guarded.orchestrate(Step::Probe).await {
+ match guarded
+ .orchestrate(Step::Probe, CancellationToken::disabled())
+ .await
+ {
Ok(s) => println!(" → unexpectedly finished in {s:?}\n"),
Err(e) => println!(" → workflow errored as expected: {e}\n"),
}
diff --git a/cano/examples/workflow_on_request.rs b/cano/examples/workflow_on_request.rs
index 3445196..5c25bfd 100644
--- a/cano/examples/workflow_on_request.rs
+++ b/cano/examples/workflow_on_request.rs
@@ -142,7 +142,7 @@ fn build_workflow(resources: Resources) -> Workflow {
.register(TextPipelineState::Parse, ParseTask)
.register(TextPipelineState::Transform, TransformTask)
.add_exit_state(TextPipelineState::Done)
- .with_timeout(Duration::from_secs(5))
+ .with_total_timeout(Duration::from_secs(5))
}
// ============================================================================
@@ -165,7 +165,7 @@ async fn process_handler(
// Run the FSM to completion.
workflow
- .orchestrate(TextPipelineState::Parse)
+ .orchestrate(TextPipelineState::Parse, CancellationToken::disabled())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
diff --git a/cano/examples/workflow_partial_results.rs b/cano/examples/workflow_partial_results.rs
index 411ee94..01038a6 100644
--- a/cano/examples/workflow_partial_results.rs
+++ b/cano/examples/workflow_partial_results.rs
@@ -103,7 +103,9 @@ async fn main() -> Result<(), CanoError> {
println!("Starting workflow...");
let start = std::time::Instant::now();
- let result = workflow.orchestrate(ApiState::Start).await?;
+ let result = workflow
+ .orchestrate(ApiState::Start, CancellationToken::disabled())
+ .await?;
let duration = start.elapsed();
println!(
diff --git a/cano/examples/workflow_recovery.rs b/cano/examples/workflow_recovery.rs
index 0201d16..da2bd5c 100644
--- a/cano/examples/workflow_recovery.rs
+++ b/cano/examples/workflow_recovery.rs
@@ -109,7 +109,10 @@ async fn main() -> Result<(), Box> {
.with_observer(Arc::new(Watcher));
println!("── run 1: orchestrate (Process will crash) ──");
- if let Err(e) = workflow.orchestrate(Step::Start).await {
+ if let Err(e) = workflow
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await
+ {
println!(" stopped: {e}");
}
println!("checkpoint log after run 1 (the crash left it intact):");
@@ -118,7 +121,9 @@ async fn main() -> Result<(), Box> {
}
println!("\n── run 2: resume_from ──");
- let final_state = workflow.resume_from(run_id).await?;
+ let final_state = workflow
+ .resume_from(run_id, CancellationToken::disabled())
+ .await?;
println!(" reached {final_state:?} — checkpoint log cleared on success");
assert_eq!(final_state, Step::Done);
assert!(checkpoint_store.load_run(run_id).await?.is_empty());
diff --git a/cano/examples/workflow_resources.rs b/cano/examples/workflow_resources.rs
index c30de2e..a822ae6 100644
--- a/cano/examples/workflow_resources.rs
+++ b/cano/examples/workflow_resources.rs
@@ -320,7 +320,9 @@ async fn main() -> Result<(), CanoError> {
.add_exit_state(Step::Done);
println!("Running workflow...");
- let final_state = workflow.orchestrate(Step::Init).await?;
+ let final_state = workflow
+ .orchestrate(Step::Init, CancellationToken::disabled())
+ .await?;
assert_eq!(final_state, Step::Done);
let result: u32 = store.get("result")?;
diff --git a/cano/examples/workflow_simd_matrix_pipeline.rs b/cano/examples/workflow_simd_matrix_pipeline.rs
index 6077d90..8035db7 100644
--- a/cano/examples/workflow_simd_matrix_pipeline.rs
+++ b/cano/examples/workflow_simd_matrix_pipeline.rs
@@ -470,7 +470,9 @@ async fn main() -> Result<(), Box> {
println!("Pipeline: Generate -> Multiply -> Transform -> Statistics -> Complete\n");
// Execute the workflow
- let _final_state = workflow.orchestrate(PipelineState::Generate).await?;
+ let _final_state = workflow
+ .orchestrate(PipelineState::Generate, CancellationToken::disabled())
+ .await?;
let total_duration = start_time.elapsed();
println!("\nSIMD Matrix Processing Pipeline completed!");
diff --git a/cano/examples/workflow_simple.rs b/cano/examples/workflow_simple.rs
index 1c24e80..ca8348b 100644
--- a/cano/examples/workflow_simple.rs
+++ b/cano/examples/workflow_simple.rs
@@ -152,7 +152,10 @@ async fn main() -> Result<(), CanoError> {
.register(WorkflowAction::Count, CounterTask)
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
- match workflow.orchestrate(WorkflowAction::Generate).await {
+ match workflow
+ .orchestrate(WorkflowAction::Generate, CancellationToken::disabled())
+ .await
+ {
Ok(WorkflowAction::Complete) => {
println!("\nWorkflow completed successfully!");
match store.get::("number_count") {
diff --git a/cano/examples/workflow_split_join.rs b/cano/examples/workflow_split_join.rs
index d4c14e7..ad070a6 100644
--- a/cano/examples/workflow_split_join.rs
+++ b/cano/examples/workflow_split_join.rs
@@ -142,7 +142,9 @@ async fn main() -> Result<(), CanoError> {
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
- let result = workflow.orchestrate(DataProcessingState::Start).await?;
+ let result = workflow
+ .orchestrate(DataProcessingState::Start, CancellationToken::disabled())
+ .await?;
let final_result: i32 = store.get("final_result")?;
println!("Final result: {}", final_result);
@@ -172,7 +174,9 @@ async fn main() -> Result<(), CanoError> {
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
- let result = workflow.orchestrate(DataProcessingState::Start).await?;
+ let result = workflow
+ .orchestrate(DataProcessingState::Start, CancellationToken::disabled())
+ .await?;
let processor_count: usize = store.get("processor_count")?;
println!(
@@ -205,7 +209,9 @@ async fn main() -> Result<(), CanoError> {
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
- let result = workflow.orchestrate(DataProcessingState::Start).await?;
+ let result = workflow
+ .orchestrate(DataProcessingState::Start, CancellationToken::disabled())
+ .await?;
println!("Workflow completed with Any strategy: {:?}\n", result);
}
@@ -236,7 +242,9 @@ async fn main() -> Result<(), CanoError> {
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
- let result = workflow.orchestrate(DataProcessingState::Start).await?;
+ let result = workflow
+ .orchestrate(DataProcessingState::Start, CancellationToken::disabled())
+ .await?;
let processor_count: usize = store.get("processor_count")?;
println!("Processors completed: {} (66% threshold)", processor_count);
@@ -267,7 +275,10 @@ async fn main() -> Result<(), CanoError> {
.register(DataProcessingState::Aggregate, Aggregator)
.add_exit_state(DataProcessingState::Complete);
- match workflow.orchestrate(DataProcessingState::Start).await {
+ match workflow
+ .orchestrate(DataProcessingState::Start, CancellationToken::disabled())
+ .await
+ {
Ok(result) => println!("Workflow completed: {:?}", result),
Err(e) => println!("Workflow failed (expected timeout): {}", e),
}
diff --git a/cano/examples/workflow_stack_store.rs b/cano/examples/workflow_stack_store.rs
index 00393d1..ca053ac 100644
--- a/cano/examples/workflow_stack_store.rs
+++ b/cano/examples/workflow_stack_store.rs
@@ -159,7 +159,9 @@ async fn main() -> CanoResult<()> {
// Execute workflow
println!("\nStarting workflow...\n");
- let final_state = workflow.orchestrate(RequestState::Start).await?;
+ let final_state = workflow
+ .orchestrate(RequestState::Start, CancellationToken::disabled())
+ .await?;
// Display results
println!("\nFinal Results:");
@@ -198,7 +200,9 @@ async fn main() -> CanoResult<()> {
// Execute workflow
println!("\nStarting workflow...\n");
- let final_state = workflow.orchestrate(RequestState::Start).await?;
+ let final_state = workflow
+ .orchestrate(RequestState::Start, CancellationToken::disabled())
+ .await?;
// Display results
println!("\nFinal Results:");
@@ -237,7 +241,9 @@ async fn main() -> CanoResult<()> {
// Execute workflow
println!("\nStarting workflow...\n");
- let final_state = workflow.orchestrate(RequestState::Start).await?;
+ let final_state = workflow
+ .orchestrate(RequestState::Start, CancellationToken::disabled())
+ .await?;
// Display results
println!("\nFinal Results:");
diff --git a/cano/examples/workflow_total_timeout.rs b/cano/examples/workflow_total_timeout.rs
index 94f8e31..4dcf2a7 100644
--- a/cano/examples/workflow_total_timeout.rs
+++ b/cano/examples/workflow_total_timeout.rs
@@ -91,7 +91,10 @@ async fn main() {
.register(Step::Ship, Ship)
.add_exit_state(Step::Done);
- match workflow.orchestrate(Step::Reserve).await {
+ match workflow
+ .orchestrate(Step::Reserve, CancellationToken::disabled())
+ .await
+ {
Ok(state) => println!("\nworkflow completed at {state:?}"),
Err(error) => println!("\nworkflow failed, rolled back: {error}"),
}
diff --git a/cano/examples/workflow_validation.rs b/cano/examples/workflow_validation.rs
index 2409b2e..384ae68 100644
--- a/cano/examples/workflow_validation.rs
+++ b/cano/examples/workflow_validation.rs
@@ -102,7 +102,9 @@ async fn main() -> Result<(), Box> {
Err(e) => println!(" validate_initial_state(Prepare) -> Err: {e}"),
}
- let result = workflow.orchestrate(Step::Prepare).await?;
+ let result = workflow
+ .orchestrate(Step::Prepare, CancellationToken::disabled())
+ .await?;
println!(" orchestrate -> {result:?}\n");
}
diff --git a/cano/src/bin/recovery_resume.rs b/cano/src/bin/recovery_resume.rs
index 6173382..cc1a2a1 100644
--- a/cano/src/bin/recovery_resume.rs
+++ b/cano/src/bin/recovery_resume.rs
@@ -125,8 +125,16 @@ async fn main() -> Result<(), Box> {
}));
let final_state = match mode {
- "resume" => workflow.resume_from(workflow_id).await?,
- _ => workflow.orchestrate(Step::Start).await?,
+ "resume" => {
+ workflow
+ .resume_from(workflow_id, CancellationToken::disabled())
+ .await?
+ }
+ _ => {
+ workflow
+ .orchestrate(Step::Start, CancellationToken::disabled())
+ .await?
+ }
};
println!("DONE {final_state:?}");
let _ = std::io::stdout().flush();
diff --git a/cano/src/bin/stepped_resume.rs b/cano/src/bin/stepped_resume.rs
index f410286..c0e5309 100644
--- a/cano/src/bin/stepped_resume.rs
+++ b/cano/src/bin/stepped_resume.rs
@@ -107,13 +107,17 @@ async fn main() -> Result<(), Box> {
let final_state = match mode {
"resume" => {
- let result = workflow.resume_from(WORKFLOW_ID).await?;
+ let result = workflow
+ .resume_from(WORKFLOW_ID, CancellationToken::disabled())
+ .await?;
println!("RESUME COMPLETE final={result:?}");
let _ = std::io::stdout().flush();
result
}
_ => {
- let result = workflow.orchestrate(State::Crunch).await?;
+ let result = workflow
+ .orchestrate(State::Crunch, CancellationToken::disabled())
+ .await?;
println!("RUN COMPLETE final={result:?}");
let _ = std::io::stdout().flush();
result
diff --git a/cano/src/cancel.rs b/cano/src/cancel.rs
new file mode 100644
index 0000000..ae6e860
--- /dev/null
+++ b/cano/src/cancel.rs
@@ -0,0 +1,264 @@
+//! Cooperative cancellation for workflow runs.
+//!
+//! [`CancellationToken`] / [`CancellationHandle`] form a clonable signal pair built on
+//! [`tokio::sync::watch`] — no extra dependency. Hand a token to
+//! [`Workflow::orchestrate`](crate::workflow::Workflow::orchestrate)
+//! (or [`resume_from`](crate::workflow::Workflow::resume_from)) and keep
+//! the handle; calling [`CancellationHandle::cancel`] aborts the in-flight cancellable task at its
+//! next await point, drains the saga compensation stack, and surfaces
+//! [`CanoError::Cancelled`](crate::error::CanoError::Cancelled). To opt a run out of cancellation
+//! entirely, pass [`CancellationToken::disabled`].
+//!
+//! Cancellation is **cooperative**: the engine drops the running task future at its next `.await`,
+//! so a task doing uninterrupted synchronous/CPU work is not interrupted until it next yields. A
+//! [`CompensatableTask`](crate::saga::CompensatableTask) is deliberately *never* interrupted
+//! mid-run (that would orphan a committed side effect with no entry to roll back) — it runs to
+//! completion and the cancel is honoured at the next state boundary. The compensation drain itself
+//! is uncancellable.
+//!
+//! ```
+//! use cano::prelude::*;
+//! use cano::CancellationToken;
+//!
+//! #[derive(Clone, Debug, PartialEq, Eq, Hash)]
+//! enum Step { Start, Done }
+//!
+//! struct Noop;
+//! #[task]
+//! impl Task<Step> for Noop {
+//!     async fn run_bare(&self) -> Result<TaskResult<Step>, CanoError> {
+//!         Ok(TaskResult::Single(Step::Done))
+//!     }
+//! }
+//!
+//! # #[tokio::main]
+//! # async fn main() {
+//! let (handle, token) = CancellationToken::new();
+//! let workflow = Workflow::bare()
+//! .register(Step::Start, Noop)
+//! .add_exit_state(Step::Done);
+//!
+//! // Cancel from anywhere (another task, a signal handler, …):
+//! handle.cancel();
+//!
+//! let result = workflow.orchestrate(Step::Start, token).await;
+//! assert!(matches!(result, Err(e) if e.category() == "cancelled"));
+//! # }
+//! ```
+
+/// The observing half of a cancellation signal. Clonable and cheap to pass into a workflow.
+///
+/// A token built via [`CancellationToken::new`] observes its paired [`CancellationHandle`]; a
+/// [`disabled`](CancellationToken::disabled) token never fires and adds no overhead, so a run
+/// passed one opts out of cancellation entirely.
+#[derive(Clone, Debug)]
+pub struct CancellationToken {
+    rx: Option<tokio::sync::watch::Receiver<bool>>, // `None` for a `disabled()` token
+}
+
+/// The controlling half of a cancellation signal. Call [`cancel`](Self::cancel) to fire it.
+///
+/// Clonable, so several owners can trigger cancellation; [`cancel`](Self::cancel) is idempotent.
+#[derive(Clone, Debug)]
+pub struct CancellationHandle {
+    tx: tokio::sync::watch::Sender<bool>, // `cancel` sends `true` on this channel
+}
+
+impl CancellationToken {
+ /// Create a fresh handle/token pair. The token is not cancelled until the handle's
+ /// [`cancel`](CancellationHandle::cancel) is called.
+ #[must_use]
+ pub fn new() -> (CancellationHandle, CancellationToken) {
+ let (tx, rx) = tokio::sync::watch::channel(false);
+ (
+ CancellationHandle { tx },
+ CancellationToken { rx: Some(rx) },
+ )
+ }
+
+ /// A token that can never be cancelled — pass it to
+ /// [`orchestrate`](crate::workflow::Workflow::orchestrate) /
+ /// [`resume_from`](crate::workflow::Workflow::resume_from) to opt a run out of cancellation.
+ /// No channel is allocated, so this path stays allocation- and overhead-free: the FSM skips
+ /// the cancellation `select!` entirely.
+ #[must_use]
+ pub fn disabled() -> Self {
+ Self { rx: None }
+ }
+
+ /// Whether this token has already been cancelled. Non-blocking poll.
+ #[must_use]
+ pub fn is_cancelled(&self) -> bool {
+ self.rx.as_ref().is_some_and(|rx| *rx.borrow())
+ }
+
+/// Whether this token can ever observe a cancellation. `false` only for a token built with
+/// [`disabled`](Self::disabled), which lets the FSM hot path skip the cancellation `select!`.
+ pub(crate) fn can_cancel(&self) -> bool {
+ self.rx.is_some()
+ }
+
+/// Resolve once the token is cancelled. A [`disabled`](Self::disabled) token (or one whose
+/// handles were all dropped without cancelling) stays pending forever — making it safe to use
+/// as a `select!` branch that simply never wins.
+ pub async fn cancelled(&self) {
+ match &self.rx {
+ None => std::future::pending::<()>().await,
+ Some(rx) => {
+ let mut rx = rx.clone();
+ if *rx.borrow() {
+ return;
+ }
+ while rx.changed().await.is_ok() {
+ if *rx.borrow() {
+ return;
+ }
+ }
+ // Sender dropped without ever sending `true`: never cancels.
+ std::future::pending::<()>().await;
+ }
+ }
+ }
+}
+
+impl CancellationHandle {
+ /// Signal cancellation to every token observing this handle. Idempotent — calling it again
+ /// after the first cancel is a no-op.
+ pub fn cancel(&self) {
+ let _ = self.tx.send(true);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::time::Duration;
+
+ #[tokio::test]
+ async fn cancel_propagates_to_receivers() {
+ let (handle, token) = CancellationToken::new();
+ assert!(!token.is_cancelled());
+ handle.cancel();
+ // `cancelled()` resolves promptly.
+ tokio::time::timeout(Duration::from_secs(1), token.cancelled())
+ .await
+ .expect("cancelled() should resolve after cancel");
+ assert!(token.is_cancelled());
+ }
+
+ #[tokio::test]
+ async fn clone_after_cancel_still_observes() {
+ let (handle, token) = CancellationToken::new();
+ handle.cancel();
+ let cloned = token.clone();
+ assert!(cloned.is_cancelled());
+ tokio::time::timeout(Duration::from_secs(1), cloned.cancelled())
+ .await
+ .expect("a clone made after cancel still observes it");
+ }
+
+ #[test]
+ fn is_cancelled_polls_without_await() {
+ let (handle, token) = CancellationToken::new();
+ assert!(!token.is_cancelled());
+ handle.cancel();
+ assert!(token.is_cancelled());
+ }
+
+ #[tokio::test]
+ async fn cancel_is_idempotent() {
+ let (handle, token) = CancellationToken::new();
+ handle.cancel();
+ handle.cancel(); // second call is a no-op
+ assert!(token.is_cancelled());
+ }
+
+ #[tokio::test]
+ async fn dropping_handle_without_cancel_keeps_token_pending() {
+ let (handle, token) = CancellationToken::new();
+ drop(handle);
+ assert!(!token.is_cancelled());
+ // `cancelled()` must NOT resolve just because the sender dropped.
+ let res = tokio::time::timeout(Duration::from_millis(50), token.cancelled()).await;
+ assert!(
+ res.is_err(),
+ "cancelled() should stay pending after handle drop"
+ );
+ }
+
+ #[tokio::test]
+ async fn disabled_is_never_cancelled_and_can_cancel_false() {
+ let token = CancellationToken::disabled();
+ assert!(!token.is_cancelled());
+ assert!(!token.can_cancel());
+ let res = tokio::time::timeout(Duration::from_millis(50), token.cancelled()).await;
+ assert!(res.is_err(), "disabled token should stay pending");
+ }
+
+ #[test]
+ fn can_cancel_true_for_new_token() {
+ let (_handle, token) = CancellationToken::new();
+ assert!(token.can_cancel());
+ }
+
+ // Cancel fires *after* the await begins — exercises the `rx.changed().await`
+ // wakeup path (the other tests cancel first and hit the fast-path `borrow()`).
+ #[tokio::test]
+ async fn cancelled_resolves_when_cancel_fires_while_awaiting() {
+ let (handle, token) = CancellationToken::new();
+ let waiter = tokio::spawn(async move { token.cancelled().await });
+ // Let the waiter park inside `changed().await` before cancelling.
+ tokio::time::sleep(Duration::from_millis(20)).await;
+ handle.cancel();
+ tokio::time::timeout(Duration::from_secs(1), waiter)
+ .await
+ .expect("waiter should wake on cancel")
+ .expect("waiter task should not panic");
+ }
+
+ // A clone taken *before* the cancel still observes it (both halves share state).
+ #[tokio::test]
+ async fn clone_before_cancel_is_observed() {
+ let (handle, token) = CancellationToken::new();
+ let cloned = token.clone();
+ assert!(!cloned.is_cancelled());
+ handle.cancel();
+ assert!(cloned.is_cancelled());
+ assert!(token.is_cancelled());
+ tokio::time::timeout(Duration::from_secs(1), cloned.cancelled())
+ .await
+ .expect("a clone made before cancel still resolves");
+ }
+
+ // `CancellationHandle` is `Clone`; cancelling via a clone still fires, even after
+ // the original handle is dropped.
+ #[tokio::test]
+ async fn cloned_handle_triggers_cancellation() {
+ let (handle, token) = CancellationToken::new();
+ let handle2 = handle.clone();
+ drop(handle); // only the clone remains
+ assert!(!token.is_cancelled());
+ handle2.cancel();
+ assert!(token.is_cancelled());
+ }
+
+ // One cancel wakes every concurrent awaiter.
+ #[tokio::test]
+ async fn multiple_awaiters_all_wake_on_cancel() {
+ let (handle, token) = CancellationToken::new();
+ let waiters: Vec<_> = (0..5)
+ .map(|_| {
+ let t = token.clone();
+ tokio::spawn(async move { t.cancelled().await })
+ })
+ .collect();
+ tokio::time::sleep(Duration::from_millis(20)).await;
+ handle.cancel();
+ for w in waiters {
+ tokio::time::timeout(Duration::from_secs(1), w)
+ .await
+ .expect("every awaiter should wake")
+ .expect("awaiter task should not panic");
+ }
+ }
+}
diff --git a/cano/src/error.rs b/cano/src/error.rs
index e58a56e..666605e 100644
--- a/cano/src/error.rs
+++ b/cano/src/error.rs
@@ -205,6 +205,17 @@ pub enum CanoError {
limit: std::time::Duration,
},
+ /// A run was cancelled via a [`CancellationToken`](crate::cancel::CancellationToken).
+ ///
+ /// Emitted by [`orchestrate`](crate::workflow::Workflow::orchestrate)
+ /// and [`resume_from`](crate::workflow::Workflow::resume_from) when the paired
+ /// [`CancellationHandle`](crate::cancel::CancellationHandle) fires. The in-flight
+ /// cancellable task is dropped at its next await point and the compensation stack is drained
+ /// before this error surfaces. Like every task error from the FSM it is wrapped in
+ /// [`CanoError::WithStateContext`] (clean rollback); a dirty rollback yields
+ /// [`CanoError::CompensationFailed`] whose `errors[0]` carries the wrapped `Cancelled`.
+ Cancelled,
+
/// A call was rejected because the circuit breaker is open.
///
/// Emitted by [`crate::circuit::CircuitBreaker::try_acquire`] (and surfaced through the
@@ -382,6 +393,11 @@ impl CanoError {
CanoError::WorkflowTimeout { elapsed, limit }
}
+ /// Create a new cancellation error.
+ pub fn cancelled() -> Self {
+ CanoError::Cancelled
+ }
+
/// Create a new circuit-open error
pub fn circuit_open>(msg: S) -> Self {
CanoError::CircuitOpen(msg.into())
@@ -510,6 +526,7 @@ impl CanoError {
CanoError::RetryExhausted { source, .. } => source.message(),
CanoError::Timeout(msg) => msg,
CanoError::WorkflowTimeout { .. } => "workflow total timeout exceeded",
+ CanoError::Cancelled => "workflow cancelled",
CanoError::CircuitOpen(msg) => msg,
CanoError::RateLimited { .. } => "rate limited",
CanoError::CheckpointStore(msg) => msg,
@@ -588,6 +605,7 @@ impl CanoError {
CanoError::RetryExhausted { .. } => "retry_exhausted",
CanoError::Timeout(_) => "timeout",
CanoError::WorkflowTimeout { .. } => "workflow_timeout",
+ CanoError::Cancelled => "cancelled",
CanoError::CircuitOpen(_) => "circuit_open",
CanoError::RateLimited { .. } => "rate_limited",
CanoError::CheckpointStore(_) => "checkpoint_store",
@@ -618,6 +636,7 @@ impl std::fmt::Display for CanoError {
f,
"Workflow total timeout exceeded: elapsed={elapsed:?} limit={limit:?}"
),
+ CanoError::Cancelled => write!(f, "Workflow cancelled"),
CanoError::CircuitOpen(msg) => write!(f, "Circuit open: {msg}"),
CanoError::RateLimited { tier, retry_after } => {
write!(
@@ -713,6 +732,7 @@ impl PartialEq for CanoError {
limit: l2,
},
) => e1 == e2 && l1 == l2,
+ (CanoError::Cancelled, CanoError::Cancelled) => true,
(CanoError::CircuitOpen(a), CanoError::CircuitOpen(b)) => a == b,
(
CanoError::RateLimited {
@@ -991,6 +1011,43 @@ mod tests {
assert_ne!(timeout, workflow);
}
+ #[test]
+ fn test_cancelled_constructor_category_display_and_eq() {
+ let err = CanoError::cancelled();
+ assert_eq!(err.message(), "workflow cancelled");
+ assert_eq!(err.category(), "cancelled");
+ assert_eq!(err.outer_category(), "cancelled");
+ assert_eq!(format!("{err}"), "Workflow cancelled");
+ assert_eq!(CanoError::cancelled(), CanoError::Cancelled);
+ assert_ne!(CanoError::cancelled(), CanoError::timeout("x"));
+ }
+
+ #[test]
+ fn test_cancelled_wrapped_in_state_context() {
+ // How a cancel actually surfaces from orchestrate: wrapped with FSM context.
+ let wrapped = CanoError::with_state_context(
+ "Ship",
+ 0,
+ vec!["Reserve".into(), "Ship".into()],
+ CanoError::cancelled(),
+ );
+ // `category()` unwraps `WithStateContext` so alerting still buckets on the cause.
+ assert_eq!(wrapped.category(), "cancelled");
+ assert_eq!(wrapped.outer_category(), "with_state_context");
+ assert!(matches!(wrapped.inner(), CanoError::Cancelled));
+ // A dirty rollback nests it under CompensationFailed with errors[0] = the wrapped cancel.
+ let dirty = CanoError::compensation_failed(vec![
+ wrapped,
+ CanoError::task_execution("compensator boom"),
+ ]);
+ assert_eq!(dirty.category(), "compensation_failed");
+ if let CanoError::CompensationFailed { errors } = &dirty {
+ assert_eq!(errors[0].category(), "cancelled");
+ } else {
+ panic!("expected CompensationFailed");
+ }
+ }
+
#[test]
fn test_circuit_open_constructor_and_category() {
let err = CanoError::circuit_open("breaker tripped");
diff --git a/cano/src/lib.rs b/cano/src/lib.rs
index f08514c..3fbc78b 100644
--- a/cano/src/lib.rs
+++ b/cano/src/lib.rs
@@ -57,7 +57,7 @@
//! .register(Step::Process, ProcessTask)
//! .add_exit_state(Step::Done);
//!
-//! let final_state = workflow.orchestrate(Step::Fetch).await?;
+//! let final_state = workflow.orchestrate(Step::Fetch, CancellationToken::disabled()).await?;
//! assert_eq!(final_state, Step::Done);
//!
//! // The sum of 1..=3 is 6.
@@ -95,7 +95,7 @@
//! .register(Step::Compute, ComputeTask)
//! .add_exit_state(Step::Done);
//!
-//! let final_state = workflow.orchestrate(Step::Compute).await?;
+//! let final_state = workflow.orchestrate(Step::Compute, CancellationToken::disabled()).await?;
//! assert_eq!(final_state, Step::Done);
//! # Ok(())
//! # }
@@ -199,13 +199,25 @@
//!
//! ### Timeouts
//!
-//! Three layered budgets bound a run. [`TaskConfig::with_attempt_timeout`](task::TaskConfig::with_attempt_timeout)
+//! Two layered budgets bound a run. [`TaskConfig::with_attempt_timeout`](task::TaskConfig::with_attempt_timeout)
//! caps each individual task attempt. [`Workflow::with_total_timeout`] sets a wall-clock
//! budget for the entire [`orchestrate`](Workflow::orchestrate) / [`resume_from`](Workflow::resume_from)
//! call; when it elapses the in-flight task is aborted, the saga compensation stack drains
//! against its own bounded budget (configurable via [`Workflow::with_compensation_timeout`]),
-//! and the call returns [`CanoError::WorkflowTimeout`]. Contrast with [`Workflow::with_timeout`],
-//! a blunt outer `tokio::time::timeout` that offers no graceful compensation.
+//! and the call returns [`CanoError::WorkflowTimeout`]. To stop a run on an external signal rather
+//! than a deadline, use [cooperative cancellation](#cooperative-cancellation).
+//!
+//! ### Cooperative cancellation
+//!
+//! [`Workflow::orchestrate`] (and [`resume_from`](Workflow::resume_from))
+//! take a [`CancellationToken`] obtained from [`CancellationToken::new`]; firing the paired
+//! [`CancellationHandle`] aborts the in-flight cancellable task at its next `.await`, drains the
+//! saga compensation stack, and returns [`CanoError::Cancelled`]. Cancellation is *cooperative*
+//! (a task in tight synchronous work isn't interrupted until it yields) and *saga-safe* (a
+//! [`CompensatableTask`] always runs to completion so its rollback entry is recorded; the cancel
+//! is honoured at the next state boundary). The compensation drain itself is uncancellable.
+//! To opt a run out of cancellation, pass [`CancellationToken::disabled`] — it never fires and is
+//! zero-cost (the FSM skips the cancellation `select!`). See the [`cancel`] module.
//!
//! ## Module Overview
//!
@@ -215,6 +227,7 @@
//! - [`task::timer`]: The [`TimerTask`] trait — wait-then-transition via `wait()`/`after_wait()`; registered with [`Workflow::register`]
//! - [`task::batch`]: The [`BatchTask`] trait — fan-out over data items via `load`/`process_item`/`finish`; registered with [`Workflow::register`]
//! - [`task::stepped`]: The [`SteppedTask`] trait — resumable iterative work via `step()` with a serializable cursor; registered with [`Workflow::register_stepped`] (persists the cursor when a checkpoint store is attached)
+//! - [`cancel`]: [`CancellationToken`] / [`CancellationHandle`] — cooperative cancellation for [`orchestrate`](Workflow::orchestrate)
//! - [`workflow`]: [`Workflow`] — FSM orchestration with Split/Join support
//! - `scheduler` (requires `scheduler` feature): `Scheduler` (builder) and `RunningScheduler` (live handle) — cron and interval scheduling
//! - [`mod@resource`]: [`Resource`] trait, [`Resources`] dictionary, and [`HealthStatus`] — lifecycle-aware resource management and health probes
@@ -240,6 +253,7 @@
//! 2. Read the module docs — each module has detailed documentation and examples
//! 3. Run benchmarks: `cargo bench --bench workflow_performance`
+pub mod cancel;
pub mod circuit;
pub mod error;
pub mod observer;
@@ -261,6 +275,7 @@ pub mod scheduler;
pub mod testing;
// Core public API - simplified imports
+pub use cancel::{CancellationHandle, CancellationToken};
pub use circuit::{CircuitBreaker, CircuitPolicy, CircuitState, Permit as CircuitPermit};
pub use error::{CanoError, CanoResult};
pub use observer::WorkflowObserver;
@@ -353,14 +368,14 @@ pub mod prelude {
//! Use `use cano::prelude::*;` to import the most commonly used types and traits.
pub use crate::{
- BatchTask, CanoError, CanoResult, CheckpointRow, CheckpointStore, CircuitBreaker,
- CircuitPermit, CircuitPolicy, CircuitState, CompensatableTask, HealthStatus, JoinConfig,
- JoinStrategy, MemoryStore, Meter, MeterStatus, MultiPermit, MultiRateLimiter,
- PollErrorPolicy, PollOutcome, PollTask, RateLimiter, RateLimiterPermit, RateLimiterPolicy,
- Reservation, Resource, Resources, RetryMode, RouterTask, RowKind, SplitResult,
- SplitTaskResult, StateEntry, StepOutcome, SteppedTask, Task, TaskConfig, TaskObject,
- TaskResult, Tier, TimerOutcome, TimerTask, WindowPermit, WindowPolicy, WindowedRateLimiter,
- Workflow, WorkflowObserver, run_stepped,
+ BatchTask, CancellationHandle, CancellationToken, CanoError, CanoResult, CheckpointRow,
+ CheckpointStore, CircuitBreaker, CircuitPermit, CircuitPolicy, CircuitState,
+ CompensatableTask, HealthStatus, JoinConfig, JoinStrategy, MemoryStore, Meter, MeterStatus,
+ MultiPermit, MultiRateLimiter, PollErrorPolicy, PollOutcome, PollTask, RateLimiter,
+ RateLimiterPermit, RateLimiterPolicy, Reservation, Resource, Resources, RetryMode,
+ RouterTask, RowKind, SplitResult, SplitTaskResult, StateEntry, StepOutcome, SteppedTask,
+ Task, TaskConfig, TaskObject, TaskResult, Tier, TimerOutcome, TimerTask, WindowPermit,
+ WindowPolicy, WindowedRateLimiter, Workflow, WorkflowObserver, run_stepped,
};
#[cfg(feature = "scheduler")]
diff --git a/cano/src/metrics.rs b/cano/src/metrics.rs
index c08098d..60a1b1c 100644
--- a/cano/src/metrics.rs
+++ b/cano/src/metrics.rs
@@ -102,6 +102,7 @@ pub const OBSERVED_WORKFLOW_TIMEOUT_LIMIT_SECONDS: &str =
pub const OBSERVED_WORKFLOW_TIMEOUT_ELAPSED_SECONDS: &str =
"cano_observed_workflow_timeout_elapsed_seconds";
pub const OBSERVED_UNKNOWN_RESUME_STATES_TOTAL: &str = "cano_observed_unknown_resume_states_total";
+pub const OBSERVED_CANCELLATIONS_TOTAL: &str = "cano_observed_cancellations_total";
// Always-on direct instrumentation:
pub const WORKFLOW_RUNS_TOTAL: &str = "cano_workflow_runs_total";
@@ -194,11 +195,16 @@ pub fn describe() {
Unit::Count,
"Checkpoint rows whose state label is not registered on the current workflow (emitted by MetricsObserver during resume_from)"
);
+ describe_counter!(
+ OBSERVED_CANCELLATIONS_TOTAL,
+ Unit::Count,
+ "Workflow runs cancelled via a CancellationToken (emitted by MetricsObserver via on_cancelled)"
+ );
describe_counter!(
WORKFLOW_RUNS_TOTAL,
Unit::Count,
- "Workflow runs (via Workflow::orchestrate/resume_from), by terminal outcome (completed|failed|timeout)"
+ "Workflow runs (via Workflow::orchestrate/resume_from), by terminal outcome (completed|failed)"
);
describe_histogram!(
WORKFLOW_DURATION_SECONDS,
@@ -402,6 +408,9 @@ pub(crate) fn observed_workflow_timeout(elapsed: Duration, limit: Duration) {
pub(crate) fn observed_unknown_resume_state() {
counter!(OBSERVED_UNKNOWN_RESUME_STATES_TOTAL).increment(1);
}
+pub(crate) fn observed_cancellation() {
+ counter!(OBSERVED_CANCELLATIONS_TOTAL).increment(1);
+}
// ----- workflow run -----
diff --git a/cano/src/observer.rs b/cano/src/observer.rs
index e32d6e0..6099b65 100644
--- a/cano/src/observer.rs
+++ b/cano/src/observer.rs
@@ -107,6 +107,15 @@ pub trait WorkflowObserver: Send + Sync + 'static {
/// wrapped timeout (dirty rollback).
fn on_workflow_timeout(&self, _elapsed: std::time::Duration, _limit: std::time::Duration) {}
+ /// Called when a run is cancelled via a
+ /// [`CancellationToken`](crate::cancel::CancellationToken) — either observed at a state
+ /// boundary or while a cancellable task was in flight. `state` is the `Debug` rendering of the
+ /// state the cancellation was observed at. Fires exactly once per cancelled run, immediately
+ /// before the compensation stack is drained. The public API call then returns a
+ /// `CanoError::WithStateContext` wrapping a `CanoError::Cancelled` (clean rollback), or a
+ /// `CanoError::CompensationFailed` whose `errors[0]` is the wrapped `Cancelled` (dirty rollback).
+ fn on_cancelled(&self, _state: &str) {}
+
/// Called when the engine attempted to clear a checkpoint log (after a
/// successful run or after a clean compensation drain) and the backend
/// returned an error.
@@ -236,6 +245,9 @@ impl WorkflowObserver for TracingObserver {
"workflow total timeout exceeded"
);
}
+ fn on_cancelled(&self, state: &str) {
+ tracing::warn!(state, "workflow cancelled");
+ }
fn on_checkpoint_clear_failed(&self, workflow_id: &str, error: &CanoError) {
tracing::warn!(workflow_id, error = %error, "checkpoint log clear failed");
}
@@ -297,6 +309,9 @@ impl WorkflowObserver for MetricsObserver {
fn on_workflow_timeout(&self, elapsed: std::time::Duration, limit: std::time::Duration) {
crate::metrics::observed_workflow_timeout(elapsed, limit);
}
+ fn on_cancelled(&self, _state: &str) {
+ crate::metrics::observed_cancellation();
+ }
fn on_checkpoint_clear_failed(&self, _workflow_id: &str, _error: &CanoError) {
crate::metrics::checkpoint_clear(false);
}
@@ -382,7 +397,13 @@ mod tests {
.add_exit_state(S::Done)
.with_observer(Arc::new(obs));
- assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
+ assert_eq!(
+ workflow
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap(),
+ S::Done
+ );
let events = rec.labels();
assert!(
@@ -415,7 +436,12 @@ mod tests {
.add_exit_state(S::Done)
.with_observer(Arc::new(obs));
- assert!(workflow.orchestrate(S::Start).await.is_err());
+ assert!(
+ workflow
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .is_err()
+ );
let events = rec.labels();
assert!(
@@ -463,7 +489,10 @@ mod tests {
.add_exit_state(S::Done)
.with_observer(Arc::new(obs));
- let err = workflow.orchestrate(S::Start).await.unwrap_err();
+ let err = workflow
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap_err();
// The FSM wraps the failure with state context; the inner is CircuitOpen.
assert!(matches!(err.inner(), CanoError::CircuitOpen(_)), "{err}");
@@ -484,7 +513,13 @@ mod tests {
let workflow = Workflow::bare()
.register(S::Start, OkTask)
.add_exit_state(S::Done);
- assert_eq!(workflow.orchestrate(S::Start).await.unwrap(), S::Done);
+ assert_eq!(
+ workflow
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap(),
+ S::Done
+ );
}
#[test]
@@ -580,7 +615,7 @@ mod metrics_observer_tests {
.register(S::Start, GoTo(S::Mid))
.register(S::Mid, GoTo(S::Done))
.add_exit_state(S::Done)
- .orchestrate(S::Start)
+ .orchestrate(S::Start, CancellationToken::disabled())
.await
});
assert_eq!(res.unwrap(), S::Done);
@@ -633,7 +668,7 @@ mod metrics_observer_tests {
.with_total_timeout(std::time::Duration::from_millis(20))
.register(S::Start, SlowTask)
.add_exit_state(S::Done)
- .orchestrate(S::Start)
+ .orchestrate(S::Start, CancellationToken::disabled())
.await
});
assert!(res.is_err());
@@ -681,7 +716,7 @@ mod metrics_observer_tests {
.with_observer(Arc::new(MetricsObserver::new()))
.register(S::Start, Flaky(n2))
.add_exit_state(S::Done)
- .orchestrate(S::Start)
+ .orchestrate(S::Start, CancellationToken::disabled())
.await
});
assert_eq!(res.unwrap(), S::Done);
diff --git a/cano/src/scheduler.rs b/cano/src/scheduler.rs
index 560db9f..6433bce 100644
--- a/cano/src/scheduler.rs
+++ b/cano/src/scheduler.rs
@@ -30,6 +30,7 @@ mod backoff;
pub use backoff::BackoffPolicy;
+use crate::cancel::CancellationHandle;
use crate::error::CanoResult;
use crate::workflow::Workflow;
use chrono::{DateTime, Utc};
@@ -53,6 +54,12 @@ enum SchedulerCommand {
id: Arc<str>,
response: oneshot::Sender<CanoResult<()>>,
},
+ /// Request cooperative cancellation of a flow's in-flight run. A no-op when
+ /// the flow isn't currently running.
+ Cancel {
+ id: Arc<str>,
+ response: oneshot::Sender<CanoResult<()>>,
+ },
}
/// Simplified scheduling options
@@ -134,6 +141,11 @@ where
schedule: ParsedSchedule,
info: Arc>,
policy: Arc,
+ /// Cancellation handle for the flow's *currently executing* run, published
+ /// by `execute_reserved_flow` while a run is in flight and cleared when it
+ /// finishes. `None` when the flow is idle. Lets `cancel_flow` and graceful
+ /// shutdown cooperatively cancel an in-flight run.
+ cancel: Arc<RwLock<Option<CancellationHandle>>>,
}
impl Clone for FlowData
@@ -148,6 +160,7 @@ where
schedule: self.schedule.clone(),
info: Arc::clone(&self.info),
policy: self.policy.clone(),
+ cancel: Arc::clone(&self.cancel),
}
}
}
diff --git a/cano/src/scheduler/builder.rs b/cano/src/scheduler/builder.rs
index 6378fd6..e9a2f29 100644
--- a/cano/src/scheduler/builder.rs
+++ b/cano/src/scheduler/builder.rs
@@ -184,6 +184,7 @@ where
schedule,
info,
policy: Arc::new(BackoffPolicy::default()),
+ cancel: Arc::new(RwLock::new(None)),
},
);
self.flow_order.push(id);
@@ -323,6 +324,7 @@ where
let initial_state = fd.initial_state.clone();
let info = Arc::clone(&fd.info);
let policy = fd.policy.clone();
+ let cancel = Arc::clone(&fd.cancel);
let running_clone = Arc::clone(&running);
let notify_clone = Arc::clone(&stop_notify);
@@ -334,6 +336,7 @@ where
initial_state,
info,
policy,
+ cancel,
running_clone,
notify_clone,
interval,
@@ -347,6 +350,7 @@ where
initial_state,
info,
policy,
+ cancel,
running_clone,
notify_clone,
cron_schedule,
diff --git a/cano/src/scheduler/loops.rs b/cano/src/scheduler/loops.rs
index 6df6dff..bd5af6d 100644
--- a/cano/src/scheduler/loops.rs
+++ b/cano/src/scheduler/loops.rs
@@ -13,6 +13,7 @@ use tokio::sync::{Notify, RwLock, mpsc, watch};
use tokio::task::{AbortHandle, JoinHandle};
use tokio::time::{Duration, sleep};
+use crate::cancel::{CancellationHandle, CancellationToken};
use crate::error::{CanoError, CanoResult};
use crate::workflow::Workflow;
@@ -73,11 +74,13 @@ async fn sleep_unless_stopped(
/// Per-flow `Every`-schedule loop body. Lives outside `start` so the driver
/// task and the loops are decoupled — the driver owns the workflows
/// HashMap, the loops just see the data they need.
+#[allow(clippy::too_many_arguments)]
pub(super) async fn spawn_every_loop(
workflow: Arc>,
initial_state: TState,
info: Arc>,
policy: Arc,
+ cancel: Arc<RwLock<Option<CancellationHandle>>>,
running: Arc>,
stop_notify: Arc,
interval: Duration,
@@ -98,6 +101,7 @@ pub(super) async fn spawn_every_loop(
initial_state.clone(),
Arc::clone(&info),
&policy,
+ Arc::clone(&cancel),
)
.await;
}
@@ -134,6 +138,7 @@ pub(super) async fn spawn_every_loop(
initial_state.clone(),
Arc::clone(&info),
&policy,
+ Arc::clone(&cancel),
)
.await;
}
@@ -141,11 +146,13 @@ pub(super) async fn spawn_every_loop(
/// Per-flow `Cron`-schedule loop body. See [`spawn_every_loop`] for the
/// rationale on splitting the loop bodies out of `start`.
+#[allow(clippy::too_many_arguments)]
pub(super) async fn spawn_cron_loop(
workflow: Arc>,
initial_state: TState,
info: Arc>,
policy: Arc,
+ cancel: Arc<RwLock<Option<CancellationHandle>>>,
running: Arc>,
stop_notify: Arc,
schedule: Box,
@@ -208,6 +215,7 @@ pub(super) async fn spawn_cron_loop(
initial_state.clone(),
Arc::clone(&info),
&policy,
+ Arc::clone(&cancel),
)
.await;
}
@@ -258,8 +266,16 @@ pub(super) async fn driver_task(
let initial_state = flow.initial_state.clone();
let info = Arc::clone(&flow.info);
let policy = Arc::clone(&flow.policy);
+ let cancel = Arc::clone(&flow.cancel);
let handle = tokio::spawn(async move {
- execute_reserved_flow(workflow, initial_state, info, &policy).await;
+ execute_reserved_flow(
+ workflow,
+ initial_state,
+ info,
+ &policy,
+ cancel,
+ )
+ .await;
});
let mut tasks = scheduler_tasks.write().await;
tasks.retain(|h| !h.is_finished());
@@ -299,6 +315,24 @@ pub(super) async fn driver_task(
)))
};
+ let _ = response.send(outcome);
+ }
+ SchedulerCommand::Cancel { id, response } => {
+ let outcome = if let Some(flow) = workflows.get(&id) {
+ // Fire the in-flight run's cancellation handle, if any. The
+ // run observes `Cancelled` at its next await, drains its saga,
+ // and `apply_outcome` returns the flow to `Idle`. A flow that
+ // isn't currently running has no handle — an idempotent no-op.
+ if let Some(h) = flow.cancel.read().await.as_ref() {
+ h.cancel();
+ }
+ Ok(())
+ } else {
+ Err(CanoError::Workflow(format!(
+ "No workflow registered with id '{id}'"
+ )))
+ };
+
let _ = response.send(outcome);
}
}
@@ -313,6 +347,17 @@ pub(super) async fn driver_task(
// how long an in-flight workflow takes — not by the schedule interval.
stop_notify.notify_waiters();
+ // Cooperatively cancel every in-flight run so shutdown latency is bounded by
+ // the time to the next await + the saga drain, not by how long the workflow
+ // would naturally take. Each cancelled run drains its compensation stack and
+ // returns `Cancelled` (recorded as Idle, not a failure, by `apply_outcome`).
+ // The bounded wait below still caps the total drain time.
+ for flow in workflows.values() {
+ if let Some(h) = flow.cancel.read().await.as_ref() {
+ h.cancel();
+ }
+ }
+
// Wait for all scheduler loop tasks to finish.
//
// Pop with a short-lived write lock per iteration (rather than holding
@@ -387,6 +432,7 @@ async fn execute_flow(
initial_state: TState,
info: Arc>,
policy: &BackoffPolicy,
+ cancel: Arc<RwLock<Option<CancellationHandle>>>,
) where
TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
TResourceKey: Hash + Eq + Send + Sync + 'static,
@@ -398,7 +444,7 @@ async fn execute_flow(
return;
}
- execute_reserved_flow(workflow, initial_state, info, policy).await;
+ execute_reserved_flow(workflow, initial_state, info, policy, cancel).await;
}
/// Result of attempting to reserve a flow for dispatch. The Tripped and
@@ -434,6 +480,7 @@ async fn execute_reserved_flow(
initial_state: TState,
info: Arc>,
policy: &BackoffPolicy,
+ cancel: Arc<RwLock<Option<CancellationHandle>>>,
) where
TState: Clone + Send + Sync + 'static + std::fmt::Debug + std::hash::Hash + Eq,
TResourceKey: Hash + Eq + Send + Sync + 'static,
@@ -455,6 +502,13 @@ async fn execute_reserved_flow(
.total_timeout
.map(|d| (std::time::Instant::now(), d));
+ // Publish a fresh cancellation handle for this run so `cancel_flow` and
+ // graceful shutdown can cooperatively stop it (and drain its saga). A fresh
+ // token per run means cancelling one run never poisons a later one. Cleared
+ // below once the run finishes, so a `cancel_flow` on an idle flow is a no-op.
+ let (handle, token) = CancellationToken::new();
+ *cancel.write().await = Some(handle);
+
// Wrap the workflow future in `catch_unwind`. A panic inside any path
// that bypasses the FSM's own `catch_unwind` (e.g. an observer that
// panics, a custom checkpoint store that panics) would otherwise abort
@@ -465,10 +519,10 @@ async fn execute_reserved_flow(
// `BackoffPolicy`.
#[cfg(feature = "tracing")]
let workflow_fut = workflow
- .execute_workflow(initial_state, total_budget)
+ .execute_workflow(initial_state, total_budget, token)
.instrument(tracing::info_span!("execute_flow"));
#[cfg(not(feature = "tracing"))]
- let workflow_fut = workflow.execute_workflow(initial_state, total_budget);
+ let workflow_fut = workflow.execute_workflow(initial_state, total_budget, token);
let result = match AssertUnwindSafe(workflow_fut).catch_unwind().await {
Ok(inner) => inner,
@@ -482,6 +536,10 @@ async fn execute_reserved_flow(
}
};
+ // The run is over: drop the handle so a later `cancel_flow` on this now-idle
+ // flow is a clean no-op rather than firing a stale token.
+ *cancel.write().await = None;
+
#[cfg(feature = "metrics")]
crate::metrics::scheduler_flow_run(&_flow_id, result.is_ok(), _started.elapsed());
@@ -504,6 +562,14 @@ async fn apply_outcome(
info_guard.failure_streak = 0;
info_guard.next_eligible = None;
}
+ // A deliberate cancellation (via `cancel_flow` or graceful shutdown) is
+ // not a fault: return the flow to `Idle` without touching the failure
+ // streak or backoff window, so its next scheduled run fires normally. A
+ // *dirty* cancel whose rollback itself failed surfaces as
+ // `compensation_failed`, which falls through to the backoff arm below.
+ Err(ref e) if e.category() == "cancelled" => {
+ info_guard.status = Status::Idle;
+ }
Err(e) => {
let err_str: Arc = Arc::from(e.to_string());
let new_streak = info_guard.failure_streak.saturating_add(1);
diff --git a/cano/src/scheduler/running.rs b/cano/src/scheduler/running.rs
index bdd6e0f..91d4d14 100644
--- a/cano/src/scheduler/running.rs
+++ b/cano/src/scheduler/running.rs
@@ -210,6 +210,44 @@ where
})?
}
+ /// Request cooperative cancellation of a flow's in-flight run.
+ ///
+ /// Fires the run's [`CancellationToken`](crate::cancel::CancellationToken):
+ /// the in-flight workflow aborts at its next await point, drains its saga
+ /// compensation stack, and returns [`CanoError::Cancelled`]. The flow then
+ /// returns to [`Status::Idle`](crate::scheduler::Status::Idle) — a deliberate
+ /// cancel is **not** counted as a failure against the [`BackoffPolicy`](crate::scheduler::BackoffPolicy),
+ /// so the next scheduled run fires normally.
+ ///
+ /// A **no-op** (returns `Ok`) when the flow exists but isn't currently
+ /// running. Graceful [`stop`](Self::stop) cancels every in-flight flow this
+ /// same way before draining.
+ ///
+ /// # Errors
+ ///
+ /// - [`CanoError::Workflow`] — the scheduler is not running, `id` is unknown,
+ /// or the command queue is full.
+ pub async fn cancel_flow(&self, id: &str) -> CanoResult<()> {
+ let (response_tx, response_rx) = oneshot::channel();
+ self.command_tx
+ .try_send(SchedulerCommand::Cancel {
+ id: Arc::from(id),
+ response: response_tx,
+ })
+ .map_err(|e| match e {
+ mpsc::error::TrySendError::Closed(_) => CanoError::Workflow(
+ "Scheduler not running — call start() before cancel_flow()".to_string(),
+ ),
+ mpsc::error::TrySendError::Full(_) => {
+ CanoError::Workflow("Scheduler command queue full".to_string())
+ }
+ })?;
+
+ response_rx.await.map_err(|_| {
+ CanoError::Workflow("Scheduler stopped before cancel was processed".to_string())
+ })?
+ }
+
/// Get a snapshot of the workflow status.
pub async fn status(&self, id: &str) -> Option {
let info = self.flows.get(id)?;
@@ -722,74 +760,6 @@ mod tests {
assert!(result.is_ok(), "Test timed out");
}
- #[tokio::test(flavor = "multi_thread")]
- async fn test_trigger_during_graceful_shutdown_window_reports_not_running() {
- // While the driver task is parked waiting for a slow in-flight workflow
- // to finish, a concurrent trigger() must surface "not running" instead
- // of enqueueing into the closed command channel.
- #[derive(Clone)]
- struct SlowTask;
-
- #[task]
- impl Task for SlowTask {
- async fn run_bare(&self) -> Result, CanoError> {
- // Hold Status::Running long enough to span the shutdown window.
- sleep(Duration::from_millis(400)).await;
- Ok(TaskResult::Single(TestState::Complete))
- }
- }
-
- let timeout = Duration::from_secs(5);
- let result = tokio::time::timeout(timeout, async {
- let mut scheduler: Scheduler = Scheduler::::new();
- let slow_workflow = Workflow::bare()
- .register(TestState::Start, SlowTask)
- .add_exit_state(TestState::Complete)
- .add_exit_state(TestState::Error);
- scheduler
- .manual("slow_task", slow_workflow, TestState::Start)
- .unwrap();
-
- let running = scheduler.start().await.unwrap();
- let probe = running.clone();
-
- // Kick off the slow workflow and wait until it is actually Running.
- probe.trigger("slow_task").await.unwrap();
- sleep(Duration::from_millis(50)).await;
- assert!(
- probe.has_running_flows().await,
- "slow workflow should be Running before stop()"
- );
-
- // Spawn stop() so we can probe the shutdown window concurrently.
- let stop_handle = tokio::spawn(async move { running.stop().await });
-
- // Let the driver dequeue Stop and close the command channel. The
- // slow workflow is still running (~400ms total), so the driver is
- // parked inside has_running_flows() — the shutdown window we want
- // to probe.
- sleep(Duration::from_millis(50)).await;
- assert!(
- !stop_handle.is_finished(),
- "stop() must still be parked while the slow workflow is in flight"
- );
-
- // During the window, trigger() must report not-running.
- let err = probe.trigger("slow_task").await.unwrap_err();
- assert!(
- err.to_string().contains("Scheduler not running"),
- "expected not-running during shutdown window, got: {err}"
- );
-
- // stop() eventually returns Ok (teardown finishes).
- let stop_result = stop_handle.await.expect("stop task should not panic");
- stop_result.expect("stop should succeed once slow workflow finishes");
- })
- .await;
-
- assert!(result.is_ok(), "Test timed out");
- }
-
#[tokio::test(flavor = "multi_thread")]
async fn test_failed_workflow_registration() {
// Registering a "failing" workflow (one whose post() returns Err) is a
@@ -1534,97 +1504,229 @@ mod tests {
assert!(result.is_ok(), "Test timed out");
}
- #[tokio::test(flavor = "multi_thread")]
- async fn drop_aborts_wedged_handle_currently_being_awaited_by_driver() {
- // Regression for F9: when the driver pops a JoinHandle from
- // `scheduler_tasks` and awaits it, the popped handle no longer lives
- // in the Vec. A `Drop` firing while the await is in flight previously
- // aborted `driver_handle` (cancelling the driver future, which then
- // dropped the popped JoinHandle — detaching the underlying task
- // instead of aborting it). The wedged task leaked indefinitely.
- //
- // Now `RunningScheduler::in_flight_drain` holds the popped handle's
- // `AbortHandle` for the duration of the await, so Drop can reach the
- // wedged task. This test triggers a workflow whose task sleeps for
- // far longer than the test's tolerance, stops the scheduler so the
- // driver enters its drain phase, drops the last clone, and asserts
- // that the workflow's completion counter never advances.
- use std::sync::atomic::{AtomicUsize, Ordering};
-
- #[derive(Clone)]
- struct SlowTask {
- completions: Arc,
+ // A long-running, cancellable flow task that records when it starts and (if
+ // never cancelled) when it completes — used to verify graceful shutdown
+ // cooperatively cancels in-flight flows.
+ #[derive(Clone)]
+ struct CancellableSlow {
+ started: std::sync::Arc<AtomicU32>,
+ completed: std::sync::Arc<AtomicU32>,
+ }
+ #[task]
+ impl Task for CancellableSlow {
+ fn config(&self) -> crate::task::TaskConfig {
+ crate::task::TaskConfig::minimal()
}
- #[task]
- impl Task for SlowTask {
- fn config(&self) -> crate::task::TaskConfig {
- crate::task::TaskConfig::minimal()
- }
- async fn run_bare(&self) -> Result, CanoError> {
- // Sleeps far longer than the test tolerance. If the abort
- // doesn't reach this task, the counter eventually ticks up.
- sleep(Duration::from_secs(30)).await;
- self.completions.fetch_add(1, Ordering::SeqCst);
- Ok(TaskResult::Single(TestState::Complete))
- }
+ async fn run_bare(&self) -> Result<TaskResult<TestState>, CanoError> {
+ self.started.fetch_add(1, Ordering::SeqCst);
+ sleep(Duration::from_secs(30)).await;
+ self.completed.fetch_add(1, Ordering::SeqCst);
+ Ok(TaskResult::Single(TestState::Complete))
}
+ }
- let timeout = Duration::from_secs(8);
- let result = tokio::time::timeout(timeout, async {
- let completions = Arc::new(AtomicUsize::new(0));
+ #[tokio::test(flavor = "multi_thread")]
+ async fn graceful_stop_cancels_in_flight_flow() {
+ // Graceful shutdown cooperatively cancels a running flow instead of
+ // blocking until it finishes: `stop()` returns promptly (not after the
+ // task's 30s sleep) and the task never reaches completion.
+ let result = tokio::time::timeout(Duration::from_secs(5), async {
+ let started = std::sync::Arc::new(AtomicU32::new(0));
+ let completed = std::sync::Arc::new(AtomicU32::new(0));
let mut scheduler: Scheduler<TestState> = Scheduler::new();
- scheduler
- .manual(
- "wedged",
- Workflow::bare()
- .register(
- TestState::Start,
- SlowTask {
- completions: Arc::clone(&completions),
- },
- )
- .add_exit_state(TestState::Complete)
- .add_exit_state(TestState::Error),
+ let wf = Workflow::bare()
+ .register(
TestState::Start,
+ CancellableSlow {
+ started: started.clone(),
+ completed: completed.clone(),
+ },
)
- .unwrap();
+ .add_exit_state(TestState::Complete)
+ .add_exit_state(TestState::Error);
+ scheduler.manual("slow", wf, TestState::Start).unwrap();
let running = scheduler.start().await.unwrap();
- running.trigger("wedged").await.unwrap();
- // Give the spawn time to land in scheduler_tasks.
- sleep(Duration::from_millis(100)).await;
+ running.trigger("slow").await.unwrap();
+ // Wait until the flow is actually in flight.
+ while started.load(Ordering::SeqCst) == 0 {
+ sleep(Duration::from_millis(5)).await;
+ }
+ assert!(running.has_running_flows().await);
+
+ let t0 = std::time::Instant::now();
+ running.stop().await.expect("graceful stop should succeed");
+ assert!(
+ t0.elapsed() < Duration::from_secs(5),
+ "stop() must cancel the in-flight flow, not wait for its 30s sleep"
+ );
+ assert_eq!(
+ completed.load(Ordering::SeqCst),
+ 0,
+ "the in-flight flow must be cancelled, not run to completion"
+ );
+ })
+ .await;
+ assert!(result.is_ok(), "Test timed out");
+ }
- // Spawn stop() so the driver advances into its drain loop and
- // pops the wedged Trigger handle. stop() will not return because
- // the awaited handle is sleeping for 30s.
- let running_for_stop = running.clone();
- let stop_handle = tokio::spawn(async move { running_for_stop.stop().await });
+ #[tokio::test(flavor = "multi_thread")]
+ async fn trigger_after_graceful_stop_reports_not_running() {
+ // Once graceful shutdown has run, the command channel is closed, so a
+ // subsequent trigger() reports "not running" rather than enqueueing.
+ let result = tokio::time::timeout(Duration::from_secs(5), async {
+ let started = std::sync::Arc::new(AtomicU32::new(0));
+ let completed = std::sync::Arc::new(AtomicU32::new(0));
+ let mut scheduler: Scheduler = Scheduler::new();
+ let wf = Workflow::bare()
+ .register(
+ TestState::Start,
+ CancellableSlow {
+ started: started.clone(),
+ completed: completed.clone(),
+ },
+ )
+ .add_exit_state(TestState::Complete)
+ .add_exit_state(TestState::Error);
+ scheduler.manual("slow", wf, TestState::Start).unwrap();
- // Let the driver actually enter the drain phase and pop the handle.
- sleep(Duration::from_millis(200)).await;
+ let running = scheduler.start().await.unwrap();
+ running.trigger("slow").await.unwrap();
+ while started.load(Ordering::SeqCst) == 0 {
+ sleep(Duration::from_millis(5)).await;
+ }
+ running.stop().await.expect("graceful stop should succeed");
+
+ let err = running.trigger("slow").await.unwrap_err();
assert!(
- !stop_handle.is_finished(),
- "stop() should still be parked while the wedged trigger handle is in flight"
+ err.to_string().contains("Scheduler not running"),
+ "trigger after shutdown must report not-running, got: {err}"
);
+ })
+ .await;
+ assert!(result.is_ok(), "Test timed out");
+ }
- // Drop every clone — the in_flight_drain slot's AbortHandle must
- // be used to abort the popped, in-flight handle.
- drop(stop_handle.abort_handle());
- stop_handle.abort();
- drop(running);
+ #[tokio::test(flavor = "multi_thread")]
+ async fn cancel_flow_cancels_in_flight_run_and_returns_to_idle() {
+ // `cancel_flow` cooperatively cancels the in-flight run; the flow returns
+ // to Idle (a deliberate cancel is NOT a failure, so the streak stays 0 and
+ // the flow does not trip) and the task never completes.
+ let result = tokio::time::timeout(Duration::from_secs(5), async {
+ let started = std::sync::Arc::new(AtomicU32::new(0));
+ let completed = std::sync::Arc::new(AtomicU32::new(0));
+ let mut scheduler: Scheduler = Scheduler::new();
+ let wf = Workflow::bare()
+ .register(
+ TestState::Start,
+ CancellableSlow {
+ started: started.clone(),
+ completed: completed.clone(),
+ },
+ )
+ .add_exit_state(TestState::Complete)
+ .add_exit_state(TestState::Error);
+ scheduler.manual("slow", wf, TestState::Start).unwrap();
+
+ let running = scheduler.start().await.unwrap();
+ running.trigger("slow").await.unwrap();
+ while started.load(Ordering::SeqCst) == 0 {
+ sleep(Duration::from_millis(5)).await;
+ }
+
+ running
+ .cancel_flow("slow")
+ .await
+ .expect("cancel_flow should succeed");
- // Wait long enough that, if abort had failed, the slow task
- // could have advanced. With the fix, the task is aborted before
- // it can increment completions.
- sleep(Duration::from_secs(2)).await;
+ // Wait for the cancelled run's apply_outcome to settle the status.
+ loop {
+ let st = running.status("slow").await.unwrap().status;
+ if st != crate::scheduler::Status::Running {
+ break;
+ }
+ sleep(Duration::from_millis(5)).await;
+ }
+ let info = running.status("slow").await.unwrap();
+ assert_eq!(
+ info.status,
+ crate::scheduler::Status::Idle,
+ "a cancelled run returns to Idle"
+ );
+ assert_eq!(info.failure_streak, 0, "cancel must not count as a failure");
assert_eq!(
- completions.load(Ordering::SeqCst),
+ completed.load(Ordering::SeqCst),
0,
- "wedged spawn must have been aborted by Drop's in_flight_drain abort path"
+ "task was cancelled, not completed"
);
+ running.stop().await.unwrap();
})
.await;
+ assert!(result.is_ok(), "Test timed out");
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn cancel_flow_on_idle_flow_is_noop() {
+ // Cancelling a registered flow that isn't running is an idempotent no-op.
+ let result = tokio::time::timeout(Duration::from_secs(5), async {
+ let started = std::sync::Arc::new(AtomicU32::new(0));
+ let completed = std::sync::Arc::new(AtomicU32::new(0));
+ let mut scheduler: Scheduler = Scheduler::new();
+ let wf = Workflow::bare()
+ .register(
+ TestState::Start,
+ CancellableSlow {
+ started: started.clone(),
+ completed: completed.clone(),
+ },
+ )
+ .add_exit_state(TestState::Complete)
+ .add_exit_state(TestState::Error);
+ scheduler.manual("idle", wf, TestState::Start).unwrap();
+ let running = scheduler.start().await.unwrap();
+ // Never triggered → no in-flight run → cancel is a no-op Ok.
+ running
+ .cancel_flow("idle")
+ .await
+ .expect("cancel on idle flow is a no-op");
+ assert_eq!(
+ running.status("idle").await.unwrap().status,
+ crate::scheduler::Status::Idle
+ );
+ running.stop().await.unwrap();
+ })
+ .await;
+ assert!(result.is_ok(), "Test timed out");
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn cancel_flow_unknown_flow_errors() {
+ let result = tokio::time::timeout(Duration::from_secs(5), async {
+ let started = std::sync::Arc::new(AtomicU32::new(0));
+ let completed = std::sync::Arc::new(AtomicU32::new(0));
+ let mut scheduler: Scheduler = Scheduler::new();
+ let wf = Workflow::bare()
+ .register(
+ TestState::Start,
+ CancellableSlow {
+ started: started.clone(),
+ completed: completed.clone(),
+ },
+ )
+ .add_exit_state(TestState::Complete)
+ .add_exit_state(TestState::Error);
+ scheduler.manual("known", wf, TestState::Start).unwrap();
+
+ let running = scheduler.start().await.unwrap();
+ let err = running.cancel_flow("nope").await.unwrap_err();
+ assert!(
+ err.to_string().contains("No workflow registered"),
+ "unknown flow must error, got: {err}"
+ );
+ running.stop().await.unwrap();
+ })
+ .await;
assert!(result.is_ok(), "Test timed out");
}
diff --git a/cano/src/task.rs b/cano/src/task.rs
index 1ac6127..e2e7388 100644
--- a/cano/src/task.rs
+++ b/cano/src/task.rs
@@ -69,7 +69,7 @@
//! let result = Workflow::new(resources)
//! .register(Step::Fetch, FetchTask)
//! .add_exit_state(Step::Done)
-//! .orchestrate(Step::Fetch)
+//! .orchestrate(Step::Fetch, CancellationToken::disabled())
//! .await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
diff --git a/cano/src/task/batch.rs b/cano/src/task/batch.rs
index 505d284..1b6d581 100644
--- a/cano/src/task/batch.rs
+++ b/cano/src/task/batch.rs
@@ -72,7 +72,7 @@
//! .register(Step::Process, CsvProcessor)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Process).await?;
+//! let result = workflow.orchestrate(Step::Process, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -119,7 +119,7 @@
//! let workflow = Workflow::bare()
//! .register(Step::Process, TolerantProcessor)
//! .add_exit_state(Step::Done);
-//! let result = workflow.orchestrate(Step::Process).await?;
+//! let result = workflow.orchestrate(Step::Process, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -468,6 +468,7 @@ pub type BatchTaskObject> =
#[cfg(test)]
mod tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::resource::Resources;
use crate::task;
use crate::task::Task;
@@ -869,7 +870,10 @@ mod tests {
.register(Step::Process, IndexedBatch { n: 3 })
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Process).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Process, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -892,6 +896,7 @@ mod tests {
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::metrics::test_support::*;
use crate::task::Task;
use crate::workflow::Workflow;
@@ -939,7 +944,9 @@ mod metrics_tests {
let workflow = Workflow::bare()
.register(St::Process, ThreeItemBatch)
.add_exit_state(St::Done);
- workflow.orchestrate(St::Process).await
+ workflow
+ .orchestrate(St::Process, CancellationToken::disabled())
+ .await
});
assert!(result.is_ok(), "workflow should succeed: {result:?}");
assert_eq!(
diff --git a/cano/src/task/poll.rs b/cano/src/task/poll.rs
index 54a85c5..f0bf88b 100644
--- a/cano/src/task/poll.rs
+++ b/cano/src/task/poll.rs
@@ -47,7 +47,7 @@
//! .register(Step::Wait, counter)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Wait).await?;
+//! let result = workflow.orchestrate(Step::Wait, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -109,7 +109,7 @@
//! .register(Step::Poll, TraitPoller)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Poll).await?;
+//! let result = workflow.orchestrate(Step::Poll, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -344,6 +344,7 @@ pub type PollTaskObject> =
#[cfg(test)]
mod tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::resource::Resources;
use crate::task;
use crate::task::Task;
@@ -590,7 +591,10 @@ mod tests {
// But wait: poll 1 => count becomes 1, 1 < 2 => Pending; poll 2 => count becomes 2, 2 >= 2 => Ready(Done)
// But we registered Step::Done as exit state so Done is the final state
// Actually CountingPoller returns Single(Step::Done) when ready, so we skip Next entirely
- let result = workflow.orchestrate(Step::Wait).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Wait, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
@@ -754,7 +758,10 @@ mod tests {
.add_exit_state(Step::Done);
let start = std::time::Instant::now();
- let err = workflow.orchestrate(Step::Wait).await.unwrap_err();
+ let err = workflow
+ .orchestrate(Step::Wait, CancellationToken::disabled())
+ .await
+ .unwrap_err();
let elapsed = start.elapsed();
// The FSM wraps the failure with state context; `.inner()` peels one layer.
diff --git a/cano/src/task/router.rs b/cano/src/task/router.rs
index 0f6819b..3cd590b 100644
--- a/cano/src/task/router.rs
+++ b/cano/src/task/router.rs
@@ -53,7 +53,7 @@
//! .register(Step::PathA, DoPathA)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Route).await?;
+//! let result = workflow.orchestrate(Step::Route, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -84,7 +84,7 @@
//! .register(Step::Route, SimpleRouter)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Route).await?;
+//! let result = workflow.orchestrate(Step::Route, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
@@ -209,6 +209,7 @@ pub type RouterTaskObject> =
#[cfg(test)]
mod tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::resource::Resources;
use crate::task;
use crate::task::Task;
@@ -357,7 +358,10 @@ mod tests {
.register(Step::PathA, PathATask)
.add_exit_state(Step::Done);
- let result = workflow.orchestrate(Step::Decide).await.unwrap();
+ let result = workflow
+ .orchestrate(Step::Decide, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, Step::Done);
}
diff --git a/cano/src/task/stepped.rs b/cano/src/task/stepped.rs
index 639c525..9f9f751 100644
--- a/cano/src/task/stepped.rs
+++ b/cano/src/task/stepped.rs
@@ -56,7 +56,7 @@
//! .register(MyState::Process, scanner)
//! .add_exit_state(MyState::Done);
//!
-//! let result = workflow.orchestrate(MyState::Process).await?;
+//! let result = workflow.orchestrate(MyState::Process, CancellationToken::disabled()).await?;
//! assert_eq!(result, MyState::Done);
//! # Ok(())
//! # }
@@ -98,7 +98,7 @@
//! .register(MyState::Process, TraitStepper)
//! .add_exit_state(MyState::Done);
//!
-//! let result = workflow.orchestrate(MyState::Process).await?;
+//! let result = workflow.orchestrate(MyState::Process, CancellationToken::disabled()).await?;
//! assert_eq!(result, MyState::Done);
//! # Ok(())
//! # }
@@ -397,6 +397,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::resource::Resources;
use crate::task;
use crate::task::Task;
@@ -729,7 +730,10 @@ mod tests {
.register(MyState::Next, NextTask)
.add_exit_state(MyState::Done);
- let result = workflow.orchestrate(MyState::Work).await.unwrap();
+ let result = workflow
+ .orchestrate(MyState::Work, CancellationToken::disabled())
+ .await
+ .unwrap();
assert_eq!(result, MyState::Done);
}
@@ -787,6 +791,7 @@ mod tests {
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
use super::*;
+ use crate::cancel::CancellationToken;
use crate::metrics::test_support::*;
use crate::task::Task;
use crate::workflow::Workflow;
@@ -849,7 +854,9 @@ mod metrics_tests {
let workflow = Workflow::bare()
.register_stepped(St::Work, TwoMoreOneDone)
.add_exit_state(St::Done);
- workflow.orchestrate(St::Work).await
+ workflow
+ .orchestrate(St::Work, CancellationToken::disabled())
+ .await
});
assert!(result.is_ok(), "workflow should succeed: {result:?}");
assert_eq!(
diff --git a/cano/src/task/timer.rs b/cano/src/task/timer.rs
index 9e4cfbc..bd3409d 100644
--- a/cano/src/task/timer.rs
+++ b/cano/src/task/timer.rs
@@ -63,7 +63,7 @@
//! .register(Step::Wait, CoolDown)
//! .add_exit_state(Step::Done);
//!
-//! let result = workflow.orchestrate(Step::Wait).await?;
+//! let result = workflow.orchestrate(Step::Wait, CancellationToken::disabled()).await?;
//! assert_eq!(result, Step::Done);
//! # Ok(())
//! # }
diff --git a/cano/src/testing.rs b/cano/src/testing.rs
index 8f1c079..954b82a 100644
--- a/cano/src/testing.rs
+++ b/cano/src/testing.rs
@@ -49,7 +49,7 @@
//! .register(S::Start, OkTask)
//! .add_exit_state(S::Done)
//! .with_observer(observer.clone());
-//! assert_eq!(wf.orchestrate(S::Start).await.unwrap(), S::Done);
+//! assert_eq!(wf.orchestrate(S::Start, CancellationToken::disabled()).await.unwrap(), S::Done);
//! observer.assert_path(&["Start", "Done"]);
//! # }
//! ```
@@ -120,6 +120,11 @@ pub enum RecordedEvent {
/// The sequence of the last persisted row.
sequence: u64,
},
+ /// A run was cancelled via a [`CancellationToken`](crate::cancel::CancellationToken).
+ Cancelled {
+ /// The `Debug` rendering of the state cancellation was observed at.
+ state: String,
+ },
}
/// A [`WorkflowObserver`] that records every event it
@@ -291,6 +296,11 @@ impl WorkflowObserver for RecordingObserver {
sequence,
});
}
+ fn on_cancelled(&self, state: &str) {
+ self.events.lock().push(RecordedEvent::Cancelled {
+ state: state.into(),
+ });
+ }
}
/// A process-local [`CheckpointStore`] for resume /
@@ -549,7 +559,12 @@ mod tests {
.register(S::Start, OkTask)
.add_exit_state(S::Done)
.with_observer(observer.clone());
- assert_eq!(wf.orchestrate(S::Start).await.unwrap(), S::Done);
+ assert_eq!(
+ wf.orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap(),
+ S::Done
+ );
observer.assert_path(&["Start", "Done"]);
observer.assert_completed_with("Done");
assert!(observer.events().contains(&RecordedEvent::TaskSucceeded {
@@ -564,7 +579,9 @@ mod tests {
.register(S::Start, OkTask)
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::Start).await.unwrap();
+ wf.orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap();
assert!(!observer.events().is_empty());
observer.clear();
assert!(observer.events().is_empty());
@@ -687,7 +704,10 @@ mod tests {
.register(S::Start, task)
.add_exit_state(S::Done)
.with_observer(observer.clone());
- let err = wf.orchestrate(S::Start).await.unwrap_err();
+ let err = wf
+ .orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap_err();
assert!(err.to_string().contains("panic"), "{err}");
let retries = observer
.events()
@@ -710,7 +730,12 @@ mod tests {
let wf = Workflow::bare()
.register(S::Start, panic_on_attempt(0, S::Done))
.add_exit_state(S::Done);
- assert_eq!(wf.orchestrate(S::Start).await.unwrap(), S::Done);
+ assert_eq!(
+ wf.orchestrate(S::Start, CancellationToken::disabled())
+ .await
+ .unwrap(),
+ S::Done
+ );
}
#[test]
@@ -734,7 +759,9 @@ mod tests {
.register(S::B, Go(S::Done))
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::A).await.unwrap();
+ wf.orchestrate(S::A, CancellationToken::disabled())
+ .await
+ .unwrap();
observer
.assert_all_states_entered(&[S::A, S::B, S::Done])
.expect("all states visited");
@@ -747,7 +774,9 @@ mod tests {
.register(S::A, Go(S::Done))
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::A).await.unwrap();
+ wf.orchestrate(S::A, CancellationToken::disabled())
+ .await
+ .unwrap();
let missing = observer
.assert_all_states_entered(&[S::A, S::B, S::C, S::Done])
.unwrap_err();
@@ -761,7 +790,9 @@ mod tests {
.register(S::A, Go(S::Done))
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::A).await.unwrap();
+ wf.orchestrate(S::A, CancellationToken::disabled())
+ .await
+ .unwrap();
let missing = observer
.assert_all_states_entered(&[S::A, S::A, S::B])
.unwrap_err();
@@ -776,7 +807,9 @@ mod tests {
.register(S::B, Go(S::Done))
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::A).await.unwrap();
+ wf.orchestrate(S::A, CancellationToken::disabled())
+ .await
+ .unwrap();
observer
.assert_registered_states_entered(&wf)
.expect("all registered states visited");
@@ -790,7 +823,9 @@ mod tests {
.register(S::C, Go(S::Done)) // never routed to
.add_exit_state(S::Done)
.with_observer(observer.clone());
- wf.orchestrate(S::A).await.unwrap();
+ wf.orchestrate(S::A, CancellationToken::disabled())
+ .await
+ .unwrap();
let missing = observer.assert_registered_states_entered(&wf).unwrap_err();
assert!(missing.contains(&"C".to_string()), "missing={missing:?}");
}
diff --git a/cano/src/workflow.rs b/cano/src/workflow.rs
index b448392..277f71f 100644
--- a/cano/src/workflow.rs
+++ b/cano/src/workflow.rs
@@ -77,6 +77,7 @@ use std::hash::Hash;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
+use crate::cancel::CancellationToken;
use crate::error::CanoError;
use crate::observer::WorkflowObserver;
use crate::recovery::CheckpointStore;
@@ -213,8 +214,6 @@ where
states: HashMap>>,
/// Shared resources for all tasks
pub(crate) resources: Arc>,
- /// Global workflow timeout
- workflow_timeout: Option,
/// Total wall-clock budget for the entire `orchestrate` / `resume_from` call.
/// When set, the FSM aborts the in-flight task at its next await point as soon
/// as the budget elapses and drains the compensation stack against
@@ -268,7 +267,6 @@ where
Self {
states: HashMap::new(),
resources: Arc::new(resources),
- workflow_timeout: None,
total_timeout: None,
compensation_timeout: None,
exit_states: Vec::new(),
@@ -283,27 +281,6 @@ where
}
}
- /// Set a blunt wall-clock timeout for the entire `orchestrate` /
- /// `resume_from` call.
- ///
- /// Implemented as a single `tokio::time::timeout` around the workflow
- /// future. The in-flight task is dropped at its next await point and the
- /// call returns `CanoError::Workflow("Workflow timeout exceeded")` —
- /// compensation does **not** run.
- ///
- /// When [`with_total_timeout`](Self::with_total_timeout) is also set, the
- /// engine treats this value as a *floor* on the total budget: the
- /// effective wall-clock cap is `min(with_timeout, with_total_timeout)`
- /// and the graceful total-timeout path drives it (compensation runs
- /// under [`with_compensation_timeout`](Self::with_compensation_timeout),
- /// `on_workflow_timeout` fires). This preserves the "with_timeout is a
- /// hard upper bound" intent while avoiding a race between two outer
- /// timeouts that would drop the inner compensation drain mid-flight.
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.workflow_timeout = Some(timeout);
- self
- }
-
/// Set a wall-clock budget for the entire `orchestrate` (or `resume_from`) call.
///
/// When the budget elapses, the in-flight task is aborted at its next await
@@ -343,7 +320,7 @@ where
/// .add_exit_state(Step::Done);
///
/// let err = workflow
- /// .orchestrate(Step::Start)
+ /// .orchestrate(Step::Start, CancellationToken::disabled())
/// .await
/// .expect_err("budget elapses before Done");
/// // The engine wraps task errors with state context; `.inner()` peels one layer.
@@ -638,7 +615,7 @@ where
/// .register(Step::Start, NoopTask)
/// .add_exit_state(Step::Done)
/// .with_observer(counter.clone());
- /// workflow.orchestrate(Step::Start).await?;
+ /// workflow.orchestrate(Step::Start, CancellationToken::disabled()).await?;
/// assert_eq!(counter.0.load(Ordering::Relaxed), 1);
/// # Ok(())
/// # }
@@ -901,13 +878,35 @@ where
///
/// Runs lifecycle setup before execution and teardown after, regardless of outcome.
///
+ /// `token` controls cooperative cancellation. Drive the run with a [`CancellationToken`]
+ /// obtained from [`CancellationToken::new`](crate::cancel::CancellationToken::new) and keep the
+ /// paired [`CancellationHandle`](crate::cancel::CancellationHandle); when the handle's
+ /// [`cancel`](crate::cancel::CancellationHandle::cancel) fires, the in-flight cancellable task
+ /// is dropped at its next await point, the saga compensation stack is drained, and the call
+ /// returns [`CanoError::Cancelled`] (wrapped in [`CanoError::WithStateContext`]; a dirty
+ /// rollback yields [`CanoError::CompensationFailed`] whose `errors[0]` is the wrapped cancel).
+ /// To opt a run out of cancellation, pass [`CancellationToken::disabled`] — it never fires and
+ /// is zero-cost (the FSM skips the cancellation `select!` entirely).
+ ///
+ /// Cancellation is cooperative and saga-safe: a task is only interrupted at an `.await`, and a
+ /// [`CompensatableTask`](crate::saga::CompensatableTask) is never interrupted mid-run (it
+ /// completes so its rollback entry is recorded, and the cancel is honoured at the next state
+ /// boundary). The compensation drain itself is uncancellable. See the
+ /// [`cancel`](crate::cancel) module for the full semantics and precedence rules against
+ /// [`with_total_timeout`](Self::with_total_timeout).
+ ///
/// # Errors
///
/// - [`CanoError::Workflow`] -- no handler is registered for the current state, a single
/// task returned a `TaskResult::Split` (use [`Workflow::register_split`] instead), the
/// global workflow timeout was exceeded, or a split strategy was misconfigured
+ /// - [`CanoError::Cancelled`] -- the run was cancelled via `token` (see above)
/// - Any [`CanoError`] variant propagated from a task during execution
- pub async fn orchestrate(&self, initial_state: TState) -> Result {
+ pub async fn orchestrate(
+ &self,
+ initial_state: TState,
+ token: CancellationToken,
+ ) -> Result {
#[cfg(feature = "tracing")]
let workflow_span = self.tracing_span.clone().unwrap_or_else(|| {
if tracing::enabled!(tracing::Level::INFO) {
@@ -937,79 +936,48 @@ where
self.validate_initial_state(&initial_state)?;
self.resources.setup_all().await?;
- let result = self.run_workflow(initial_state).await;
+ let result = self.run_workflow(initial_state, token).await;
self.resources
.teardown_range(0..self.resources.lifecycle_len())
.await;
result
}
- async fn run_workflow(&self, initial_state: TState) -> Result {
+ async fn run_workflow(
+ &self,
+ initial_state: TState,
+ token: CancellationToken,
+ ) -> Result {
#[cfg(feature = "metrics")]
let _active = crate::metrics::WorkflowActiveGuard::new();
let started = std::time::Instant::now();
let total_budget = self.resolve_total_budget(started);
- let workflow_future = self.execute_workflow(initial_state, total_budget);
- self.await_with_outer_timeout(workflow_future, total_budget, started)
- .await
+ let result = self
+ .execute_workflow(initial_state, total_budget, token)
+ .await;
+ Self::record_run_outcome(&result, started);
+ result
}
- /// Resolve the effective wall-clock budget for the entire FSM call.
- ///
- /// Precedence:
- /// 1. Both `with_timeout` and `with_total_timeout` set → graceful
- /// total-timeout path with `min(...)` as the budget. Treating
- /// `with_timeout` as a floor preserves the user's intent that it is
- /// a hard upper bound, while avoiding a race between two outer
- /// timeouts that would drop the compensation drain mid-flight.
- /// 2. Only total set → graceful total-timeout path.
- /// 3. Only `with_timeout` set → legacy blunt `tokio::time::timeout`
- /// wrapper applied externally; the FSM loop runs unbudgeted.
- /// 4. Neither → zero-cost path.
+ /// Resolve the wall-clock budget for the entire FSM call: the
+ /// [`with_total_timeout`](Self::with_total_timeout) duration, or `None`
+ /// (the zero-cost path) when unset.
pub(crate) fn resolve_total_budget(
&self,
started: std::time::Instant,
) -> Option<(std::time::Instant, Duration)> {
- let effective = match (self.workflow_timeout, self.total_timeout) {
- (Some(w), Some(t)) => Some(w.min(t)),
- (_, Some(t)) => Some(t),
- _ => None,
- };
- effective.map(|d| (started, d))
- }
-
- /// Apply the legacy `with_timeout` outer wrapper when (and only when) the
- /// graceful total-timeout path is NOT also active. Emits the workflow-run
- /// outcome metric exactly once per invocation — on the legacy-timeout
- /// path the early return ensures `outcome="timeout"` is recorded
- /// *without* a follow-up `outcome="failed"` for the same run; on the
- /// non-timeout paths the post-match emission records `completed`/`failed`.
- ///
- /// Used by both `run_workflow` (forward direction) and
- /// `execute_resume_inner` (resume direction) so the precedence rule
- /// lives in one place.
- pub(crate) async fn await_with_outer_timeout(
- &self,
- fut: F,
- total_budget: Option<(std::time::Instant, Duration)>,
- #[allow(unused_variables)] started: std::time::Instant,
- ) -> Result
- where
- F: std::future::Future
+#Cancelling a Run
+
+orchestrate always takes a CancellationToken as its second argument. The
+example above passes CancellationToken::disabled() — a token that never fires, opting
+the run out of cancellation at zero cost. To stop a run early on a signal you control — a
+shutdown handler, a user "stop" button, a parent task giving up — pass a live token from
+CancellationToken::new() instead and keep its paired CancellationHandle.
+Firing the handle aborts the in-flight task at its next .await, drains the
+saga compensation stack, and returns CanoError::Cancelled.
+
+
+```rust
+use cano::prelude::*;
+
+let (handle, token) = CancellationToken::new();
+
+// Cancel from anywhere — the handle is Clone and cancel() is idempotent:
+tokio::spawn(async move {
+ shutdown_signal().await;
+ handle.cancel();
+});
+
+let result = workflow.orchestrate(OrderState::Start, token).await;
+assert!(matches!(result, Err(e) if e.category() == "cancelled"));
+```
+
+
+Cancellation is cooperative (a task is interrupted only at an .await) and
+saga-safe (a compensatable task is never interrupted mid-run). The
+Resilience → Cooperative Cancellation page covers the full
+semantics, the on_cancelled observer hook, and precedence against
+with_total_timeout.
+
+
+
#Builder Pattern and #[must_use]
Workflow uses a builder pattern where the register* methods and
@@ -319,21 +355,21 @@ fn build_workflow(store: MemoryStore) -> Workflow {
.register(TextPipelineState::Parse, ParseTask)
.register(TextPipelineState::Transform, TransformTask)
.add_exit_state(TextPipelineState::Done)
- .with_timeout(Duration::from_secs(5))
+ .with_total_timeout(Duration::from_secs(5))
}
// Inside an HTTP handler:
let store = MemoryStore::new(); // fresh store — full isolation
store.put("input_text", text)?;
let workflow = build_workflow(store.clone());
-let final_state = workflow.orchestrate(TextPipelineState::Parse).await?; // which terminal branch ran
+let final_state = workflow.orchestrate(TextPipelineState::Parse, CancellationToken::disabled()).await?; // which terminal branch ran
let word_count: usize = store.get("word_count")?;
```
Tip
-Use .with_timeout() on the workflow to keep a hung request from blocking indefinitely. For
+Use .with_total_timeout() on the workflow to keep a hung request from blocking indefinitely. For
read-heavy workloads with shared reference data, pre-populate one store, share it via Arc,
and use per-request keys to avoid collisions. The full Axum version is in
cargo run --example workflow_on_request.
diff --git a/docs/content/workflows/validation-and-errors.md b/docs/content/workflows/validation-and-errors.md
index 0c3dabd..d6e956d 100644
--- a/docs/content/workflows/validation-and-errors.md
+++ b/docs/content/workflows/validation-and-errors.md
@@ -90,7 +90,7 @@ async fn main() -> Result<(), CanoError> {
workflow.validate_initial_state(&State::Start)?;
// Safe to orchestrate
- let _result = workflow.orchestrate(State::Start).await?;
+ let _result = workflow.orchestrate(State::Start, CancellationToken::disabled()).await?;
Ok(())
}
```
@@ -133,9 +133,9 @@ during execution. Understanding these errors helps you build robust error recove
Increase with_total_timeout() or speed up the workflow; see Resilience → Workflow Total Timeout |
-CanoError::Workflow |
-Legacy with_timeout() outer tokio::time::timeout elapsed (no graceful compensation) |
-Prefer with_total_timeout() for new code; otherwise increase with_timeout() or optimize task execution time |
+CanoError::Cancelled |
+Run cancelled via a live CancellationToken passed to orchestrate / resume_from; in-flight task aborted, compensation stack drained. Surfaced under CanoError::WithStateContext (or CompensationFailed on a dirty rollback). |
+Expected when you cancel deliberately; see Resilience → Cooperative Cancellation |
CanoError::Configuration |
@@ -181,7 +181,7 @@ through the join strategy.
```rust
-match workflow.orchestrate(State::Start).await {
+match workflow.orchestrate(State::Start, CancellationToken::disabled()).await {
Ok(final_state) => println!("Completed: {:?}", final_state),
Err(CanoError::Workflow(msg)) => eprintln!("Workflow error: {}", msg),
Err(CanoError::Configuration(msg)) => eprintln!("Config error: {}", msg),