3 changes: 3 additions & 0 deletions .gitignore
@@ -135,6 +135,9 @@ benchmark_results*/
# Local dataset files
liander_dataset/

# Deployment example run artifacts (MLflow store, forecasts, dataset, Celery/Airflow state)
openstef_deployment_runs/

# Mlflow
/mlflow
/mlflow_artifacts_local
16 changes: 16 additions & 0 deletions docs/source/user_guide/guides/deployment.rst
@@ -20,6 +20,22 @@ observability. If you are getting started, the *Scheduled Notebooks* pattern
covers small deployments; the *DAG* and *Queued* patterns cover the next two
tiers.

.. admonition:: Runnable examples
   :class: tip

   The `examples/deployment <https://github.com/OpenSTEF/openstef/tree/main/examples/deployment>`_
   directory contains self-contained, runnable versions of the *DAG* and *Queued*
   patterns for three popular orchestrators. They simulate data integration with the
   Liander benchmark dataset, so they run with no external infrastructure:

   - `Dagster <https://github.com/OpenSTEF/openstef/tree/main/examples/deployment/src/dagster_app>`_ and
     `Airflow <https://github.com/OpenSTEF/openstef/tree/main/examples/deployment/src/airflow_app>`_:
     the *DAG-based orchestration* pattern (separate train/predict tasks). Run with
     ``uv run poe deploy-dagster-train`` followed by ``uv run poe deploy-dagster-forecast``,
     or with ``uv run poe deploy-airflow-train`` followed by ``uv run poe deploy-airflow-forecast``.
   - `Celery <https://github.com/OpenSTEF/openstef/tree/main/examples/deployment/src/celery_app>`_:
     the *Queued execution* pattern (fan-out over many targets). Run with
     ``uv run poe deploy-celery-train`` followed by ``uv run poe deploy-celery-forecast``
     (eager mode, no broker required).

Deployment Strategies
---------------------

10 changes: 9 additions & 1 deletion examples/README.md
@@ -10,4 +10,12 @@ Before you start, make sure to install all the necessary dependencies by running

```bash
uv sync --all-extras --all-groups --all-packages
```

## Contents

- [`tutorials/`](tutorials) — step-by-step notebooks covering forecasting, backtesting,
feature engineering, ensembles, and more.
- [`benchmarks/`](benchmarks) — production-scale benchmark runs on the Liander 2024 dataset.
- [`deployment/`](deployment) — runnable deployment examples for Dagster, Airflow, and
Celery. See [`deployment/README.md`](deployment/README.md).
8 changes: 8 additions & 0 deletions examples/deployment/.gitignore
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

# Local run artifacts produced by the deployment examples (MLflow store, published
# forecasts, the Airflow home, and the downloaded dataset).
openstef_deployment_runs/
liander_dataset/
96 changes: 96 additions & 0 deletions examples/deployment/README.md
@@ -0,0 +1,96 @@
<!--
SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>

SPDX-License-Identifier: MPL-2.0
-->

# OpenSTEF deployment examples

Runnable, self-contained examples showing how to operate OpenSTEF on three popular
orchestrators. They implement the patterns described in the
[deployment guide](../../docs/source/user_guide/guides/deployment.rst):

| Example | Pattern | Best for |
| --- | --- | --- |
| [`dagster_app`](src/dagster_app) | DAG-based orchestration | Teams wanting retries, dependency tracking, audit trails |
| [`airflow_app`](src/airflow_app) | DAG-based orchestration | Teams already running Airflow |
| [`celery_app`](src/celery_app) | Queued execution | Large fleets of forecast targets (fan-out) |

All three share one thin layer in [`common`](src/common):

- **`config.py`** — a single `Settings` object (pydantic-settings) that embeds OpenSTEF's
own `ForecastingWorkflowConfig`, so every knob lives in one place and is environment
overridable.
- **`services.py`** — the mocked external systems a real deployment owns: fetching
measurements from a metering system, fetching a weather forecast from a weather provider,
and publishing the forecast downstream. They speak OpenSTEF's `TimeSeriesDataset` /
`ForecastDataset` types — **replace these with your own integrations**.
- **`pipeline.py`** — the real OpenSTEF code path: it combines the fetched data into the
dataset OpenSTEF expects and builds the workflow. The `fit` / `predict` calls stay in each
orchestrator.
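
The `config.py` idea, one settings object whose fields environment variables can override, can be sketched with the standard library alone. This is a minimal illustration and not the example's actual `Settings` class, which uses pydantic-settings and embeds `ForecastingWorkflowConfig`. The `DemoSettings` name, its fields, and its defaults are hypothetical; only the `OPENSTEF_DEPLOY_` prefix and the `DATA_DIR` / `BROKER_URL` variable names appear in this change.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass, fields
from pathlib import Path

ENV_PREFIX = "OPENSTEF_DEPLOY_"  # prefix of the variables used in this change


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the example's pydantic-settings ``Settings``."""

    data_dir: Path = Path("openstef_deployment_runs")
    broker_url: str = "filesystem://"
    targets: tuple[str, ...] = ("target_a", "target_b")  # made-up target names

    @classmethod
    def from_env(cls, env: Mapping[str, str] | None = None) -> DemoSettings:
        """Build settings, letting ``OPENSTEF_DEPLOY_<FIELD>`` override each default."""
        env = os.environ if env is None else env
        overrides: dict[str, object] = {}
        for field in fields(cls):
            raw = env.get(ENV_PREFIX + field.name.upper())
            if raw is None:
                continue  # variable not set: keep the default
            if field.name == "data_dir":
                overrides[field.name] = Path(raw)
            elif field.name == "targets":
                # Comma-separated list, e.g. "a, b" -> ("a", "b")
                overrides[field.name] = tuple(t.strip() for t in raw.split(",") if t.strip())
            else:
                overrides[field.name] = raw
        return cls(**overrides)
```

Calling `DemoSettings.from_env()` reads the process environment; passing a plain dict instead keeps the sketch easy to test without touching `os.environ`.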

## Simulated data

Like the [tutorials](../tutorials), these examples load the
[Liander 2024 benchmark dataset](https://huggingface.co/datasets/OpenSTEF/liander2024-energy-forecasting-benchmark)
from the HuggingFace Hub instead of wiring real data sources, so they run end-to-end with
zero external infrastructure. A fixed `reference_time` inside the 2024 data plays the role
of "now".
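
How a fixed `reference_time` can stand in for "now" is sketched below: everything before it is treated as history, and a window starting at it is what gets forecast. This is an illustration under assumptions, not the code in `pipeline.py`. The `split_at_reference` helper is hypothetical, and the `REFERENCE_TIME` value only mirrors the date passed to `airflow dags test` in the poe tasks; the example's real `reference_time` may differ.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical fixed "now" inside the 2024 data; the real setting may use another value.
REFERENCE_TIME = datetime(2024, 4, 15, tzinfo=timezone.utc)


def split_at_reference(rows, reference_time, horizon):
    """Split (timestamp, value) rows into history and the forecast window.

    History is everything strictly before ``reference_time``; the forecast window is
    ``[reference_time, reference_time + horizon)``.
    """
    history = [row for row in rows if row[0] < reference_time]
    window = [row for row in rows if reference_time <= row[0] < reference_time + horizon]
    return history, window


# Usage: hourly rows from two days before to two days after the reference time.
rows = [(REFERENCE_TIME + timedelta(hours=h), float(h)) for h in range(-48, 48)]
history, window = split_at_reference(rows, REFERENCE_TIME, timedelta(hours=24))
```

Because the split depends only on `reference_time`, swapping the fixed value for the scheduler's run time is the only change a live deployment would need in this sketch.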

## Cross-process model handoff

Training and prediction run as **separate processes** in every orchestrator. The training
task persists its model to a local MLflow store (a self-contained SQLite tracking backend
under the data directory); the prediction task creates a fresh workflow and OpenSTEF's
`MLFlowStorageCallback` automatically loads the latest stored model for the same `model_id`.
This is the production-correct pattern — run training at least once before prediction. The
Dagster example shows this between two assets; Airflow and Celery between two tasks.
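
The handoff described above (one process persists, another process loads the latest model for the same `model_id`) can be sketched with a plain directory as the store. This deliberately swaps MLflow for JSON files so that it runs with the standard library only; `save_model` and `load_latest_model` are hypothetical helpers and do not reflect the `MLFlowStorageCallback` API.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def save_model(store: Path, model_id: str, model: dict) -> Path:
    """Training process: write the next numbered version for ``model_id``."""
    model_dir = store / model_id
    model_dir.mkdir(parents=True, exist_ok=True)
    version = len(list(model_dir.glob("v*.json"))) + 1
    path = model_dir / f"v{version:04d}.json"  # zero-padded so names sort by version
    path.write_text(json.dumps(model))
    return path


def load_latest_model(store: Path, model_id: str) -> dict:
    """Prediction process: load the newest version, or fail if training never ran."""
    model_dir = store / model_id
    versions = sorted(model_dir.glob("v*.json")) if model_dir.is_dir() else []
    if not versions:
        raise FileNotFoundError(f"no stored model for {model_id!r}; run training first")
    return json.loads(versions[-1].read_text())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        store = Path(tmp)
        save_model(store, "target_a", {"bias": 1.0})  # first "training" run
        save_model(store, "target_a", {"bias": 2.0})  # second "training" run
        print(load_latest_model(store, "target_a"))  # the newest version is loaded
```

The `FileNotFoundError` branch is the sketch's version of the "run training at least once before prediction" rule: a prediction process that starts first has nothing to load.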

## Install

From the repository root (installs all three orchestrators plus the `poe` runner):

```bash
uv sync --all-extras --all-packages
```

## Run

Each example is wrapped in `poe` tasks so you do not need to know the framework CLIs. Every
framework has the **same three commands** — a UI plus train and forecast — and all of them run
locally with **no external infrastructure** (no database, broker, or message queue):

| Framework | UI | Train (CLI) | Forecast (CLI) |
| --- | --- | --- | --- |
| Dagster | `uv run poe deploy-dagster-ui` | `uv run poe deploy-dagster-train` | `uv run poe deploy-dagster-forecast` |
| Airflow | `uv run poe deploy-airflow-ui` | `uv run poe deploy-airflow-train` | `uv run poe deploy-airflow-forecast` |
| Celery | `uv run poe deploy-celery-ui` | `uv run poe deploy-celery-train` | `uv run poe deploy-celery-forecast` |

The web UIs serve at http://localhost:3000 (Dagster), :8080 (Airflow), and :5555 (Celery/Flower).
Run **train before forecast** — the forecast loads the model training persisted.

### Celery needs no broker server

The Celery example defaults to a **filesystem broker** under the data directory, so a worker,
beat schedule, and Flower all run without a separate broker server. The `-train` / `-forecast`
tasks run eagerly in-process, which is the simplest option. To exercise the real queue, start a
worker and watch it in Flower:

```bash
uv run --extra celery celery -A celery_app.app worker --pool solo # in one terminal
uv run poe deploy-celery-ui # Flower, in another
```

For production scale, point the broker at Redis with no code change:

```bash
export OPENSTEF_DEPLOY_BROKER_URL=redis://localhost:6379/0
export OPENSTEF_DEPLOY_RESULT_BACKEND=redis://localhost:6379/1
```
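
One way the "no code change" switch could work is to resolve the broker from the environment and fall back to the filesystem transport. The sketch below returns a dict of Celery setting names rather than building a Celery app, so it runs without Celery installed. The `celery_broker_config` helper and the `celery_broker` folder name are hypothetical, and how `celery_app.app` actually wires its configuration is not shown in this README. The `filesystem://` URL and the `data_folder_in` / `data_folder_out` transport options are the ones used by Celery's filesystem transport.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from pathlib import Path


def celery_broker_config(data_dir: Path, env: Mapping[str, str] | None = None) -> dict:
    """Return Celery settings: the env-provided broker if set, else a filesystem broker."""
    env = os.environ if env is None else env
    broker_url = env.get("OPENSTEF_DEPLOY_BROKER_URL")
    if broker_url:
        # A real broker (e.g. Redis) was configured through the environment.
        config = {"broker_url": broker_url}
        result_backend = env.get("OPENSTEF_DEPLOY_RESULT_BACKEND")
        if result_backend:
            config["result_backend"] = result_backend
        return config
    # Fallback: messages are files in a local folder, so no server is needed.
    queue_dir = data_dir / "celery_broker"  # hypothetical folder name
    return {
        "broker_url": "filesystem://",
        "broker_transport_options": {
            "data_folder_in": str(queue_dir),
            "data_folder_out": str(queue_dir),
        },
    }
```

A Celery app could apply the result with `app.conf.update(celery_broker_config(data_dir))`; the same application code then runs against either broker.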

> **Notes:** Flower's live worker/task monitoring needs a broker that supports remote control
> (e.g. Redis); over the filesystem broker the dashboard runs but shows limited live data. On
> macOS the Airflow *web UI* (`deploy-airflow-ui`) can crash its gunicorn workers due to a known
> fork-safety issue; the Airflow **CLI** tasks are unaffected. On Linux and in CI, both the web
> UI and the CLI tasks work.

See each subpackage's module docstring for more, including the real Redis broker, Celery beat,
and the Airflow/Dagster schedulers.
103 changes: 103 additions & 0 deletions examples/deployment/pyproject.toml
@@ -0,0 +1,103 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

[build-system]
build-backend = "hatchling.build"

requires = [ "hatchling" ]

[project]
name = "openstef-deployment-examples"
version = "0.0.0"
description = "Runnable deployment examples for OpenSTEF (Dagster, Airflow, Celery)"
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"openstef-core[benchmark]",
"openstef-models[xgb-cpu]",
"pydantic-settings>=2.14,<3",
]

optional-dependencies.airflow = [ "apache-airflow>=2.10" ]
optional-dependencies.celery = [
"celery[redis]>=5.4",
"flower>=2",
]
optional-dependencies.dagster = [
"dagster>=1.9",
"dagster-webserver>=1.9",
]

[tool.hatch.build.targets.wheel]
packages = [
"src/common",
"src/dagster_app",
"src/airflow_app",
"src/celery_app",
]

# Run-the-example tasks, co-located with the example. The root pyproject pulls these in via
# `[tool.poe] include`, so `uv run poe deploy-*` works from the repo root (where POE_ROOT,
# and therefore the paths below, resolve). Each framework has the same three commands:
# `-ui` (launch the web UI), `-train`, and `-forecast`.

[tool.poe.tasks.deploy-dagster-ui]
help = "Dagster (UI): the Dagster web UI at http://localhost:3000"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples dagster dev -m dagster_app.definitions"

[tool.poe.tasks.deploy-dagster-train]
help = "Dagster (CLI): train every target (materialize the trained_model asset)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples python -m dagster_app.run train"

[tool.poe.tasks.deploy-dagster-forecast]
help = "Dagster (CLI): forecast every target (materialize the forecast asset)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples python -m dagster_app.run forecast"

[tool.poe.tasks.deploy-airflow-ui]
help = "Airflow (UI): all-in-one webserver + scheduler at http://localhost:8080"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
env.AIRFLOW_HOME = "${POE_ROOT}/examples/deployment/openstef_deployment_runs/airflow"
env.AIRFLOW__CORE__DAGS_FOLDER = "${POE_ROOT}/examples/deployment/src/airflow_app/dags"
env.AIRFLOW__CORE__LOAD_EXAMPLES = "False"
cmd = "uv run --no-sync --package openstef-deployment-examples airflow standalone"

[tool.poe.tasks.deploy-airflow-train]
help = "Airflow (CLI): run the training DAG once (no scheduler)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
env.AIRFLOW_HOME = "${POE_ROOT}/examples/deployment/openstef_deployment_runs/airflow"
env.AIRFLOW__CORE__DAGS_FOLDER = "${POE_ROOT}/examples/deployment/src/airflow_app/dags"
env.AIRFLOW__CORE__LOAD_EXAMPLES = "False"
sequence = [
{ cmd = "uv run --no-sync --package openstef-deployment-examples airflow db migrate" },
{ cmd = "uv run --no-sync --package openstef-deployment-examples airflow dags test openstef_train 2024-04-15" },
]

[tool.poe.tasks.deploy-airflow-forecast]
help = "Airflow (CLI): run the forecast DAG once (no scheduler)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
env.AIRFLOW_HOME = "${POE_ROOT}/examples/deployment/openstef_deployment_runs/airflow"
env.AIRFLOW__CORE__DAGS_FOLDER = "${POE_ROOT}/examples/deployment/src/airflow_app/dags"
env.AIRFLOW__CORE__LOAD_EXAMPLES = "False"
sequence = [
{ cmd = "uv run --no-sync --package openstef-deployment-examples airflow db migrate" },
{ cmd = "uv run --no-sync --package openstef-deployment-examples airflow dags test openstef_forecast 2024-04-15" },
]

[tool.poe.tasks.deploy-celery-ui]
help = "Celery (UI): the Flower monitoring dashboard at http://localhost:5555"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples celery -A celery_app.app flower"

[tool.poe.tasks.deploy-celery-train]
help = "Celery (CLI): train every target (eager, in-process, no broker)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples python -m celery_app.run train"

[tool.poe.tasks.deploy-celery-forecast]
help = "Celery (CLI): forecast every target (eager, in-process, no broker)"
env.OPENSTEF_DEPLOY_DATA_DIR = "${POE_ROOT}/examples/deployment/openstef_deployment_runs"
cmd = "uv run --no-sync --package openstef-deployment-examples python -m celery_app.run forecast"
3 changes: 3 additions & 0 deletions examples/deployment/src/airflow_app/__init__.py
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0
50 changes: 50 additions & 0 deletions examples/deployment/src/airflow_app/dags/openstef_forecast.py
@@ -0,0 +1,50 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false
# Airflow's TaskFlow decorators are not fully typed; silence that noise.

"""Airflow forecast DAG for OpenSTEF (DAG-based orchestration).

Runs hourly and, for each target, assembles prediction input, predicts, and publishes the
result. Each forecast process loads the model the training DAG persisted to the shared
MLflow store, so run ``openstef_train`` at least once first.

This example uses a fixed ``reference_time`` (inside the 2024 benchmark data) as "now". In a
real deployment you would instead derive it from the run's data interval, for example
``context["data_interval_end"]``.

Run it once without a scheduler with ``uv run poe deploy-airflow-forecast``.
"""

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task
from common import pipeline, services
from common.config import Settings

settings = Settings()


@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2024, 4, 1, tz="UTC"),
    catchup=False,
    tags=["openstef"],
)
def openstef_forecast() -> None:
    """Forecast and publish for every target."""

    @task
    def forecast(target: str) -> None:
        dataset = pipeline.prediction_dataset(target, settings=settings)
        workflow = pipeline.build_workflow(target, settings=settings)
        result = workflow.predict(dataset, forecast_start=settings.reference_time)
        services.publish_forecast(result, target, settings=settings)

    forecast.expand(target=settings.targets)


openstef_forecast()
45 changes: 45 additions & 0 deletions examples/deployment/src/airflow_app/dags/openstef_train.py
@@ -0,0 +1,45 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false
# Airflow's TaskFlow decorators are not fully typed; silence that noise.

"""Airflow training DAG for OpenSTEF (DAG-based orchestration).

Runs daily and trains one model per target, fanning out with Airflow's dynamic task
mapping so each target is an independently retriable task. The trained models are persisted
to the shared MLflow store for the forecast DAG to load.

Run it once without a scheduler with ``uv run poe deploy-airflow-train``.
"""

from __future__ import annotations

import pendulum
from airflow.decorators import dag, task
from common import pipeline
from common.config import Settings

settings = Settings()


@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2024, 4, 1, tz="UTC"),
    catchup=False,
    tags=["openstef"],
)
def openstef_train() -> None:
    """Train one OpenSTEF model per target."""

    @task
    def train(target: str) -> None:
        dataset = pipeline.training_dataset(target, settings=settings)
        workflow = pipeline.build_workflow(target, settings=settings)
        workflow.fit(dataset)

    train.expand(target=settings.targets)


openstef_train()
3 changes: 3 additions & 0 deletions examples/deployment/src/celery_app/__init__.py
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2026 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0