Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 179 additions & 2 deletions collectors/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import logging
import time
from datetime import datetime, timezone
from datetime import date, datetime, timedelta, timezone
from typing import Optional

import boto3
Expand Down Expand Up @@ -170,7 +170,23 @@ def collect(
logger.warning("macro_history write failed (macro.json unaffected): %s", e)
history_status = {"status": "error", "error": str(e)}

return {"status": "ok", "fields": len(macro), "macro_history": history_status}
# Macro release calendar — a SECOND dashboard-only artifact, guarded the same
# way (a calendar failure must never mask the macro.json success the trading
# pipeline depends on; the failure is recorded in the WARN log + the returned
# ``release_calendar`` status field).
release_status: dict = {"status": "skipped_empty", "rows": 0}
try:
release_status = write_release_calendar(bucket=bucket, s3_prefix=s3_prefix, dry_run=dry_run)
except Exception as e: # noqa: BLE001 - secondary artifact; never fail macro.json
logger.warning("release_calendar write failed (macro.json unaffected): %s", e)
release_status = {"status": "error", "error": str(e)}

return {
"status": "ok",
"fields": len(macro),
"macro_history": history_status,
"release_calendar": release_status,
}


def _fetch_fred() -> dict:
Expand Down Expand Up @@ -374,6 +390,167 @@ def write_macro_history(bucket: str, s3_prefix: str = "market_data/", dry_run: b
return {"status": "ok", "rows": len(df), "series": int(df["series_id"].nunique())}


# ── Macro release calendar ────────────────────────────────────────────────────
# A second dashboard-facing artifact: forward-looking macro EVENT dates (FRED
# data releases + scheduled FOMC meetings) for robodashboard's Calendar page.
# Unlike macro_history (full observation history), this carries the NEXT release
# date per indicator. Overwritten weekly. See ARTIFACT_REGISTRY.yaml.
_RELEASE_CALENDAR_KEY = "macro_release_calendar.parquet"
_RELEASE_HORIZON_DAYS = 180
_FRED_RELEASE_DATES_BASE = "https://api.stlouisfed.org/fred/release/dates"
_FRED_SERIES_RELEASE_BASE = "https://api.stlouisfed.org/fred/series/release"
_RELEASE_CALENDAR_COLS = ["date", "kind", "series_id", "label", "release_name"]

# FRED series → calendar label, restricted to indicators with a clean monthly /
# weekly release schedule. FEDFUNDS and the daily series (DGS2/DGS10/T10Y2Y/
# VIXCLS/HY-OAS) are intentionally absent: a daily release is noise on a
# calendar, and the meaningful fed-funds event is the FOMC meeting (emitted
# separately below), not the daily H.15 print.
_RELEASE_CALENDAR_SERIES = {
"CPIAUCSL": "CPI release",
"UNRATE": "Employment Situation (Unemployment)",
"ICSA": "Initial Jobless Claims",
"UMCSENT": "Consumer Sentiment",
}

# Scheduled 2026 FOMC meeting decision days — the SECOND day of each two-day
# meeting, when the statement the market reacts to is released. Source:
# federalreserve.gov FOMC calendar. REFRESH ANNUALLY (append 2027 dates when the
# Fed publishes them; stale years simply drop off via the >= today filter).
_FOMC_MEETINGS = (
"2026-01-28",
"2026-03-18",
"2026-04-29",
"2026-06-17",
"2026-07-29",
"2026-09-16",
"2026-10-28",
"2026-12-09",
)


def _fred_release_id(series_id: str, api_key: str) -> tuple[int, str] | None:
"""Return ``(release_id, release_name)`` for a FRED series, or None on failure."""
try:
params = {"series_id": series_id, "api_key": api_key, "file_type": "json"}
resp = requests.get(_FRED_SERIES_RELEASE_BASE, params=params, timeout=_FRED_TIMEOUT)
resp.raise_for_status()
releases = resp.json().get("releases", [])
if not releases:
return None
r = releases[0]
return int(r["id"]), r.get("name", "")
except Exception as e: # noqa: BLE001 - omit this series, don't fail the artifact
logger.warning("FRED series/release %s failed: %s", series_id, e)
return None


def _fred_release_dates(release_id: int, api_key: str) -> list[str]:
"""Return scheduled release dates (ISO strings) for a FRED release.

``include_release_dates_with_no_data=true`` makes FRED include FUTURE
scheduled dates (which have no data yet). Returns ``[]`` on failure; the
caller filters to the forward horizon.
"""
try:
params = {
"release_id": release_id,
"api_key": api_key,
"file_type": "json",
"include_release_dates_with_no_data": "true",
"sort_order": "desc",
"limit": 24,
}
resp = requests.get(_FRED_RELEASE_DATES_BASE, params=params, timeout=_FRED_TIMEOUT)
resp.raise_for_status()
return [d["date"] for d in resp.json().get("release_dates", []) if d.get("date")]
except Exception as e: # noqa: BLE001 - omit this series, don't fail the artifact
logger.warning("FRED release/dates %s failed: %s", release_id, e)
return []


def build_release_calendar(api_key: str | None = None, today: date | None = None) -> pd.DataFrame:
"""Build the forward macro event calendar (FRED releases + FOMC meetings).

Columns: ``date, kind ('release'|'fomc'), series_id, label, release_name``.
One row per upcoming FRED release in ``[today, today+_RELEASE_HORIZON_DAYS]``
plus every future scheduled FOMC meeting. FRED fetches are best-effort per
series (a failure omits that series, not the artifact), so a FRED hiccup
still yields a FOMC-only calendar. Returns an empty frame (right columns)
only when there's no FRED key — matching ``build_macro_history`` so the
caller skips the write cleanly.
"""
if api_key is None:
api_key = get_secret("FRED_API_KEY", required=False, default="")
if not api_key:
logger.warning("FRED_API_KEY not set — skipping release calendar")
return pd.DataFrame(columns=_RELEASE_CALENDAR_COLS)
if today is None:
today = datetime.now(timezone.utc).date()
horizon = today + timedelta(days=_RELEASE_HORIZON_DAYS)

records: list[dict] = []
for series_id, label in _RELEASE_CALENDAR_SERIES.items():
rel = _fred_release_id(series_id, api_key)
if rel is None:
continue
release_id, release_name = rel
for d in _fred_release_dates(release_id, api_key):
try:
dd = datetime.strptime(d, "%Y-%m-%d").date()
except ValueError:
continue
if today <= dd <= horizon:
records.append(
{"date": d, "kind": "release", "series_id": series_id,
"label": label, "release_name": release_name}
)

for d in _FOMC_MEETINGS:
if datetime.strptime(d, "%Y-%m-%d").date() >= today:
records.append(
{"date": d, "kind": "fomc", "series_id": "FOMC",
"label": "FOMC Meeting", "release_name": "Federal Open Market Committee"}
)

df = pd.DataFrame(records, columns=_RELEASE_CALENDAR_COLS)
if not df.empty:
df.sort_values("date", inplace=True, kind="stable")
df.reset_index(drop=True, inplace=True)
logger.info("Built release calendar: %d events", len(df))
return df


def write_release_calendar(bucket: str, s3_prefix: str = "market_data/", dry_run: bool = False) -> dict:
"""Build + write the release calendar to ``market_data/macro_release_calendar.parquet``.

OVERWRITES the single fixed key each run (idempotent — the build is fully
derived from FRED's schedule + the FOMC constant). An empty build (no FRED
key) is a no-op rather than clobbering a good artifact with nothing.
"""
df = build_release_calendar()
if df.empty:
logger.warning("release calendar empty — skipping write (no FRED key)")
return {"status": "skipped_empty", "rows": 0}

if dry_run:
logger.info("[dry-run] release_calendar: %d events", len(df))
return {"status": "ok_dry_run", "rows": len(df)}

buf = io.BytesIO()
df.to_parquet(buf, engine="pyarrow", compression="snappy", index=False)
buf.seek(0)
key = f"{s3_prefix}{_RELEASE_CALENDAR_KEY}"
boto3.client("s3").put_object(
Bucket=bucket,
Key=key,
Body=buf.getvalue(),
ContentType="application/octet-stream",
)
logger.info("Wrote release calendar to s3://%s/%s (%d events)", bucket, key, len(df))
return {"status": "ok", "rows": len(df)}


def _fetch_market_prices() -> dict:
"""Fetch commodity and index prices via yfinance."""
commodity_tickers = ["CL=F", "GC=F", "HG=F"]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_artifact_registry_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"collectors/daily_closes_fred_repair.py": 1,
"collectors/fred_history.py": 1,
"collectors/fundamentals.py": 1,
"collectors/macro.py": 2, # weekly/<date>/macro.json + macro_history.parquet
"collectors/macro.py": 3, # weekly/<date>/macro.json + macro_history.parquet + macro_release_calendar.parquet
"collectors/prices.py": 1,
"collectors/short_interest.py": 1,
"collectors/signal_returns.py": 1,
Expand Down
137 changes: 137 additions & 0 deletions tests/test_release_calendar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""Tests for the macro release-calendar artifact (collectors.macro).

The release calendar (market_data/macro_release_calendar.parquet) is a second
dashboard-facing artifact: forward-looking macro EVENT dates — FRED data
releases (CPI / unemployment / claims / sentiment) plus scheduled FOMC meetings
— consumed by robodashboard's Calendar page. These tests keep the FRED fetch +
S3 write offline via monkeypatch and pin ``today`` for determinism.
"""

from __future__ import annotations

import io
from datetime import date

import pandas as pd

from collectors import macro

_CAL_COLS = ["date", "kind", "series_id", "label", "release_name"]

_TODAY = date(2026, 6, 4)


def _fake_release_id(series_id, api_key):
# Each calendar series maps to a distinct fake release id + name.
names = {
"CPIAUCSL": "Consumer Price Index",
"UNRATE": "Employment Situation",
"ICSA": "Unemployment Insurance Weekly Claims",
"UMCSENT": "Surveys of Consumers",
}
ids = {"CPIAUCSL": 10, "UNRATE": 50, "ICSA": 180, "UMCSENT": 91}
return (ids[series_id], names[series_id])


def _fake_release_dates(release_id, api_key):
# One past date (filtered out), one in-window future, one far-future (out of
# the 180d horizon). Same shape for every release id.
return ["2026-05-13", "2026-06-11", "2027-01-15"]


def _patch_fred(monkeypatch):
monkeypatch.setattr(macro, "_fred_release_id", _fake_release_id)
monkeypatch.setattr(macro, "_fred_release_dates", _fake_release_dates)


def test_build_release_calendar_shape_and_kinds(monkeypatch):
_patch_fred(monkeypatch)
df = macro.build_release_calendar(api_key="fake-key", today=_TODAY)
assert list(df.columns) == _CAL_COLS
assert set(df["kind"]) == {"release", "fomc"}
# All four configured release series appear (each contributes the in-window date).
assert set(df[df["kind"] == "release"]["series_id"]) == set(macro._RELEASE_CALENDAR_SERIES)


def test_build_release_calendar_future_only_and_horizon(monkeypatch):
_patch_fred(monkeypatch)
df = macro.build_release_calendar(api_key="fake-key", today=_TODAY)
rel_dates = set(df[df["kind"] == "release"]["date"])
assert "2026-05-13" not in rel_dates # past → dropped
assert "2026-06-11" in rel_dates # in-window future → kept
assert "2027-01-15" not in rel_dates # beyond 180d horizon → dropped


def test_build_release_calendar_fomc_future_only(monkeypatch):
_patch_fred(monkeypatch)
df = macro.build_release_calendar(api_key="fake-key", today=_TODAY)
fomc = df[df["kind"] == "fomc"]
assert list(fomc["series_id"].unique()) == ["FOMC"]
fomc_dates = set(fomc["date"])
# 2026-06-17 onward are future as of 2026-06-04; the Jan/Mar/Apr meetings are past.
assert "2026-06-17" in fomc_dates
assert "2026-12-09" in fomc_dates
assert "2026-04-29" not in fomc_dates


def test_build_release_calendar_sorted_by_date(monkeypatch):
_patch_fred(monkeypatch)
df = macro.build_release_calendar(api_key="fake-key", today=_TODAY)
assert list(df["date"]) == sorted(df["date"])


def test_build_release_calendar_fomc_only_when_fred_down(monkeypatch):
# A FRED outage (no release id) must still yield a FOMC-only calendar, not
# an empty artifact — best-effort per series.
monkeypatch.setattr(macro, "_fred_release_id", lambda *a, **k: None)
df = macro.build_release_calendar(api_key="fake-key", today=_TODAY)
assert not df.empty
assert set(df["kind"]) == {"fomc"}


def test_build_release_calendar_empty_without_key(monkeypatch):
monkeypatch.setattr(macro, "get_secret", lambda *a, **k: "")
df = macro.build_release_calendar()
assert df.empty
assert list(df.columns) == _CAL_COLS


def test_write_release_calendar_puts_parquet(monkeypatch):
df = pd.DataFrame(
[{"date": "2026-06-11", "kind": "release", "series_id": "CPIAUCSL",
"label": "CPI release", "release_name": "Consumer Price Index"}],
columns=_CAL_COLS,
)
monkeypatch.setattr(macro, "build_release_calendar", lambda *a, **k: df)

captured = {}

class _FakeS3:
def put_object(self, **kwargs):
captured.update(kwargs)

monkeypatch.setattr(macro.boto3, "client", lambda service: _FakeS3())

result = macro.write_release_calendar(bucket="test-bucket")
assert result["status"] == "ok"
assert result["rows"] == 1
assert captured["Key"] == "market_data/macro_release_calendar.parquet"
back = pd.read_parquet(io.BytesIO(captured["Body"]), engine="pyarrow")
assert list(back.columns) == _CAL_COLS
assert back.iloc[0]["kind"] == "release"


def test_write_release_calendar_skips_empty(monkeypatch):
monkeypatch.setattr(macro, "build_release_calendar", lambda *a, **k: pd.DataFrame(columns=_CAL_COLS))

calls = []

class _FakeS3:
def put_object(self, **kwargs):
calls.append(kwargs)

monkeypatch.setattr(macro.boto3, "client", lambda service: _FakeS3())

result = macro.write_release_calendar(bucket="test-bucket")
assert result["status"] == "skipped_empty"
assert calls == []
Loading