A production-style quant data-engineering pipeline — ingest → validate → features → backtest — with an interactive Streamlit dashboard, an APScheduler cron daemon, a full pytest suite, and Docker support.
flowchart TD
YF["📡 yfinance\n(market data API)"]
DB[("🦆 DuckDB\nprices.duckdb\ntable: ohlcv")]
VAL["🔍 OHLCVValidator\n• no null close/volume\n• High ≥ Close ≥ Low\n• date contiguity"]
FE["⚙️ FeatureEngineer\n• Momentum ROC-20\n• RSI-14 Wilder EMA\n• Rolling Volatility\n• SMA Crossover Signal"]
BT["📊 Backtester\n• vectorbt (primary)\n• pandas (fallback)\n• Sharpe / MaxDD / CAGR"]
PF["💼 PortfolioBacktester\n• equal-weight 10-ticker\n• Sharpe / Sortino / Calmar"]
CLI["🖥️ Typer + Rich CLI\ningest / features /\nbacktest / portfolio /\nvalidate / run-all / schedule"]
DASH["📈 Streamlit Dashboard\n• price chart + signals\n• signal comparison table\n• cumulative returns\n• deep-dive sub-panels"]
SCH["🕕 APScheduler\nMon–Fri 18:00 UTC\ncron daemon"]
RPT["📄 Validation Reports\nlogs/pipeline.log\nreports/*.json"]
YF -->|"OHLCV download"| DB
DB -->|"per-ticker df"| VAL
VAL -->|"clean df"| FE
FE -->|"feature DataFrame"| BT
FE -->|"feature DataFrame"| PF
BT -->|"BacktestResult"| CLI
PF -->|"PortfolioResult"| CLI
DB -->|"live query"| DASH
SCH -->|"triggers ingest"| YF
SCH -->|"triggers features"| FE
VAL -->|"JSON report"| RPT
SCH -->|"rotating log"| RPT
# 1. Clone and enter the project
git clone https://github.com/KavinSundarr/alpha-pipeline.git
cd alpha-pipeline
# 2. Build the image and start the dashboard
docker-compose up --build dashboard
# 3. In a separate terminal, run the full pipeline
docker-compose run --rm pipeline ingest
docker-compose run --rm pipeline run-all --start 2020-01-01
# Dashboard is live at http://localhost:8501

# 1. Create and activate a virtual environment
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
# 2. Install dependencies
# (pin numpy first to avoid the vectorbt / NumPy-2 conflict)
pip install "numpy==1.26.4"
pip install -r requirements.txt
# 3. Ingest market data (downloads ~10 years of OHLCV for 10 tickers)
python main.py ingest
# 4. Run the full pipeline
python main.py run-all --start 2020-01-01
# 5. Launch the dashboard
streamlit run dashboard/app.py

SMA(20/50) crossover strategy · 2020-01-01 → 2024-12-31 · $100,000 starting capital · 10 bps commission. Run `python main.py run-all --start 2020-01-01` to reproduce on your machine.
| # | Ticker | Sharpe | Max DD | Ann. Return | Total Return | Trades | Win Rate |
|---|---|---|---|---|---|---|---|
| 1 | NVDA | +1.24 | 28.1% | +41.3% | +212.4% | 8 | 62.5% |
| 2 | AAPL | +0.87 | 18.4% | +19.2% | +89.6% | 10 | 60.0% |
| 3 | META | +0.76 | 31.2% | +22.1% | +106.3% | 7 | 57.1% |
| 4 | MSFT | +0.71 | 19.8% | +17.4% | +80.1% | 9 | 55.6% |
| 5 | GOOGL | +0.64 | 22.3% | +15.6% | +71.2% | 11 | 54.5% |
| 6 | SPY | +0.58 | 16.7% | +11.2% | +49.8% | 6 | 50.0% |
| 7 | AMZN | +0.43 | 35.6% | +12.8% | +58.7% | 12 | 50.0% |
| 8 | JPM | +0.31 | 25.1% | +8.9% | +39.4% | 8 | 50.0% |
| 9 | TSLA | -0.12 | 52.3% | -1.4% | -6.2% | 14 | 42.9% |
| 10 | JNJ | -0.24 | 18.9% | -3.2% | -14.1% | 6 | 33.3% |
Sorted by Sharpe ratio descending. Results are example output — run the pipeline to generate current figures.
alpha-pipeline/
├── main.py                     # CLI entry-point (Typer + Rich)
├── Dockerfile                  # Multi-stage Docker image
├── docker-compose.yml          # pipeline + dashboard + scheduler services
├── pyproject.toml              # Poetry config with pinned deps
├── requirements.txt            # pip-installable dependencies
├── requirements-dev.txt        # test dependencies
├── pytest.ini                  # pytest + coverage config
├── test.bat                    # Windows test runner
├── Makefile                    # Unix/macOS test runner
├── LICENSE                     # MIT
│
├── alpha_pipeline/
│   ├── __init__.py
│   ├── ingestion.py            # yfinance → DuckDB (Ingester)
│   ├── features.py             # FeatureEngineer: momentum/RSI/vol/SMA
│   ├── backtest.py             # Backtester + BacktestResult
│   ├── portfolio.py            # Equal-weight PortfolioBacktester
│   ├── validate.py             # OHLCVValidator (great_expectations)
│   └── scheduler.py            # APScheduler cron daemon
│
├── dashboard/
│   └── app.py                  # Streamlit + Plotly interactive UI
│
├── tests/
│   ├── conftest.py             # shared pytest fixtures
│   ├── helpers.py              # synthetic OHLCV generator (no network)
│   ├── test_features.py        # unit tests: RSI/momentum/SMA/vol
│   ├── test_integration.py     # end-to-end pipeline (DuckDB → backtest)
│   └── test_validation.py      # data-quality validation tests
│
├── .github/
│   └── workflows/
│       └── ci.yml              # GitHub Actions: test + Docker build
│
├── data/                       # Auto-created; prices.duckdb lives here
├── logs/                       # Auto-created; pipeline.log (rotating)
└── reports/                    # Auto-created; validation JSON reports
Downloads historical OHLCV data via yfinance and persists it to a local DuckDB database. Automatically runs OHLCVValidator after every download; raises DataValidationError (with a JSON report path) if any check fails. Supports idempotent upserts — re-running never duplicates rows.
from alpha_pipeline.ingestion import Ingester

with Ingester(db_path="data/prices.duckdb") as ing:
    ing.ingest(tickers=["AAPL", "MSFT"], start="2020-01-01", end="2024-12-31")
    df = ing.load("AAPL")

Vectorised feature computation on top of an OHLCV DataFrame. No explicit Python loops: every feature is computed with vectorised pandas/NumPy operations.
| Feature | Formula | Default window |
|---|---|---|
| Momentum | (P_t / P_{t-n}) − 1 | 20 days |
| RSI | Wilder EMA of gains / losses | 14 days |
| Volatility | std(log_ret) × √252 | 20 days |
| SMA Signal | 1 if SMA(fast) > SMA(slow) | 20 / 50 days |
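As a rough illustration of the formulas in the table, the following is a minimal pandas sketch. The function name `sketch_features` and the output column names are hypothetical, and the code is not taken from `features.py`; it only assumes a Series of closing prices and the default windows listed above.

```python
import numpy as np
import pandas as pd


def sketch_features(close: pd.Series, roc_window: int = 20, rsi_window: int = 14,
                    vol_window: int = 20, sma_fast: int = 20,
                    sma_slow: int = 50) -> pd.DataFrame:
    """Illustrative only: mirrors the table's formulas, not the project's code."""
    out = pd.DataFrame(index=close.index)

    # Momentum (rate of change): (P_t / P_{t-n}) - 1
    out["momentum"] = close / close.shift(roc_window) - 1.0

    # RSI with Wilder smoothing, i.e. an EMA with alpha = 1 / window
    delta = close.diff()
    ewm_args = dict(alpha=1.0 / rsi_window, adjust=False, min_periods=rsi_window)
    avg_gain = delta.clip(lower=0.0).ewm(**ewm_args).mean()
    avg_loss = (-delta).clip(lower=0.0).ewm(**ewm_args).mean()
    out["rsi"] = 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)

    # Annualised volatility: rolling std of log returns times sqrt(252)
    log_ret = np.log(close / close.shift(1))
    out["volatility"] = log_ret.rolling(vol_window).std() * np.sqrt(252)

    # SMA crossover signal: 1 while the fast SMA is above the slow SMA, else 0
    fast = close.rolling(sma_fast).mean()
    slow = close.rolling(sma_slow).mean()
    out["sma_signal"] = (fast > slow).astype(int)
    return out
```

Rows inside the warm-up period come out as NaN for momentum, RSI and volatility, and as 0 for the SMA signal, because a comparison against a not-yet-defined slow SMA evaluates to false.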
from alpha_pipeline.features import compute_features

feat_df = compute_features(price_df, sma_fast=20, sma_slow=50)

Vectorised SMA-crossover backtest engine. Uses vectorbt when available; falls back to a pure-pandas implementation that produces equivalent metrics. Signals are lagged by one bar to eliminate look-ahead bias.
| Metric | Description |
|---|---|
| Sharpe Ratio | Annualised (Rf = 0) |
| Max Drawdown | Peak-to-trough, positive decimal |
| CAGR | Compound annual growth rate |
| Total Return | Cumulative strategy return |
| Win Rate | Fraction of profitable round-trips |
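To make the one-bar lag and the metric definitions concrete, here is a minimal long/flat sketch in pandas. It is an assumption-laden illustration, not the project's `Backtester` or its vectorbt path: the function name `sketch_backtest`, the 252-day annualisation, and charging the commission on each position change are all choices made for this example. Win rate is left out for brevity.

```python
import numpy as np
import pandas as pd


def sketch_backtest(close: pd.Series, signal: pd.Series, fees: float = 0.001) -> dict:
    """Long/flat backtest sketch; `signal` is 1 (long) or 0 (flat) per bar."""
    # Lag the signal by one bar: today's position uses yesterday's signal,
    # so no bar trades on information from its own close.
    position = signal.shift(1).fillna(0.0)

    asset_ret = close.pct_change().fillna(0.0)
    # Charge the commission on every bar where the position changes.
    turnover = position.diff().abs().fillna(0.0)
    strat_ret = position * asset_ret - turnover * fees

    equity = (1.0 + strat_ret).cumprod()
    years = len(strat_ret) / 252
    std = strat_ret.std()
    return {
        # Annualised Sharpe with a zero risk-free rate
        "sharpe": float(np.sqrt(252) * strat_ret.mean() / std) if std > 0 else 0.0,
        # Largest peak-to-trough loss, reported as a positive decimal
        "max_drawdown": float((1.0 - equity / equity.cummax()).max()),
        "cagr": float(equity.iloc[-1] ** (1.0 / years) - 1.0),
        "total_return": float(equity.iloc[-1] - 1.0),
    }
```

With the lag in place, a signal that first turns on at bar t only earns the return of bar t+1, which is the property a look-ahead guard would check.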
Runs the SMA crossover strategy simultaneously across all tickers and combines positions into an equal-weighted portfolio (1/N cash allocation). Computes Sharpe, Sortino, Calmar, and Max Drawdown at the portfolio level, plus per-ticker Sharpe attribution.
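The portfolio-level metrics can be sketched as follows. This is a simplified illustration under stated assumptions: the function name `sketch_portfolio_metrics` is hypothetical, the input is a DataFrame with one column of daily strategy returns per ticker, and taking the row-wise mean implies daily rebalancing back to equal weights, which may differ from how `PortfolioBacktester` allocates cash.

```python
import numpy as np
import pandas as pd


def sketch_portfolio_metrics(strategy_returns: pd.DataFrame) -> dict:
    """`strategy_returns`: one column of daily strategy returns per ticker."""
    # 1/N allocation (assumed rebalanced daily): cross-sectional mean return.
    port = strategy_returns.mean(axis=1)

    equity = (1.0 + port).cumprod()
    max_dd = float((1.0 - equity / equity.cummax()).max())
    cagr = float(equity.iloc[-1] ** (252 / len(port)) - 1.0)

    # Sortino: like Sharpe, but only returns below the target (0) are penalised.
    downside = float(np.sqrt((port.clip(upper=0.0) ** 2).mean()))
    std = port.std()
    nan = float("nan")
    return {
        "sharpe": float(np.sqrt(252) * port.mean() / std) if std > 0 else nan,
        "sortino": float(np.sqrt(252) * port.mean() / downside) if downside > 0 else nan,
        # Calmar: annualised growth divided by the worst drawdown
        "calmar": cagr / max_dd if max_dd > 0 else nan,
        "max_drawdown": max_dd,
    }
```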
great_expectations-backed data quality suite with a 4-level engine fallback (GE 1.x → GE 0.18 → legacy → pure pandas). Writes a timestamped JSON report to reports/ on every run regardless of outcome.
Checks:

- no_null_close: zero nulls in the Close column
- no_null_volume: zero nulls in the Volume column
- high_gte_close: High ≥ Close for every row
- close_gte_low: Close ≥ Low for every row
- date_contiguity: no calendar gap between consecutive trading dates > max_gap_days
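The five checks above can be sketched in plain pandas, roughly what the last fallback level might do. Everything specific here is an assumption for illustration: the function name `sketch_ohlcv_checks`, the capitalised column names (`Date`, `High`, `Low`, `Close`, `Volume`), and the default of 5 for `max_gap_days`.

```python
import pandas as pd


def sketch_ohlcv_checks(df: pd.DataFrame, max_gap_days: int = 5) -> dict:
    """Return {check_name: passed} for a single ticker's OHLCV frame."""
    dates = pd.to_datetime(df["Date"]).sort_values()
    # Calendar days between consecutive rows; empty for a single-row frame.
    gaps = dates.diff().dt.days.dropna()
    largest_gap = int(gaps.max()) if len(gaps) else 0
    return {
        "no_null_close": bool(df["Close"].notna().all()),
        "no_null_volume": bool(df["Volume"].notna().all()),
        # Comparisons against NaN are False, so a null Close also fails here.
        "high_gte_close": bool((df["High"] >= df["Close"]).all()),
        "close_gte_low": bool((df["Close"] >= df["Low"]).all()),
        "date_contiguity": largest_gap <= max_gap_days,
    }
```

Returning one boolean per check keeps the result easy to serialise into a JSON report, with a run counted as failed if any value is false.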
APScheduler cron daemon that refreshes OHLCV data and recomputes all features every weekday at 18:00 UTC. Logs every run to logs/pipeline.log via a rotating file handler (10 MB cap, 5 backups). Prints a Rich summary panel to the terminal after each run.
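The rotating log described above maps onto the standard library's `RotatingFileHandler`. The sketch below covers only the logging side, not the APScheduler job itself; the helper name `build_pipeline_logger`, the logger name, the log format, and reading "10 MB" as 10 × 1024 × 1024 bytes are assumptions.

```python
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def build_pipeline_logger(log_dir: str = "logs") -> logging.Logger:
    """Hypothetical helper: logger writing to <log_dir>/pipeline.log with rotation."""
    logger = logging.getLogger("alpha_pipeline.scheduler")
    logger.setLevel(logging.INFO)

    # Only attach a handler once, so repeated calls do not duplicate log lines.
    if not any(isinstance(h, RotatingFileHandler) for h in logger.handlers):
        Path(log_dir).mkdir(parents=True, exist_ok=True)
        handler = RotatingFileHandler(
            Path(log_dir) / "pipeline.log",
            maxBytes=10 * 1024 * 1024,  # roll over once the file reaches 10 MB
            backupCount=5,              # keep pipeline.log.1 ... pipeline.log.5
        )
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

With five backups, the log directory holds at most six files, so disk usage stays bounded at roughly 60 MB however long the daemon runs.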
python main.py schedule # block at 18:00 UTC Mon–Fri
python main.py schedule --run-now # fire immediately, then keep scheduling
python main.py schedule --hour 9 --minute 30 # custom time

Interactive Plotly dashboard with four sections and a live sidebar.
| Section | Description |
|---|---|
| Price History & Signals | Price line + SMA overlays + buy ▲ / sell ▼ markers |
| Signal Comparison | Side-by-side Sharpe / MaxDD / Return / Trades table (all 4 signals vs buy-and-hold) |
| Cumulative Returns | All strategy curves vs buy-and-hold benchmark, starting at 1.0 |
| Signal Deep-Dive | Per-signal indicator sub-panel (RSI levels, momentum zero-line, etc.) |
python main.py --help

| Command | Key options |
|---|---|
| ingest | --tickers A,B,C · --start · --end · --db-path |
| features | -t TICKER · --start · --sma-fast · --sma-slow · --rows |
| backtest | -t TICKER · --start · --sma-fast · --sma-slow · --cash · --fees |
| run-all | --tickers · --start · --sma-fast · --sma-slow |
| portfolio | --tickers · --start · --end · --sma-fast · --sma-slow |
| validate | -t TICKER · --start · --reports-dir · --max-gap-days |
| schedule | --tickers · --hour · --minute · --run-now · --log-dir |
All commands accept -v / --verbose for debug-level logging.
# Build both services
docker-compose build
# Start the dashboard only
docker-compose up dashboard
# Run a one-shot pipeline command
docker-compose run --rm pipeline ingest
docker-compose run --rm pipeline run-all --start 2020-01-01
docker-compose run --rm pipeline backtest --ticker NVDA --start 2022-01-01
# Start the scheduler daemon (weekday 18:00 UTC auto-refresh)
docker-compose --profile schedule up schedule

| Volume / bind | Description |
|---|---|
| pipeline_data (named) | DuckDB file shared between pipeline and dashboard |
| ./logs (bind) | pipeline.log, visible on the host |
| ./reports (bind) | Validation JSON reports, visible on the host |
| Variable | Default | Description |
|---|---|---|
| PYTHONUNBUFFERED | 1 | Flush stdout immediately (set in compose) |
| PYTHONDONTWRITEBYTECODE | 1 | No .pyc files in the container |
# Install test dependencies
pip install -r requirements-dev.txt
# Run the full suite with coverage (Unix/macOS)
make test
# Run the full suite with coverage (Windows)
test
# Targeted runs
make test-unit # feature unit tests only (Windows: test unit)
make test-integration # end-to-end pipeline tests (Windows: test integration)
make test-validation # data quality tests (Windows: test validation)
make test-fast # stop at first failure (Windows: test fast)

The test suite contains ~70 tests across three files:

- test_features.py: RSI bounds, exact momentum values on known data, SMA crossover timing, volatility formula verification
- test_integration.py: full ingest → features → backtest pipeline with synthetic DuckDB data (no network); look-ahead bias guard
- test_validation.py: null checks, OHLC constraint violations, date gap detection, report file creation, DataValidationError propagation
CREATE TABLE ohlcv (
ticker VARCHAR NOT NULL,
date DATE NOT NULL,
open DOUBLE,
high DOUBLE,
low DOUBLE,
close DOUBLE,
adj_close DOUBLE,
volume BIGINT,
PRIMARY KEY (ticker, date)
)

AAPL · MSFT · GOOGL · AMZN · META · NVDA · TSLA · JPM · JNJ · SPY
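The composite primary key `(ticker, date)` in the schema above is what makes an idempotent upsert possible. The sketch below shows the idea with the standard library's `sqlite3` as a stand-in, chosen only so the example runs without extra dependencies; it is not the project's DuckDB code. DuckDB documents a similar `INSERT OR REPLACE` statement for tables with a primary key, but whether `Ingester` uses it is not stated here. The `upsert` helper and the price values are made-up placeholders.

```python
import sqlite3

# Stand-in database: sqlite3 instead of DuckDB, same table definition.
con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE ohlcv (
        ticker VARCHAR NOT NULL, date DATE NOT NULL,
        open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
        adj_close DOUBLE, volume BIGINT,
        PRIMARY KEY (ticker, date)
    )
""")


def upsert(rows):
    # A row whose (ticker, date) already exists is replaced, not duplicated.
    con.executemany(
        "INSERT OR REPLACE INTO ohlcv VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows
    )


# Placeholder values, not real market data.
batch = [
    ("AAPL", "2024-01-02", 100.0, 101.0, 99.0, 100.5, 100.5, 1000),
    ("AAPL", "2024-01-03", 100.5, 102.0, 100.0, 101.5, 101.5, 1200),
]
upsert(batch)
upsert(batch)  # simulate re-running the same ingest
row_count = con.execute("SELECT COUNT(*) FROM ohlcv").fetchone()[0]
```

After both calls `row_count` is still 2, and a later batch carrying a corrected value for an existing date overwrites the stored row rather than adding a second one.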
| Goal | How |
|---|---|
| Add a feature | Add a method to FeatureEngineer in features.py |
| Add a strategy | Compute a new signal column; pass via --signal-col |
| Export results | BacktestResult.to_dict() → pd.DataFrame → .to_csv() |
| New data source | Subclass or replace Ingester; keep the load() interface |
| Add a validation check | Add a method to OHLCVValidator._run_pandas_checks() |
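The "Export results" row can be sketched like this. `ResultStub` is a hypothetical stand-in for `BacktestResult`, and its field names are assumptions; the only thing taken from the table is the `to_dict()` → `pd.DataFrame` → `.to_csv()` chain. The tickers and numbers are placeholders.

```python
from dataclasses import asdict, dataclass

import pandas as pd


@dataclass
class ResultStub:
    """Hypothetical stand-in for BacktestResult; field names are assumptions."""
    ticker: str
    sharpe: float
    max_drawdown: float
    total_return: float

    def to_dict(self) -> dict:
        return asdict(self)


def results_frame(results) -> pd.DataFrame:
    # One row per result, sorted by Sharpe ratio in descending order.
    frame = pd.DataFrame([r.to_dict() for r in results])
    return frame.sort_values("sharpe", ascending=False).reset_index(drop=True)


# Placeholder results, not real backtest output.
results = [ResultStub("AAA", 0.5, 0.20, 0.30), ResultStub("BBB", 1.1, 0.10, 0.60)]
csv_text = results_frame(results).to_csv(index=False)
```

Passing a file path to `to_csv` writes the file instead of returning the text.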
This project is licensed under the MIT License — see LICENSE for details.
Built with Python · yfinance · DuckDB · pandas · vectorbt · APScheduler · Typer · Rich · Streamlit · Plotly · great-expectations · pytest · Docker