Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn main() -> Result<(), CanoError> {
.add_exit_state(FlowState::Complete);

// 5. Run.
let result = workflow.orchestrate(FlowState::Start).await?;
let result = workflow.orchestrate(FlowState::Start, CancellationToken::disabled()).await?;
println!("Workflow finished: {:?}", result);

Ok(())
Expand Down
13 changes: 11 additions & 2 deletions cano-e2e/src/bin/cano_workflow_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use std::str::FromStr;
use std::sync::Arc;

use cano::CancellationToken;
use cano_e2e::{Faults, Phase, PostgresCheckpointStore, StdoutTracer, build_workflow};

#[tokio::main]
Expand Down Expand Up @@ -62,8 +63,16 @@ async fn main() -> anyhow::Result<()> {
emit(&format!("READY {workflow_id} {mode}"));

let result = match mode.as_str() {
"resume" => workflow.resume_from(workflow_id.clone()).await,
"run" => workflow.orchestrate(Phase::Reserve).await,
"resume" => {
workflow
.resume_from(workflow_id.clone(), CancellationToken::disabled())
.await
}
"run" => {
workflow
.orchestrate(Phase::Reserve, CancellationToken::disabled())
.await
}
other => anyhow::bail!("unknown mode {other:?}"),
};
match result {
Expand Down
20 changes: 16 additions & 4 deletions cano-macros/tests/batch_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ async fn inherent_inferred_integrates_with_workflow() {
.register(Step::Process, InherentInferred)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Process).await.unwrap();
let result = workflow
.orchestrate(Step::Process, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand Down Expand Up @@ -197,7 +200,10 @@ async fn inherent_with_key_integrates_with_workflow() {
.register(Step::Process, InherentWithKey)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Process).await.unwrap();
let result = workflow
.orchestrate(Step::Process, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand Down Expand Up @@ -394,7 +400,10 @@ async fn trait_form_integrates_with_workflow() {
.register(Step::Process, TraitBatch)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Process).await.unwrap();
let result = workflow
.orchestrate(Step::Process, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand Down Expand Up @@ -534,7 +543,10 @@ async fn end_to_end_workflow_load_process_finish() {
.register(Step::Process, LoadStep)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Process).await.unwrap();
let result = workflow
.orchestrate(Step::Process, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);

let output: Vec<u32> = store.get("output").unwrap();
Expand Down
10 changes: 8 additions & 2 deletions cano-macros/tests/compensatable_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ async fn inherent_compensatable_impl_registers_and_compensates() {
.register(Step::Boom, Boom)
.add_exit_state(Step::Done);

let err = workflow.orchestrate(Step::Reserve).await.unwrap_err();
let err = workflow
.orchestrate(Step::Reserve, CancellationToken::disabled())
.await
.unwrap_err();
assert_eq!(err.message(), "boom"); // clean rollback -> the original failure is surfaced
assert!(
compensated.load(Ordering::SeqCst),
Expand All @@ -85,7 +88,10 @@ async fn inherent_compensatable_impl_registers_and_compensates() {
.register_with_compensation(Step::Reserve, ReserveNamed)
.add_exit_state(Step::Done);
assert_eq!(
workflow.orchestrate(Step::Reserve).await.unwrap(),
workflow
.orchestrate(Step::Reserve, CancellationToken::disabled())
.await
.unwrap(),
Step::Done
);
}
10 changes: 8 additions & 2 deletions cano-macros/tests/poll_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ async fn inherent_poller_integrates_with_workflow() {
.register(Step::Poll, InherentPoller)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Poll).await.unwrap();
let result = workflow
.orchestrate(Step::Poll, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand All @@ -229,7 +232,10 @@ async fn trait_poller_integrates_with_workflow() {
.register(Step::Poll, TraitPoller)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Poll).await.unwrap();
let result = workflow
.orchestrate(Step::Poll, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand Down
10 changes: 8 additions & 2 deletions cano-macros/tests/router_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ async fn inherent_router_integrates_with_workflow() {
.register(Step::PathA, PathATask)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Route).await.unwrap();
let result = workflow
.orchestrate(Step::Route, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand All @@ -203,6 +206,9 @@ async fn trait_router_integrates_with_workflow() {
.register(Step::PathA, PathATask)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Route).await.unwrap();
let result = workflow
.orchestrate(Step::Route, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}
5 changes: 4 additions & 1 deletion cano-macros/tests/stepped_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,10 @@ async fn stepped_task_in_workflow() {
.register(MyState::Work, stepper)
.add_exit_state(MyState::Done);

let result = workflow.orchestrate(MyState::Work).await.unwrap();
let result = workflow
.orchestrate(MyState::Work, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, MyState::Done);
}

Expand Down
8 changes: 7 additions & 1 deletion cano-macros/tests/task_impl_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,11 @@ async fn task_still_runs_in_a_workflow() {
let workflow = Workflow::bare()
.register(Step::Start, NamedInherentTask)
.add_exit_state(Step::Done);
assert_eq!(workflow.orchestrate(Step::Start).await.unwrap(), Step::Done);
assert_eq!(
workflow
.orchestrate(Step::Start, CancellationToken::disabled())
.await
.unwrap(),
Step::Done
);
}
5 changes: 4 additions & 1 deletion cano-macros/tests/timer_task_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ async fn inherent_timer_integrates_with_workflow() {
.register(Step::Wait, TraitTimer)
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::Wait).await.unwrap();
let result = workflow
.orchestrate(Step::Wait, CancellationToken::disabled())
.await
.unwrap();
assert_eq!(result, Step::Done);
}

Expand Down
9 changes: 9 additions & 0 deletions cano/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ name = "scheduler_graceful_shutdown"
path = "examples/scheduler_graceful_shutdown.rs"
required-features = ["scheduler"]

[[example]]
name = "scheduler_cancellation"
path = "examples/scheduler_cancellation.rs"
required-features = ["scheduler"]

[[example]]
name = "scheduler_mixed_workflows"
path = "examples/scheduler_mixed_workflows.rs"
Expand Down Expand Up @@ -201,6 +206,10 @@ path = "examples/saga_payment.rs"
name = "workflow_total_timeout"
path = "examples/workflow_total_timeout.rs"

[[example]]
name = "workflow_cancellation"
path = "examples/workflow_cancellation.rs"

[[example]]
name = "router_task"
path = "examples/router_task.rs"
Expand Down
12 changes: 9 additions & 3 deletions cano/benches/workflow_performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,9 @@ fn bench_orchestrate_overhead(c: &mut Criterion) {
b.to_async(&runtime).iter(|| {
let workflow = Arc::clone(&workflow);
async move {
let _ = workflow.orchestrate(S::Done).await;
let _ = workflow
.orchestrate(S::Done, CancellationToken::disabled())
.await;
}
});
});
Expand Down Expand Up @@ -617,7 +619,9 @@ fn bench_large_split_collect(c: &mut Criterion) {
.add_exit_state(S::Done);

b.to_async(&runtime).iter(|| async {
let _ = workflow.orchestrate(S::Start).await;
let _ = workflow
.orchestrate(S::Start, CancellationToken::disabled())
.await;
});
});
}
Expand Down Expand Up @@ -659,7 +663,9 @@ fn bench_tracing_overhead(c: &mut Criterion) {
b.to_async(&runtime).iter(|| {
let workflow = Arc::clone(&workflow);
async move {
let _ = workflow.orchestrate(S::Done).await;
let _ = workflow
.orchestrate(S::Done, CancellationToken::disabled())
.await;
}
});
});
Expand Down
4 changes: 3 additions & 1 deletion cano/examples/ai_workflow_yes_and.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ async fn main() -> Result<(), CanoError> {

println!("Starting improvised story...\n");

let final_state = workflow.orchestrate(ConversationState::Start).await?;
let final_state = workflow
.orchestrate(ConversationState::Start, CancellationToken::disabled())
.await?;

println!("\nStory completed with state: {final_state:?}");

Expand Down
4 changes: 3 additions & 1 deletion cano/examples/batch_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ async fn main() -> CanoResult<()> {
.register(Step::Summarise, Summarise { url_count })
.add_exit_state(Step::Done);

let result = workflow.orchestrate(Step::ParseUrls).await?;
let result = workflow
.orchestrate(Step::ParseUrls, CancellationToken::disabled())
.await?;
assert_eq!(result, Step::Done);
println!("\ncompleted at {result:?}");

Expand Down
9 changes: 7 additions & 2 deletions cano/examples/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ async fn main() -> Result<(), CanoError> {

println!("Phase 1 — dependency unhealthy, threshold = 3.");
for attempt in 1..=5 {
let outcome = workflow.orchestrate(Step::Call).await;
let outcome = workflow
.orchestrate(Step::Call, CancellationToken::disabled())
.await;
// `orchestrate` wraps task failures in `WithStateContext`; unwrap one layer
// before pattern-matching on the underlying variant.
let label = match outcome {
Expand All @@ -125,7 +127,10 @@ async fn main() -> Result<(), CanoError> {

println!("Phase 3 — half-open trial probes the dependency, then closes the breaker.");
for attempt in 1..=3 {
match workflow.orchestrate(Step::Call).await {
match workflow
.orchestrate(Step::Call, CancellationToken::disabled())
.await
{
Ok(_) => println!(
" recovery call {attempt}: ok | state={:?}",
breaker.state()
Expand Down
9 changes: 7 additions & 2 deletions cano/examples/circuit_breaker_manual.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ async fn main() -> Result<(), CanoError> {
// ------------------------------------------------------------------
println!("Phase 1: dependency unhealthy (threshold = 3 consecutive failures)");
for attempt in 1..=5 {
let outcome = workflow.orchestrate(Step::Call).await;
let outcome = workflow
.orchestrate(Step::Call, CancellationToken::disabled())
.await;
// `orchestrate` wraps task failures in `WithStateContext`; unwrap one layer
// before pattern-matching on the underlying variant.
let label = match &outcome {
Expand Down Expand Up @@ -185,7 +187,10 @@ async fn main() -> Result<(), CanoError> {
// ------------------------------------------------------------------
println!("\nPhase 3: half-open trial — one probe closes the breaker");
for attempt in 1..=3 {
match workflow.orchestrate(Step::Call).await {
match workflow
.orchestrate(Step::Call, CancellationToken::disabled())
.await
{
Ok(_) => println!(" call {attempt}: ok | breaker={:?}", breaker.state()),
Err(e) => println!(" call {attempt}: err: {e} | breaker={:?}", breaker.state()),
}
Expand Down
9 changes: 7 additions & 2 deletions cano/examples/custom_checkpoint_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// --- First run: Process crashes; the checkpoint log is kept. -------
println!("=== run 1: Process will crash ===");
match workflow.orchestrate(Step::Init).await {
match workflow
.orchestrate(Step::Init, CancellationToken::disabled())
.await
{
Ok(s) => println!(" completed at {s:?} (unexpected)"),
Err(e) => println!(" stopped with error: {e}"),
}
Expand Down Expand Up @@ -201,7 +204,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// --- Second run: resume from last checkpoint (Process, attempt 2). ---
println!("\n=== run 2: resume_from ===");
let final_state = workflow.resume_from(run_id).await?;
let final_state = workflow
.resume_from(run_id, CancellationToken::disabled())
.await?;
println!(" reached {final_state:?}");
assert_eq!(final_state, Step::Done);

Expand Down
4 changes: 3 additions & 1 deletion cano/examples/join_strategies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ async fn run_strategy(label: &str, strategy: JoinStrategy) -> CanoResult<()> {
.add_exit_state(Step::Done);

let start = Instant::now();
let result = workflow.orchestrate(Step::Parallel).await?;
let result = workflow
.orchestrate(Step::Parallel, CancellationToken::disabled())
.await?;
let elapsed = start.elapsed();

// Count how many workers managed to log a result before being cancelled.
Expand Down
2 changes: 1 addition & 1 deletion cano/examples/metrics_demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() {
// Run the workflow 3 times directly.
for _ in 0..3 {
workflow()
.orchestrate(Step::Fetch)
.orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");
}
Expand Down
4 changes: 2 additions & 2 deletions cano/examples/metrics_tracing_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async fn main() {
// Path 1: Cano's own `workflow_orchestrate` span carries `workflow_id`.
workflow()
.with_workflow_id("demo-run-1")
.orchestrate(Step::Fetch)
.orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");

Expand All @@ -103,7 +103,7 @@ async fn main() {
let span = info_span!("api_request", request_id = "abc");
let _enter = span.enter();
workflow()
.orchestrate(Step::Fetch)
.orchestrate(Step::Fetch, CancellationToken::disabled())
.await
.expect("workflow run");
}
Expand Down
5 changes: 4 additions & 1 deletion cano/examples/mixed_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ async fn main() -> CanoResult<()> {
.register(WorkflowState::GenerateReport, ReportTask)
.add_exit_states(vec![WorkflowState::Complete]);

match workflow.orchestrate(WorkflowState::GenerateData).await {
match workflow
.orchestrate(WorkflowState::GenerateData, CancellationToken::disabled())
.await
{
Ok(_final_state) => {
println!("\nWorkflow completed successfully!");

Expand Down
Loading
Loading