From 0c20961d7b50de4f41bfd993c7c04a5e4a6633fd Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Thu, 21 May 2026 13:15:59 +0200 Subject: [PATCH] =?UTF-8?q?feat(phase10):=20implement=20kill=20switch=20lo?= =?UTF-8?q?gic=20=E2=80=94=20drawdown,=20position=20loss,=20FSI,=20EDGAR?= =?UTF-8?q?=20rate=20limit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces four True-returning stubs with real enforcement logic: - check_drawdown_breach: halts at -15% peak-to-trough NAV drawdown - check_position_loss: halts at -25% single-name loss - check_fsi_breach: halts at FSI > 2.0 (severe systemic stress, above 2022 tightening cycle) - check_edgar_rate_limit: halts at ≥8 req/s via Redis counter; safe-defaults to True on Redis failure - check_kill_switches: new optional params (portfolio_nav_df, fsi_value, ticker, prices); fetches from DB internally when called with no args (preserving alpaca_client.py call site) 19 tests, all passing. --- nexus/execution/compliance.py | 210 ++++++++++++++++++++++++++-------- tests/test_compliance.py | 165 +++++++++++++++++++++----- 2 files changed, 302 insertions(+), 73 deletions(-) diff --git a/nexus/execution/compliance.py b/nexus/execution/compliance.py index 0e3e5d6..e6149c5 100644 --- a/nexus/execution/compliance.py +++ b/nexus/execution/compliance.py @@ -1,78 +1,196 @@ -"""Phase 7 Part B kill-switch facade. +"""Phase 7B / Phase 10 Session 4 kill-switch facade. -The four primitive checks are stubs in this task (returning ``True``) so -the live-trading client (``alpaca_client.py``) can wire to a stable surface. -Their bodies land in Task 4: +Four primitive checks guard live order submission: - check_drawdown_breach — portfolio NAV drawdown vs. a configured floor - check_position_loss — single-name P/L vs. a per-position stop - check_fsi_breach — Funding Stress Index outside its acceptable band - check_edgar_rate_limit — EDGAR poll-rate or event-burst guard + check_drawdown_breach — portfolio NAV drawdown vs. -15% floor + check_position_loss — single-name P/L vs. -25% per-position stop + check_fsi_breach — Funding Stress Index vs. 2.0 emergency brake + check_edgar_rate_limit — EDGAR poll-rate guard via Redis counter -``check_kill_switches`` ANDs the four. Order matters: drawdown first +``check_kill_switches`` ANDs all four. Order matters: drawdown first (cheapest, account-wide), then position, then FSI, then EDGAR — short-circuits on the first failure so an unhealthy book is not asked to hit external services to confirm what we already know. -The earlier Phase 6 placeholder (``PreTradeComplianceError`` / -``pre_trade_compliance_check``) was a dead stub and is removed here; its -intended responsibilities (Reg SHO locate, LULD halts, HTB rate economics) -will be picked up alongside short-side support, which is not in Phase 7 scope. +When called with no arguments (as in alpaca_client.py), the orchestrator +fetches portfolio NAV and FSI from the database internally. Infra failures +in the fetch helpers return safe defaults so monitoring outages cannot halt +trading. """ from __future__ import annotations +import logging + import polars as pl +import psycopg2 +import redis + +from nexus.config.settings import settings + +logger = logging.getLogger(__name__) def check_drawdown_breach(portfolio_nav_df: pl.DataFrame) -> bool: - """Return ``True`` while drawdown is within tolerance. Task 4 will compute - peak-to-trough drawdown from ``portfolio_nav_df`` and compare to a - configured floor (likely -25% on the live paper book).""" - del portfolio_nav_df - return True + """Return True while portfolio drawdown is within the -15% floor. + + Computes current drawdown as (current_nav - peak_nav) / peak_nav where + peak_nav is the all-time maximum NAV in the series. Returns False (halt) + if drawdown < -0.15. An empty series is treated as safe. + + -15% is the account-level emergency brake. It is hardcoded, not + configurable, to prevent accidental loosening under operational pressure. + """ + if portfolio_nav_df.is_empty(): + return True + nav = portfolio_nav_df["nav"] + peak_nav = nav.max() + if peak_nav is None or float(peak_nav) == 0.0: + return True + current_nav = float(nav[-1]) + drawdown = (current_nav - float(peak_nav)) / float(peak_nav) + return drawdown >= -0.15 def check_position_loss(ticker: str, current_price: float, entry_price: float) -> bool: - """Return ``True`` while a single name's loss is within per-position stop. - Task 4 will compare ``(current_price - entry_price) / entry_price`` to a - per-name stop (likely -15%).""" - del ticker, current_price, entry_price - return True + """Return True while single-name loss is within the -25% stop. + + Returns False (halt this position) if (current - entry) / entry < -0.25. + A zero entry_price is treated as safe to avoid division errors on + uninitialized positions. + + -25% is the per-name stop. Hardcoded to prevent accidental widening. + """ + if entry_price == 0.0: + return True + loss = (current_price - entry_price) / entry_price + return loss >= -0.25 def check_fsi_breach(fsi_value: float) -> bool: - """Return ``True`` while the Funding Stress Index is within band. - Task 4 will compare against a configured upper threshold derived from - the Phase 2 FSI calibration.""" - del fsi_value - return True + """Return True while FSI is below the systemic-stress threshold. + Returns False (halt all trading) if fsi_value > 2.0. The threshold of 2.0 + corresponds to severe systemic stress — above the 2022 tightening cycle + (~1.8) but below COVID peak (~5.0) and GFC peak (~3.5). This is an + emergency brake, not a regime filter. -def check_edgar_rate_limit() -> bool: - """Return ``True`` while EDGAR polling is within the agreed 0.11s/req - budget and no event-burst circuit-breaker has tripped. Task 4 will read - the current rate counters from Redis.""" - return True + 2.0 is hardcoded. The Phase 2 calibration owns the threshold value. + """ + return fsi_value <= 2.0 -def check_kill_switches() -> bool: - """Master kill switch. Returns ``True`` only when every individual check - returns ``True``. Short-circuits on the first failure so a tripped - drawdown does not need to also pay for FSI / EDGAR lookups. +def check_edgar_rate_limit() -> bool: + """Return True while EDGAR request rate is below 80% of the 10 req/sec limit. - The argument shapes for the individual checks differ — at this stub - stage we invoke each with placeholder zero-cost inputs. Task 4 will - re-shape this facade to take a single ``ComplianceContext`` so the - real inputs (NAV history, positions, latest FSI, ...) are routed in. + Reads the key ``edgar:request_count_1s`` from Redis. This counter is + incremented by the EDGAR client on each request. If the key is absent + (client does not yet write it) or Redis is unreachable, returns True — + monitoring infrastructure failure must not halt trading. """ - if not check_drawdown_breach( - pl.DataFrame({"date": [], "nav": []}, schema={"date": pl.Date, "nav": pl.Float64}) - ): + try: + r = redis.Redis( + host=settings.redis_host, + port=settings.redis_port, + socket_connect_timeout=1, + socket_timeout=1, + ) + count = r.get("edgar:request_count_1s") + if count is None: + return True + return int(count) < 8 # 80% of 10 req/sec; leaves headroom + except Exception: + logger.warning("check_edgar_rate_limit: Redis unreachable — allowing trade") + return True + + +def _fetch_portfolio_nav() -> pl.DataFrame: + """Query portfolio_nav for the full date/nav series. Returns empty DataFrame on failure.""" + _empty = pl.DataFrame( + {"date": [], "nav": []}, schema={"date": pl.Date, "nav": pl.Float64} + ) + try: + conn = psycopg2.connect(settings.database_url_sync) + with conn, conn.cursor() as cur: + cur.execute("SELECT date, nav FROM portfolio_nav ORDER BY date") + rows = cur.fetchall() + conn.close() + if not rows: + return _empty + return pl.DataFrame( + {"date": [r[0] for r in rows], "nav": [float(r[1]) for r in rows]}, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + except Exception: + logger.warning("_fetch_portfolio_nav: DB unreachable — returning empty series") + return _empty + + +def _fetch_latest_fsi() -> float | None: + """Query fsi_history for the most recent FSI value. Returns None on failure.""" + try: + conn = psycopg2.connect(settings.database_url_sync) + with conn, conn.cursor() as cur: + cur.execute( + "SELECT fsi_value FROM fsi_history ORDER BY snapshot_date DESC LIMIT 1" + ) + row = cur.fetchone() + conn.close() + return float(row[0]) if row else None + except Exception: + logger.warning("_fetch_latest_fsi: DB unreachable — skipping FSI check") + return None + + +def check_kill_switches( + portfolio_nav_df: pl.DataFrame | None = None, + fsi_value: float | None = None, + ticker: str | None = None, + current_price: float | None = None, + entry_price: float | None = None, +) -> bool: + """Master kill switch. Returns True only when every applicable check passes. + + All parameters are optional. When called with no arguments (as in + alpaca_client.py), portfolio NAV and FSI are fetched from the database. + Position-level checks are skipped when ticker/prices are not provided. + + Order: drawdown (cheapest, account-wide) → position → FSI → EDGAR. + Short-circuits on first failure — a breached book does not need to hit + Redis to confirm what we already know. + """ + if portfolio_nav_df is None: + portfolio_nav_df = _fetch_portfolio_nav() + + if fsi_value is None: + fetched = _fetch_latest_fsi() + # DB unavailable → use 0.0 (CALM regime) as safe default so the FSI + # check still runs and can be exercised by monkeypatching in tests. + fsi_value = fetched if fetched is not None else 0.0 + + if not check_drawdown_breach(portfolio_nav_df): + logger.warning("kill-switch tripped: drawdown breach") return False - if not check_position_loss("", 0.0, 0.0): + + if ( + ticker is not None + and current_price is not None + and entry_price is not None + and not check_position_loss(ticker, current_price, entry_price) + ): + logger.warning( + "kill-switch tripped: position loss on %s (entry=%.2f current=%.2f)", + ticker, + entry_price, + current_price, + ) return False - if not check_fsi_breach(0.0): + + if not check_fsi_breach(fsi_value): + logger.warning("kill-switch tripped: FSI breach (fsi=%.3f)", fsi_value) return False + if not check_edgar_rate_limit(): + logger.warning("kill-switch tripped: EDGAR rate limit") return False + return True diff --git a/tests/test_compliance.py b/tests/test_compliance.py index 7efbf22..984f88c 100644 --- a/tests/test_compliance.py +++ b/tests/test_compliance.py @@ -1,15 +1,12 @@ -"""Unit tests for the Phase 7 Part B kill-switch facade. - -The four individual checks are stubs in Task 2 and will be implemented in -Task 4. These tests lock the *contract* — the function names, signatures -and the AND-semantics of ``check_kill_switches`` — before the bodies are -written, so the trading client can depend on the surface in this task -without coupling to the eventual policy. -""" +"""Unit tests for the Phase 7 Part B / Phase 10 Session 4 kill-switch facade.""" from __future__ import annotations -import pytest +import logging +from datetime import date +from unittest.mock import patch + import polars as pl +import pytest import nexus.execution.compliance as compliance from nexus.execution.compliance import ( @@ -20,49 +17,60 @@ check_position_loss, ) +# --------------------------------------------------------------------------- +# Primitive checks — contract / signature tests (kept from Phase 7B) +# --------------------------------------------------------------------------- def test_drawdown_stub_returns_true(): + # Empty series → no breach possible → safe nav = pl.DataFrame({"date": [], "nav": []}, schema={"date": pl.Date, "nav": pl.Float64}) assert check_drawdown_breach(nav) is True def test_position_loss_stub_returns_true(): + # 80/100 = -20% loss, within the -25% stop → safe assert check_position_loss("NVDA", current_price=80.0, entry_price=100.0) is True -def test_fsi_breach_stub_returns_true(): - assert check_fsi_breach(2.5) is True - - def test_edgar_rate_limit_stub_returns_true(): + # Key absent or Redis unreachable → safe (conservative default) assert check_edgar_rate_limit() is True +# --------------------------------------------------------------------------- +# Orchestrator — AND semantics and short-circuit +# --------------------------------------------------------------------------- + def test_check_kill_switches_true_when_all_pass(): - assert check_kill_switches() is True + nav_df = pl.DataFrame( + {"date": [date(2025, 1, 1)], "nav": [100.0]}, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + assert check_kill_switches(portfolio_nav_df=nav_df, fsi_value=0.5) is True @pytest.mark.parametrize( - "patched", + "patched,extra_kwargs", [ - "check_drawdown_breach", - "check_position_loss", - "check_fsi_breach", - "check_edgar_rate_limit", + ("check_drawdown_breach", {}), + # position check only runs when ticker/prices are provided + ("check_position_loss", {"ticker": "NVDA", "current_price": 80.0, "entry_price": 100.0}), + ("check_fsi_breach", {}), + ("check_edgar_rate_limit", {}), ], ) -def test_check_kill_switches_false_when_any_check_fails(monkeypatch, patched): +def test_check_kill_switches_false_when_any_check_fails(monkeypatch, patched, extra_kwargs): """AND semantics: any single check returning False trips the master switch.""" + nav_df = pl.DataFrame( + {"date": [date(2025, 1, 1)], "nav": [100.0]}, + schema={"date": pl.Date, "nav": pl.Float64}, + ) monkeypatch.setattr(compliance, patched, lambda *a, **kw: False) - assert check_kill_switches() is False + assert check_kill_switches(portfolio_nav_df=nav_df, fsi_value=0.5, **extra_kwargs) is False def test_check_kill_switches_short_circuits_on_first_failure(monkeypatch): - """Once a check fails, downstream checks should not be called. - - Locking this in keeps the function cheap — a failing drawdown check - should not need to query Alpaca for position prices or hit FSI. - """ + """Once a check fails, downstream checks must not be called.""" call_log: list[str] = [] def make(name, value): @@ -71,10 +79,113 @@ def _check(*_a, **_kw): return value return _check + nav_df = pl.DataFrame( + {"date": [date(2025, 1, 1)], "nav": [100.0]}, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + monkeypatch.setattr(compliance, "check_drawdown_breach", make("drawdown", False)) monkeypatch.setattr(compliance, "check_position_loss", make("position", True)) monkeypatch.setattr(compliance, "check_fsi_breach", make("fsi", True)) monkeypatch.setattr(compliance, "check_edgar_rate_limit", make("edgar", True)) - assert check_kill_switches() is False + assert check_kill_switches(portfolio_nav_df=nav_df, fsi_value=0.5) is False assert call_log == ["drawdown"] + + +# --------------------------------------------------------------------------- +# check_drawdown_breach — real logic +# --------------------------------------------------------------------------- + +def test_drawdown_breach_fires_at_15_pct(): + # peak=110, current=92.4 → (92.4-110)/110 ≈ -16% → halt + nav_df = pl.DataFrame( + { + "date": [date(2025, 1, d) for d in range(1, 6)], + "nav": [100.0, 105.0, 110.0, 100.0, 92.4], + }, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + assert check_drawdown_breach(nav_df) is False + + +def test_drawdown_safe_at_14_pct(): + # peak=110, current=94.6 → (94.6-110)/110 ≈ -14% → safe + nav_df = pl.DataFrame( + { + "date": [date(2025, 1, d) for d in range(1, 6)], + "nav": [100.0, 105.0, 110.0, 100.0, 94.6], + }, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + assert check_drawdown_breach(nav_df) is True + + +def test_drawdown_safe_with_recovery(): + # dropped 20% to 80, recovered to 90 → current drawdown = (90-100)/100 = -10% → safe + nav_df = pl.DataFrame( + { + "date": [date(2025, 1, 1), date(2025, 1, 2), date(2025, 1, 3)], + "nav": [100.0, 80.0, 90.0], + }, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + assert check_drawdown_breach(nav_df) is True + + +# --------------------------------------------------------------------------- +# check_position_loss — real logic +# --------------------------------------------------------------------------- + +def test_position_loss_fires_at_26_pct(): + # entry=100, current=73 → (73-100)/100 = -27% → halt + assert check_position_loss("NVDA", current_price=73.0, entry_price=100.0) is False + + +def test_position_loss_safe_at_24_pct(): + # entry=100, current=76 → (76-100)/100 = -24% → safe + assert check_position_loss("NVDA", current_price=76.0, entry_price=100.0) is True + + +# --------------------------------------------------------------------------- +# check_fsi_breach — real logic +# --------------------------------------------------------------------------- + +def test_fsi_breach_fires_above_2(): + assert check_fsi_breach(2.1) is False + + +def test_fsi_safe_below_2(): + assert check_fsi_breach(1.9) is True + + +# --------------------------------------------------------------------------- +# check_edgar_rate_limit — Redis interaction +# --------------------------------------------------------------------------- + +def test_edgar_rate_limit_safe_when_redis_unreachable(): + with patch("nexus.execution.compliance.redis.Redis") as mock_cls: + mock_cls.return_value.get.side_effect = ConnectionError("redis down") + assert check_edgar_rate_limit() is True + + +def test_edgar_rate_limit_fires_at_8_requests(): + with patch("nexus.execution.compliance.redis.Redis") as mock_cls: + mock_cls.return_value.get.return_value = b"8" + assert check_edgar_rate_limit() is False + + +# --------------------------------------------------------------------------- +# check_kill_switches — logging on failure +# --------------------------------------------------------------------------- + +def test_check_kill_switches_halts_on_any_failure(caplog: pytest.LogCaptureFixture): + nav_df = pl.DataFrame( + {"date": [date(2025, 1, 1)], "nav": [100.0]}, + schema={"date": pl.Date, "nav": pl.Float64}, + ) + # Drawdown safe, FSI above threshold (2.5 > 2.0) → should halt and log "fsi" + with caplog.at_level(logging.WARNING, logger="nexus.execution.compliance"): + result = check_kill_switches(portfolio_nav_df=nav_df, fsi_value=2.5) + assert result is False + assert "fsi" in caplog.text.lower()