Flow Model

github-actions[bot] edited this page Jun 23, 2026 · 1 revision

@Flow.model turns a plain Python function into a real CallableModel.

The design is intentionally narrow:

  • ordinary unmarked parameters are regular bound inputs,
  • FromContext[T] marks the only runtime/contextual inputs,
  • @Flow.context_transform defines reusable contextual rewrites,
  • .flow.compute(...) is the execution entry point for the full DAG,
  • .flow.with_context(*patches, **field_overrides) rewires contextual inputs on one dependency edge,
  • upstream CallableModels can still be passed as ordinary inputs.

The goal is that a reader can look at one function signature and immediately see:

  1. which values come from runtime context,
  2. which values must be bound as regular configuration or dependencies,
  3. how to rewrite contextual inputs for one branch of the graph.

Generated models still plug into the existing evaluator, registry, cache, Hydra, and serialization machinery — @Flow.model does not create a new execution engine.

Core Example

from ccflow import Flow, FromContext


@Flow.model
def foo(a: int, b: FromContext[int]) -> int:
    return a + b


# Build an instance with a=11 bound, then supply b=12 at runtime:
configured = foo(a=11)
result = configured.flow.compute(b=12)
assert result.value == 23  # .value unwraps the GenericResult wrapper

# Or create a different instance that stores b=12 as its contextual default:
prefilled = foo(a=11, b=12)
result = prefilled.flow.compute()
assert result.value == 23

Note: When the function returns a plain value (like int above) instead of a ResultBase subclass, @Flow.model automatically wraps it in GenericResult. Access the inner value with .value.

This is the core contract:

  • a is a regular parameter — it must be bound at construction time,
  • b is contextual because it is marked with FromContext[int] — it can come from runtime context, a contextual default stored on the model instance, or a function default,
  • .flow.compute(...) may carry extra ambient context for upstream graph branches, but it never binds regular parameters.

Nothing is being mutated at execution time in the second example. prefilled = foo(a=11, b=12) constructs a different model instance whose contextual default for b is already 12. Because b is still contextual, incoming runtime context can still override that default.

This means the following is invalid:

foo().flow.compute(a=11, b=12)
# TypeError: compute() cannot satisfy unbound regular parameter(s): a.
# Bind them at construction time; compute() only supplies runtime context.

a is not contextual, so it must be bound at construction time (foo(a=11)). By contrast, extra ambient fields that are only needed by upstream with_context(...) rewrites are allowed in compute(**kwargs), because models generated by @Flow.model use FlowContext, an open bag of fields, as their runtime context type.

Regular Parameters vs Contextual Parameters

Regular Parameters

Regular parameters are the unmarked ones.

They can be satisfied by:

  • a literal value,
  • a default value from the function signature,
  • an upstream CallableModel.

When an upstream model is supplied, @Flow.model evaluates it with the current context and passes the resolved value into the function. This is how you wire stages together — just pass one model as an argument to another:

from ccflow import Flow, FromContext


@Flow.model
def load_value(value: FromContext[int], offset: int) -> int:
    return value + offset


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


# Wire load_value into add's 'a' parameter:
model = add(a=load_value(offset=5))

# At runtime, load_value runs first (value=7 + offset=5 = 12),
# then add runs (a=12 + b=12 = 24):
assert model.flow.compute(value=7, b=12).value == 24

Only direct regular-parameter values are treated as upstream dependencies in this first version. Containers such as list, tuple, and dict are ordinary literal values; @Flow.model does not scan them for nested models.

Contextual Parameters

Contextual parameters are the ones marked with FromContext[...].

They can be satisfied by:

  • runtime context,
  • construction-time keyword inputs, stored as contextual defaults on the model instance,
  • keyword callable literals for FromContext[Callable[..., T]] fields,
  • function defaults.

Construction-time contextual defaults cannot be CallableModel values, because that would be ambiguous with dependency binding. If the contextual field itself is typed to accept a CallableModel, pass the model as runtime context or via .flow.with_context(...) as ordinary data. A raw callable passed positionally is always treated as a regular argument candidate, not as a contextual default. In other words, FromContext[Callable[..., T]] allows a callable literal only when it is provided by keyword for that contextual field.

A construction-time value for a contextual parameter is still a default, not a conversion into a regular bound parameter.

Generated models reserve a few framework attribute names that the runtime reads directly: meta, context_type, result_type, and type_. Do not use these as @Flow.model function parameter names.

flow is not reserved. model.flow is a non-data descriptor, so a parameter (field) named flow shadows it: model.flow then returns the field value. Reach the flow API for such a model with Flow.of(model) (equivalent to model.flow for models that do not shadow it). Defining a flow field emits a pydantic warning that it shadows the accessor — expected for this case.
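
The shadowing behaviour is ordinary Python attribute lookup rather than anything specific to ccflow. The sketch below uses hypothetical stand-in classes (FlowAccessor, PlainModel, ShadowingModel are illustrative names, not ccflow's) to show why an instance attribute wins over a non-data descriptor, and how going through the class still reaches the descriptor. How Flow.of(model) is implemented is not stated here, so treat the last line only as an analogy.

```python
class FlowAccessor:
    """Hypothetical non-data descriptor: defines __get__ but not __set__."""

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class: hand back the descriptor
        return f"flow API for {type(instance).__name__}"


class PlainModel:
    flow = FlowAccessor()


class ShadowingModel:
    flow = FlowAccessor()

    def __init__(self, flow):
        # The descriptor has no __set__, so this lands in the instance __dict__.
        self.flow = flow


plain = PlainModel()
shadowed = ShadowingModel(flow=42)

plain_flow = plain.flow        # nothing shadows the descriptor
shadowed_flow = shadowed.flow  # the instance attribute wins

# Looking the descriptor up on the class bypasses the instance __dict__.
recovered = ShadowingModel.flow.__get__(shadowed, ShadowingModel)
```

Here plain_flow and recovered are both the accessor's result, while shadowed_flow is the stored field value 42.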

from ccflow import Flow, FromContext


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


model = add(a=10, b=5)
assert model.flow.compute().value == 15
assert model.flow.compute(b=7).value == 17

Contextual precedence is:

  1. branch-local .flow.with_context(...) rewrites,
  2. incoming runtime context,
  3. contextual defaults stored on the model instance,
  4. function defaults.
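
One way to picture this ordering is as a layered lookup in which the first layer that defines a field wins. The sketch below is a simplified mental model built on the standard library's ChainMap, not ccflow's implementation: the four dictionaries are hypothetical stand-ins, and real with_context(...) rewrites may be transforms rather than plain values.

```python
from collections import ChainMap

# Hypothetical layers, listed from highest to lowest priority.
with_context_rewrites = {"b": 1}
runtime_context = {"b": 2, "c": 20}
instance_defaults = {"b": 3, "c": 30, "d": 300}
function_defaults = {"b": 4, "c": 40, "d": 400, "e": 4000}

# ChainMap searches its maps in order and returns the first match.
resolved = ChainMap(
    with_context_rewrites,
    runtime_context,
    instance_defaults,
    function_defaults,
)

effective = dict(resolved)
```

In this toy setup b comes from the rewrite layer, c from runtime context, d from the instance default, and e from the function default.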

.flow.compute(...)

.flow.compute(...) is the entry point for running the whole DAG against a runtime context.

For generated @Flow.model stages it accepts either:

  • keyword inputs that become the ambient runtime context bag, or
  • one context object.

It does not accept both at the same time.

Plain handwritten CallableModel instances also expose .flow.compute(...). For those models, keyword inputs build or update the runtime context. If the decorated @Flow.call method declares a default context object, no-argument .flow.compute() uses that default, and keyword inputs override fields from that default for the .flow.compute(...) call.

from ccflow import Flow, FlowContext, FromContext


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


model = add(a=10)
assert model.flow.compute(b=5).value == 15
assert model.flow.compute(FlowContext(b=6)).value == 16

For models generated by @Flow.model, the kwargs form is intentionally a DAG entry point: it can include extra fields needed only by upstream transformed dependencies. Regular parameters are still never read from runtime context. compute() enforces two guardrails on keyword inputs:

  • If a key matches an unbound regular parameter, it raises early instead of silently treating that value as configuration.
  • If a key matches an already-bound regular parameter, it raises to prevent accidental rebinding. Use a context object (FlowContext) when you need ambient fields whose names collide with bound regular parameters.

from ccflow import Flow, FromContext


@Flow.model
def source(value: FromContext[int]) -> int:
    return value


@Flow.model
def add(left: int, right: int, bonus: FromContext[int]) -> int:
    return left + right + bonus


@Flow.context_transform
def add_offset(value: FromContext[int], amount: int) -> int:
    return value + amount


base = source()
model = add(
    left=base.flow.with_context(value=add_offset(amount=1)),
    right=base.flow.with_context(value=add_offset(amount=10)),
)

assert model.flow.inspect().context_inputs == {"bonus": int}
assert model.flow.compute(value=5, bonus=100).value == 121

If a regular parameter is already bound on the root model and you need to pass an ambient context field with the same name for upstream graph nodes, use a context object instead of keyword inputs. The kwargs form rejects keys that match already-bound regular parameters to prevent accidental rebinding:

from ccflow import Flow, FlowContext, FromContext


@Flow.model
def source(a: FromContext[int]) -> int:
    return a


@Flow.model
def combine(a: int, left: int, bonus: FromContext[int]) -> int:
    return a + left + bonus


model = combine(a=100, left=source())

# The context object form lets ambient 'a=7' flow to upstream nodes
# while root 'a' stays bound to 100:
assert model.flow.compute(FlowContext(a=7, bonus=5)).value == 112

# The kwargs form rejects 'a' because it is a bound regular parameter:
# model.flow.compute(a=7, bonus=5)
# → TypeError: compute() does not accept regular parameter override(s): a.
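
The two keyword guardrails can be read as a small validation step that runs before any keyword input is treated as ambient context. The following is a toy sketch of that logic only: check_compute_kwargs and its arguments are hypothetical names, the error text is abbreviated, and ccflow's real checks may differ in detail.

```python
def check_compute_kwargs(regular_params, bound_params, kwargs):
    """Toy model of the keyword guardrails.

    regular_params: names of all unmarked (regular) parameters
    bound_params:   regular parameters already bound at construction time
    kwargs:         keyword inputs passed to compute()
    """
    unbound_hits = sorted(
        key for key in kwargs if key in regular_params and key not in bound_params
    )
    if unbound_hits:
        raise TypeError("unbound regular parameter(s): " + ", ".join(unbound_hits))

    bound_hits = sorted(key for key in kwargs if key in bound_params)
    if bound_hits:
        raise TypeError("regular parameter override(s): " + ", ".join(bound_hits))

    # Everything that survives is plain ambient runtime context.
    return dict(kwargs)


# Extra ambient fields pass through untouched.
ambient = check_compute_kwargs({"a", "left"}, {"a", "left"}, {"bonus": 5, "value": 7})

# A key naming an unbound regular parameter is rejected.
try:
    check_compute_kwargs({"a"}, set(), {"a": 11, "b": 12})
except TypeError as exc:
    unbound_error = str(exc)

# A key naming an already-bound regular parameter is rejected too.
try:
    check_compute_kwargs({"a", "left"}, {"a", "left"}, {"a": 7, "bonus": 5})
except TypeError as exc:
    override_error = str(exc)
```

Passing a context object instead of keywords skips this name-based check, which is why the FlowContext form can carry an ambient field named like a bound parameter.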

compute() returns the same result object you would get from model(context), unless auto_unwrap=True is enabled for an auto-wrapped plain return type:

from ccflow import Flow, FromContext


@Flow.model(auto_unwrap=True)
def add(a: int, b: FromContext[int]) -> int:
    return a + b


result = add(a=10).flow.compute(b=5)
assert result == 15

@Flow.context_transform and .flow.with_context(...)

@Flow.context_transform defines reusable, serializable contextual rewrites using the same FromContext[...] language as @Flow.model. A transform's return type determines how it can be used in with_context():

  • Patch transforms return a Mapping (e.g. dict[str, object]) of contextual field names to replacement values. They are passed as positional inputs to with_context().
  • Field transforms return a single scalar value. They are passed as keyword inputs to with_context(), keyed by the contextual field they replace.

from datetime import date, timedelta

from ccflow import DateRangeContext, Flow, FromContext


@Flow.model
def count_visitors(location: str, start_date: FromContext[date], end_date: FromContext[date]) -> int:
    days = (end_date - start_date).days + 1
    return days * 12 + len(location)


@Flow.model
def visitor_delta(current: int, previous: int, start_date: FromContext[date], end_date: FromContext[date]) -> dict:
    return {"window": f"{start_date} -> {end_date}", "change": current - previous}


# Patch transform — shifts multiple fields together, passed positionally:
@Flow.context_transform
def previous_window(start_date: FromContext[date], end_date: FromContext[date], days: int) -> dict[str, object]:
    return {
        "start_date": start_date - timedelta(days=days),
        "end_date": end_date - timedelta(days=days),
    }


# Field transform — shifts one field, passed as a keyword:
@Flow.context_transform
def shift_end(end_date: FromContext[date], days: int) -> date:
    return end_date - timedelta(days=days)


current = count_visitors(location="library")

# Patch transform: shift both dates back 30 days
previous = current.flow.with_context(previous_window(days=30))

# Mix both forms: the patch shifts both dates, then the keyword override replaces end_date
custom = current.flow.with_context(
    previous_window(days=30),
    end_date=shift_end(days=7),  # keyword override replaces end_date from the patch
)

delta = visitor_delta(current=current, previous=previous)
result = delta.flow.compute(DateRangeContext(start_date=date(2024, 1, 1), end_date=date(2024, 1, 31)))

In this example, current and previous share the same underlying count_visitors configuration but see different date windows at runtime. previous uses a patch transform to shift both dates back 30 days. custom demonstrates combining both forms: the patch sets both dates, then the keyword override replaces end_date with a different shift. Keyword overrides always apply last.

Key rules:

  • with_context() only targets contextual fields,
  • positional inputs must be patch transforms,
  • keyword overrides may be literals or field transforms,
  • raw positional callables are rejected; use named @Flow.context_transform helpers for positional patch transforms,
  • keyword callable literals are allowed only when the target field is typed as FromContext[Callable[..., T]]; other keyword callables must be field transforms,
  • transforms are branch-local — they only affect the wrapped dependency, not the entire pipeline,
  • patch results merge left-to-right, then keyword overrides apply last,
  • every transform evaluates against the original incoming runtime context; if multiple fields must move together, put that logic inside one patch transform.
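
The last two rules are easiest to see with plain dictionaries. The sketch below is a simplified model, not ccflow code: apply_with_context is a hypothetical helper, the context is a dict, transforms are ordinary functions of that dict, and every callable keyword value is treated as a field transform (the FromContext[Callable[..., T]] literal case is ignored).

```python
from datetime import date, timedelta


def apply_with_context(context, patches=(), overrides=None):
    """Toy model of patch merging and keyword overrides."""
    original = dict(context)   # every transform reads this snapshot
    rewritten = dict(context)
    for patch in patches:      # patch results merge left-to-right
        rewritten.update(patch(original))
    for name, value in (overrides or {}).items():  # keyword overrides apply last
        rewritten[name] = value(original) if callable(value) else value
    return rewritten


def previous_window(ctx, days=30):
    # Patch-style transform: returns several fields at once.
    return {
        "start_date": ctx["start_date"] - timedelta(days=days),
        "end_date": ctx["end_date"] - timedelta(days=days),
    }


def shift_end(ctx, days=7):
    # Field-style transform: returns one replacement value.
    return ctx["end_date"] - timedelta(days=days)


incoming = {"start_date": date(2024, 1, 31), "end_date": date(2024, 2, 29)}

patched = apply_with_context(incoming, patches=[previous_window])
custom = apply_with_context(
    incoming,
    patches=[previous_window],
    overrides={"end_date": shift_end},
)
```

In custom, end_date is seven days before the incoming end_date, not seven days before the patched one, because shift_end reads the original context. The incoming dictionary itself is never modified.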

Context transforms serialize the analyzed transform contract directly in serialized_config so bound models can move through pickle and Ray workers without re-resolving annotations from the defining module. This applies to importable module functions, local functions, nested functions, __main__, and notebook-defined transforms. Treat that serialized_config as an opaque generated artifact owned by ccflow, not as a stable hand-written YAML/JSON configuration format.

context_type=...

When you want the FromContext[...] fields to match an existing nominal context shape, use context_type=...:

from datetime import date

from ccflow import DateRangeContext, Flow, FromContext


@Flow.model(context_type=DateRangeContext)
def count_visitors(location: str, start_date: FromContext[date], end_date: FromContext[date]) -> int:
    days = (end_date - start_date).days + 1
    return days * 12 + len(location)

That preserves the primary FromContext[...] authoring model while letting callers pass richer context objects whose relevant fields satisfy the declared context_type.

context_type=... is a validation/coercion contract for the named FromContext[...] fields. Generated @Flow.model instances still expose FlowContext as their runtime context_type.

If the function genuinely needs the runtime context object itself inside the function body on each call, use a normal CallableModel subclass instead of @Flow.model.

For class-based CallableModel methods that want to declare context fields as keyword-only parameters, see Flow.call(auto_context=...) in Workflows.

Introspection

The public .flow surface is intentionally small:

  • model.flow.compute(...): evaluate the model,
  • model.flow.with_context(...): return a branch-local contextual wrapper,
  • model.flow.inspect(...): return a structured debugging summary.

Use inspect(...) when you want to understand what is bound, what is still contextual, and which direct dependencies are attached:

inspection = model.flow.inspect()

inspection.inputs        # direct function inputs and their sources
inspection.context_inputs   # declared contextual contract
inspection.runtime_inputs   # direct runtime inputs after wrapper bindings
inspection.required_inputs  # required runtime inputs still needed
inspection.bound_inputs     # construction/static values already fixed
inspection.dependencies     # dependency edges, controlled by dependencies=...

The top-level input fields are intentionally current-level only. They describe the model or wrapper you inspected, not a flattened view of the whole dependency graph. inspection.inputs is a dict from that model's function input name to an InputSpec. Each InputSpec reports the expected type, whether the input is required, the declared default, the effective direct value if known, and whether that value came from construction, a function default, runtime context, or with_context(...).

Dependency information lives under inspection.dependencies. Each dependency edge reports the parameter path, target model, projected context values when known, and whether the edge is lazy. To inspect a child, inspect that child model directly:

inspection = model.flow.inspect(value=3)
dependency = inspection.dependencies[0]

if dependency.context is None:
    child = dependency.model.flow.inspect()
else:
    child = dependency.model.flow.inspect(dependency.context)

child.inputs
child.runtime_inputs
child.required_inputs

This explicit nesting avoids merging unrelated models into one ambiguous input namespace. It also avoids name collisions when multiple dependencies use the same context field name for different branches.

context_inputs intentionally stays faithful to the model's declared contract. For bound models, with_context(...) bindings are reflected in runtime_inputs, required_inputs, bound_inputs, and inputs. Literal bindings satisfy their target fields. Transform bindings with runtime inputs add those source context inputs to the effective runtime view. Static transforms, meaning transforms with no contextual parameters, may be evaluated during introspection so their output fields can be reported precisely. A transform parameter like seed: FromContext[int] = 0 is still a runtime input; its default only means the caller is not required to provide it.

inspect(...) reports the direct API for the current model or wrapper. By default, it also inspects immediate dependencies:

model.flow.inspect(dependencies="direct")  # default
model.flow.inspect(dependencies="recursive")  # follow inspect-visible dependencies
model.flow.inspect(dependencies="none")    # skip dependency inspection

Recursive inspection follows dependencies visible from constructed @Flow.model inputs and with_context(...) wrappers. It is intended for debugging generated-model trees, not as a complete evaluator graph. A handwritten CallableModel can still appear as a dependency target when it is bound to an @Flow.model regular input, but inspect(...) does not expand that handwritten model's custom CallableModel.__deps__ implementation. Recursive mode changes which dependency edges are listed; it does not change the meaning of the top-level input fields. Lazy dependency edges are listed as lazy; inspect the lazy target directly if you want to see what it could require when called.

Because introspection may evaluate static @Flow.context_transform functions, context transforms should be deterministic, side-effect-free, and cheap. This is the same practical contract expected by cache identity and dependency analysis.

Example:

from ccflow import Flow, FromContext


@Flow.model
def add(a: int, b: FromContext[int], c: FromContext[int] = 5) -> int:
    return a + b + c


model = add(a=10)
inspection = model.flow.inspect()
assert inspection.context_inputs == {"b": int, "c": int}
assert inspection.runtime_inputs == {"b": int, "c": int}
assert inspection.required_inputs == {"b": int}
assert inspection.bound_inputs == {"a": 10}
assert set(inspection.inputs) == {"a", "b", "c"}


@Flow.context_transform
def from_seed(seed: FromContext[int]) -> int:
    return seed + 1


bound = add(a=10).flow.with_context(b=from_seed())
bound_inspection = bound.flow.inspect()
assert bound_inspection.context_inputs == {"b": int, "c": int}
assert bound_inspection.runtime_inputs == {"c": int, "seed": int}
assert bound_inspection.required_inputs == {"seed": int}
assert bound_inspection.bound_inputs == {"a": 10}

In the bound example, b remains in context_inputs because add still declares b as part of its contextual contract. It is absent from runtime_inputs because this wrapper supplies b from from_seed(). seed appears in runtime_inputs because the transform reads it from the caller's runtime context.

When you pass a proposed context object or runtime kwargs to inspect(...), inspection uses those values to fill known direct inputs and project context onto dependency edges. It does not validate the proposed context or report unused fields:

inspection = add(a=10).flow.inspect(b=2, unused=1)
assert inspection.inputs["b"].value == 2
assert "unused" not in inspection.inputs

This keeps inspect(...) structural. A stricter debug-time input checker can be added later with explicit current-model versus graph-wide semantics.

Lazy Dependencies

Lazy[T] defers evaluation of an upstream dependency until the function body explicitly calls it. This is useful when a dependency is expensive and only needed conditionally:

from ccflow import Flow, FromContext, Lazy


@Flow.model
def load_value(value: FromContext[int]) -> int:
    return value * 10


@Flow.model
def maybe_use(current: int, fallback: Lazy[int], threshold: FromContext[int]) -> int:
    if current > threshold:
        return current          # fallback is never evaluated
    return fallback()           # evaluate only when needed


model = maybe_use(current=50, fallback=load_value())

# current (50) > threshold (10), so load_value never runs:
assert model.flow.compute(value=3, threshold=10).value == 50

# current (5) <= threshold (10), so load_value runs (3 * 10 = 30):
model2 = maybe_use(current=5, fallback=load_value())
assert model2.flow.compute(value=3, threshold=10).value == 30

Without Lazy[T], the upstream model would always run. With it, the function controls exactly when (and whether) the dependency executes.

When To Use @Flow.model

Use @Flow.model when:

  • the stage logic is naturally a plain function,
  • you want ordinary inputs to look like ordinary Python function parameters,
  • the contextual contract is small and explicit,
  • the main goal is easy graph authoring on top of existing ccflow machinery.

Use a hand-written class-based CallableModel when:

  • the model needs custom methods or substantial internal state,
  • the full context object is the natural primary interface,
  • the stage is no longer best expressed as one function and a small amount of wiring.

Troubleshooting

compute() says a field is not contextual

That field is a regular parameter. Bind it at construction time. compute() only supplies runtime context: FromContext[...] fields, plus any extra ambient fields that upstream with_context(...) rewrites read.

with_context() rejects a field

with_context() only rewrites contextual inputs. If you are trying to attach one stage to another, pass the upstream model as a regular argument at construction time.

A contextual parameter still shows up in context_inputs after I bound it

That is expected. context_inputs reports the declared contextual contract of the model or wrapped model. It does not mean the current wrapper still requires the caller to provide that field.

Use model.flow.inspect().runtime_inputs to see the effective direct runtime context inputs after with_context(...) bindings. Use model.flow.inspect().required_inputs to see what still must be provided by the caller. Static transforms with no contextual parameters may be evaluated during introspection, so their output fields can be removed from runtime_inputs and required_inputs or added to bound_inputs.

A shared dependency runs more than once

@Flow.model authors the graph cleanly, but execution still follows the normal ccflow evaluator path. If you need deduplication or graph scheduling, use the appropriate evaluators and cache settings just as you would for class-based CallableModels.
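
As a rough analogy in plain Python (not ccflow's evaluator or cache API), the difference between an uncached and a cached shared dependency looks like the sketch below. All names are hypothetical, and functools.lru_cache merely stands in for whatever evaluator or cache configuration you choose.

```python
from functools import lru_cache

call_counts = {"plain": 0, "cached": 0}


def load_plain(value):
    # Runs every time it is referenced.
    call_counts["plain"] += 1
    return value * 10


@lru_cache(maxsize=None)
def load_cached(value):
    # Runs once per distinct argument; later calls reuse the stored result.
    call_counts["cached"] += 1
    return value * 10


def add(left, right):
    return left + right


plain_total = add(load_plain(3), load_plain(3))     # shared input evaluated twice
cached_total = add(load_cached(3), load_cached(3))  # shared input evaluated once
```

Both totals are identical; only the number of times the shared dependency runs changes.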
