Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ The engine is built on three core concepts: **Tasks** for logic, **Workflows** f
## Features

- **Type-Safe State Machines**: Enum-driven transitions with compile-time guarantees.
- **Multiple Processing Models**: `Task` for general-purpose work, plus `RouterTask`, `PollTask`, `TimerTask`, `BatchTask`, and `SteppedTask` for specialized shapes — mixed freely in one workflow.
- **Multiple Processing Models**: `Task` for general-purpose work, plus `RouterTask`, `PollTask`, `TimerTask`, `BatchTask`, `SteppedTask`, and `StreamTask` for specialized shapes — mixed freely in one workflow.
- **Resource Dependency Injection**: Typed, lifecycle-managed `Resources` dictionary with `setup`/`teardown`/`health` hooks, looked up by key and type, plus `#[derive(FromResources)]` for ergonomic wiring.
- **Parallel Execution (Split/Join)**: Run tasks concurrently and join results with strategies like `All`, `Any`, `Quorum`, or `PartialResults`, with an optional bulkhead to cap concurrency.
- **Robust Retry Logic**: Configurable strategies including exponential backoff with jitter and per-attempt timeouts.
Expand Down
1 change: 1 addition & 0 deletions cano-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ syn = { version = "2", features = ["full", "visit-mut"] }

[dev-dependencies]
cano = { path = "../cano" }
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "time", "test-util"] }
30 changes: 30 additions & 0 deletions cano-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! - `#[cano::task::timer]` — for `impl TimerTask` and the `TimerTask` trait
//! - `#[cano::task::batch]` — for `impl BatchTask` and the `BatchTask` trait
//! - `#[cano::task::stepped]` — for `impl SteppedTask` and the `SteppedTask` trait
//! - `#[cano::task::stream]` — for `impl StreamTask` and the `StreamTask` trait
//! - `#[cano::saga::task]` — for `impl CompensatableTask`
//! - `#[cano::resource]` — for `impl Resource` and the `Resource` trait
//! - `#[cano::checkpoint_store]` — for `impl CheckpointStore` and the `CheckpointStore` trait
Expand All @@ -38,6 +39,7 @@ mod poll_task_impl;
mod resource_derive;
mod router_task_impl;
mod stepped_task_impl;
mod stream_task_impl;
mod task_impl;
mod timer_task_impl;

Expand Down Expand Up @@ -354,6 +356,34 @@ pub fn stepped_task(attr: TokenStream, item: TokenStream) -> TokenStream {
.into()
}

/// Apply to the `StreamTask` trait definition, an `impl StreamTask<S [, K]> for T`
/// block, or an inherent `impl T { ... }` block.
///
/// Use as `#[cano::task::stream]`. `StreamTask` is a genuine stream-processing model:
/// consume an `impl Stream` continuously, flush per [`WindowPolicy`] window, run until
/// the workflow's `CancellationToken` fires, and persist a resumable cursor (via
/// [`Workflow::register_stream`]). Per-item errors are governed by [`StreamErrorPolicy`].
///
/// Two surface forms on impl blocks:
///
/// 1. **Trait-impl form:** `#[task::stream] impl StreamTask<S> for T { type Item = ..; .. }`.
/// 2. **Inherent-impl form:** `#[task::stream(state = S [, key = K])] impl T { async fn open(..) .. }` —
/// the macro infers `type Item` from `process_item`'s owned `item` parameter and
/// `type Output` / `type Cursor` from the `Ok` 2-tuple of `process_item`'s return,
/// requires `open` / `process_item` / `flush_window` / `on_close`, and emits a
/// companion `impl Task<S [, K]> for T` whose `run` forwards to `StreamTask::run_in_memory`.
///
/// On a trait definition the macro just performs the async-fn-in-trait rewrite.
///
/// The default `config()` injected by the inherent form is [`TaskConfig::minimal()`]
/// (no outer retry — like `PollTask`; an outer retry would re-invoke `open()`).
#[proc_macro_attribute]
pub fn stream_task(attr: TokenStream, item: TokenStream) -> TokenStream {
stream_task_impl::expand(attr.into(), item.into())
.unwrap_or_else(syn::Error::into_compile_error)
.into()
}

/// Derive an empty `cano::Resource` impl (uses the trait's default no-op
/// `setup` / `teardown`).
///
Expand Down
Loading
Loading