diff --git a/deployment_scripts/refresh_hsi_constituents.py b/deployment_scripts/refresh_hsi_constituents.py index 027e48b..29e33bb 100644 --- a/deployment_scripts/refresh_hsi_constituents.py +++ b/deployment_scripts/refresh_hsi_constituents.py @@ -1,132 +1,26 @@ #!/usr/bin/env python3 -"""Refresh the HSI constituents CSV from Wikipedia. +"""Compatibility wrapper for refreshing HSI constituents. -Run this script periodically (e.g. monthly) to keep -``data/hsi_constituents.csv`` up to date with current index membership. - -Usage:: - - python deployment_scripts/refresh_hsi_constituents.py - -Requires ``pandas`` and ``requests`` (both listed in requirements.txt). -The script extracts the "Constituents of Hang Seng Index" table and writes -the standard screener columns used by the app: - -* ``symbol``: yfinance-compatible HK ticker (e.g. ``0700.HK``) -* ``display_symbol``: zero-padded HK code for UI display (e.g. ``0700``) -* ``security``: company name -* ``sector``: sub-index bucket from source table (best available grouping) -* ``sub_industry``: left blank when not available in source +The maintained implementation now lives in ``web.maintenance`` so refreshes are +validated, backed up, cache-invalidated, and reported consistently. """ -import csv -import logging -import re -import sys -from io import StringIO from pathlib import Path +import json +import sys -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") - - -def _normalise_hk_ticker(raw: str) -> tuple[str, str] | None: - """Convert a source ticker cell into ``(symbol, display_symbol)``. - - Examples: - * "700" -> ("0700.HK", "0700") - * "0700" -> ("0700.HK", "0700") - * "0700.HK" -> ("0700.HK", "0700") - """ - text = str(raw).strip().upper() - if not text: - return None - - # Keep only the leading numeric code if extra text appears in cell. - m = re.search(r"(\d{1,5})", text) - if not m: - return None - - code = m.group(1).zfill(4) - return f"{code}.HK", code - - -def _load_source_table() -> "object": - """Load the HSI constituents table from Wikipedia into a DataFrame.""" - try: - import pandas as pd - import requests - except ImportError as exc: - raise RuntimeError("pandas and requests are required: pip install pandas requests") from exc - - url = "https://en.wikipedia.org/wiki/Hang_Seng_Index" - logger.info("Fetching HSI constituent table from Wikipedia: %s", url) - - resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=30) - resp.raise_for_status() - - tables = pd.read_html(StringIO(resp.text)) - for df in tables: - cols = {str(c).strip().lower(): c for c in df.columns} - if "ticker" in cols and "name" in cols: - ticker_col = cols["ticker"] - name_col = cols["name"] - sub_index_col = cols.get("sub-index") - - out = pd.DataFrame() - out["raw_ticker"] = df[ticker_col] - out["security"] = df[name_col] - out["sector"] = df[sub_index_col] if sub_index_col is not None else "" - return out +# Allow direct execution via ``python deployment_scripts/refresh_hsi_constituents.py`` +# without requiring the package to be installed first. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) - raise RuntimeError("Could not find expected HSI constituents table (Ticker/Name columns)") +from web.maintenance.runner import MaintenanceRunner # noqa: E402 +from web.maintenance.tasks import STATUS_FAILED # noqa: E402 def main() -> int: - """Fetch HSI constituents and write data/hsi_constituents.csv.""" - try: - import pandas as pd - except ImportError: - logger.error("pandas is required: pip install pandas") - return 1 - - repo_root = Path(__file__).resolve().parent.parent - output_path = repo_root / "data" / "hsi_constituents.csv" - - try: - source_df = _load_source_table() - except Exception as exc: - logger.error("Failed to fetch HSI constituents: %s", exc) - return 1 - - rows = [] - for _, rec in source_df.iterrows(): - normalised = _normalise_hk_ticker(rec.get("raw_ticker")) - if normalised is None: - continue - symbol, display_symbol = normalised - security = str(rec.get("security") or "").strip() - sector = str(rec.get("sector") or "").strip() - rows.append( - { - "symbol": symbol, - "display_symbol": display_symbol, - "security": security, - "sector": sector, - "sub_industry": "", - } - ) - - if not rows: - logger.error("No constituents parsed from source table") - return 1 - - df = pd.DataFrame(rows) - df = df.drop_duplicates(subset=["symbol"]).sort_values(by=["display_symbol"]).reset_index(drop=True) - df.to_csv(output_path, index=False, quoting=csv.QUOTE_ALL) - - logger.info("Written %d HSI constituents to %s", len(df), output_path) - return 0 + report = MaintenanceRunner().run(tasks=["hsi_constituents"], dry_run=False) + print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) + return 1 if report.status == STATUS_FAILED else 0 if __name__ == "__main__": diff --git a/docs/MAINTENANCE.md b/docs/MAINTENANCE.md new file mode 100644 index 0000000..1b91eb7 --- /dev/null +++ b/docs/MAINTENANCE.md @@ -0,0 +1,94 @@ +# System Maintenance: Market Metadata Refresh & Data Hygiene + +TWS Robot includes a metadata-only System Maintenance workflow for keeping index universes and market-event context fresh without touching trading execution paths. + +## What it maintains + +- S&P 500, STI, and HSI constituent CSV files +- Market events through the existing market-events service +- Validation reports and backups + +## Safety boundaries + +The maintenance workflow must not place orders, change strategy behavior, start/stop strategies, modify autonomous trading configuration, or bypass emergency-stop controls. + +Allowed writes are limited to: + +- `data/*_constituents.csv` +- `data/backups/constituents/...` +- `reports/maintenance/...` +- Existing market-event rows through `data.market_events` + +## Web console + +Open: + +```text +/maintenance +``` + +Available actions: + +- **Dry Run All** — fetches proposed metadata and writes reports without replacing files +- **Dry Run Constituents** — previews S&P 500/STI/HSI constituent changes +- **Apply Constituents Refresh** — backs up and replaces constituent files only after validation passes +- **Refresh Market Events** — calls the existing event service for portfolio/strategy symbols +- **Validate Metadata Only** — validates current local metadata files + +## CLI + +Dry-run is the safe default unless `--apply` is passed. + +```bash +python -m web.maintenance run --dry-run +python -m web.maintenance run --task sp500_constituents --dry-run +python -m web.maintenance run --task hsi_constituents --apply +python -m web.maintenance run --task market_events --apply --symbol AAPL --symbol MSFT +python -m web.maintenance validate +``` + +Legacy wrappers remain available: + +```bash +python scripts/refresh_sp500_constituents.py +python deployment_scripts/refresh_hsi_constituents.py +``` + +## Validation rules + +Constituent refreshes are rejected before file replacement when: + +- Required columns are missing +- Row count is below the configured market threshold +- Symbols are blank or duplicated +- Symbol format does not match market-specific rules +- Count change is greater than 25%, unless explicitly allowed + +A warning is recorded when count change is greater than 10%. + +Default minimum counts: + +| Universe | Minimum rows | +| --- | ---: | +| S&P 500 | 450 | +| STI | 25 | +| HSI | 70 | + +## Reports and backups + +Each run writes both JSON and Markdown reports: + +```text +reports/maintenance/maintenance_*.json +reports/maintenance/maintenance_*.md +``` + +Apply mode creates timestamped backups before replacing any existing constituent file: + +```text +data/backups/constituents/YYYYMMDD_HHMMSS/.csv +``` + +## Recommended cadence + +Run manually every 2–3 days, or daily if desired, preferably outside active market hours. Because this is metadata-only, it is designed not to interfere with paper/live trading paths, but off-peak operation is still cleaner. diff --git a/scripts/refresh_sp500_constituents.py b/scripts/refresh_sp500_constituents.py index 934241c..e5ea835 100644 --- a/scripts/refresh_sp500_constituents.py +++ b/scripts/refresh_sp500_constituents.py @@ -1,58 +1,26 @@ #!/usr/bin/env python3 -"""Refresh the S&P 500 constituents CSV from Wikipedia. +"""Compatibility wrapper for refreshing S&P 500 constituents. -Run this script periodically (e.g. monthly) to keep -``data/sp500_constituents.csv`` up to date with current index membership. - -Usage:: - - python scripts/refresh_sp500_constituents.py - -Requires ``pandas`` and ``lxml`` (both listed in requirements.txt). -Tickers are normalised for yfinance compatibility: dots replaced with hyphens -(e.g. ``BRK.B`` → ``BRK-B``). +The maintained implementation now lives in ``web.maintenance`` so refreshes are +validated, backed up, cache-invalidated, and reported consistently. """ -import csv -import logging -import sys from pathlib import Path +import json +import sys + +# Allow direct execution via ``python scripts/refresh_sp500_constituents.py`` +# without requiring the package to be installed first. +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +from web.maintenance.runner import MaintenanceRunner # noqa: E402 +from web.maintenance.tasks import STATUS_FAILED # noqa: E402 def main() -> int: - """Fetch the S&P 500 constituent list and write it to data/sp500_constituents.csv.""" - try: - import pandas as pd - except ImportError: - logger.error("pandas is required: pip install pandas") - return 1 - - repo_root = Path(__file__).resolve().parent.parent - output_path = repo_root / "data" / "sp500_constituents.csv" - - logger.info("Fetching S&P 500 constituent table from Wikipedia…") - url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" - try: - tables = pd.read_html(url) - except Exception as exc: - logger.error("Failed to fetch Wikipedia table: %s", exc) - return 1 - - df = tables[0][["Symbol", "Security", "GICS Sector", "GICS Sub-Industry"]] - df.columns = ["symbol", "security", "sector", "sub_industry"] - - # Normalise tickers for yfinance compatibility - df["symbol"] = df["symbol"].str.replace(".", "-", regex=False) - - # Remove duplicates (same symbol appearing twice due to share classes) - df = df.drop_duplicates(subset=["symbol"]) - - df.to_csv(output_path, index=False, quoting=csv.QUOTE_ALL) - logger.info("Written %d constituents to %s", len(df), output_path) - return 0 + report = MaintenanceRunner().run(tasks=["sp500_constituents"], dry_run=False) + print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) + return 1 if report.status == STATUS_FAILED else 0 if __name__ == "__main__": diff --git a/tests/test_maintenance.py b/tests/test_maintenance.py new file mode 100644 index 0000000..72f497d --- /dev/null +++ b/tests/test_maintenance.py @@ -0,0 +1,154 @@ +"""Tests for the System Maintenance metadata refresh module.""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from web import create_app +from web.maintenance.runner import MaintenanceRunner +from web.maintenance.validators import validate_constituent_rows + + +@pytest.fixture +def app(monkeypatch): + monkeypatch.setattr( + "web.services.ServiceManager._start_market_events_refresh", + lambda self: None, + ) + monkeypatch.setattr("web.routes.api_connection.is_accepted", lambda: True) + return create_app({"TESTING": True, "LOGIN_DISABLED": True, "WTF_CSRF_ENABLED": False}) + + +@pytest.fixture +def client(app): + return app.test_client() + + +def test_validate_constituents_rejects_duplicates(): + rows = _sp500_rows(451) + rows.append(dict(rows[0])) + + result = validate_constituent_rows(rows, market="sp500", before_count=451) + + assert not result.passed + assert any("Duplicate symbols" in err for err in result.errors) + + +def test_validate_constituents_warns_on_large_but_allowed_change(): + result = validate_constituent_rows(_sp500_rows(510), market="sp500", before_count=450) + + assert result.passed + assert any("Large count change" in warn for warn in result.warnings) + + +def test_dry_run_preserves_existing_file_and_writes_report(tmp_path): + current = _sp500_rows(450) + proposed = _sp500_rows(451) + _write_constituents(tmp_path / "data" / "sp500_constituents.csv", current) + runner = _runner(tmp_path, {"sp500_constituents": lambda: proposed}) + + report = runner.run(tasks=["sp500_constituents"], dry_run=True) + + assert report.status in {"success", "warning"} + assert _read_symbols(tmp_path / "data" / "sp500_constituents.csv") == [r["symbol"] for r in current] + assert report.report_json_path is not None + assert Path(report.report_json_path).exists() + assert report.results[0].detail["would_write"].endswith("sp500_constituents.csv") + + +def test_apply_creates_backup_and_replaces_file(tmp_path, monkeypatch): + current = _sp500_rows(450) + proposed = _sp500_rows(451) + output_path = tmp_path / "data" / "sp500_constituents.csv" + _write_constituents(output_path, current) + _disable_cache_invalidation(monkeypatch) + runner = _runner(tmp_path, {"sp500_constituents": lambda: proposed}) + + report = runner.run(tasks=["sp500_constituents"], dry_run=False) + + assert report.status in {"success", "warning"} + assert _read_symbols(output_path) == [r["symbol"] for r in proposed] + backups = list((tmp_path / "data" / "backups" / "constituents").glob("*/sp500_constituents.csv")) + assert backups, "Expected timestamped backup before apply replacement" + assert report.results[0].detail["cache_invalidated"] is True + + +def test_validation_failure_preserves_existing_file(tmp_path, monkeypatch): + current = _sp500_rows(450) + invalid = _sp500_rows(1) + output_path = tmp_path / "data" / "sp500_constituents.csv" + _write_constituents(output_path, current) + _disable_cache_invalidation(monkeypatch) + runner = _runner(tmp_path, {"sp500_constituents": lambda: invalid}) + + report = runner.run(tasks=["sp500_constituents"], dry_run=False) + + assert report.status == "failed" + assert _read_symbols(output_path) == [r["symbol"] for r in current] + assert not (tmp_path / "data" / "backups" / "constituents").exists() + + +def test_market_events_dry_run_is_metadata_only(tmp_path): + runner = _runner(tmp_path, {}) + + report = runner.run(tasks=["market_events"], dry_run=True) + + assert report.results[0].status == "skipped" + assert "Dry-run does not update market-event DB rows" in report.results[0].warnings[0] + + +def test_maintenance_page_loads(client): + response = client.get("/maintenance") + + assert response.status_code == 200 + assert b"System Maintenance" in response.data + + +def test_maintenance_status_api_loads(client): + response = client.get("/api/maintenance/status") + + assert response.status_code == 200 + data = response.get_json() + assert "constituents" in data + + +def _runner(tmp_path, source_loaders): + return MaintenanceRunner( + repo_root=tmp_path, + source_loaders=source_loaders, + report_dir=tmp_path / "reports" / "maintenance", + ) + + +def _disable_cache_invalidation(monkeypatch): + def fake_invalidate(self, task_name, result): + result.detail["cache_invalidated"] = True + monkeypatch.setattr(MaintenanceRunner, "_invalidate_screener_cache", fake_invalidate) + + +def _sp500_rows(count): + return [ + { + "symbol": f"T{i:03d}", + "security": f"Test Company {i}", + "sector": "Technology", + "sub_industry": "Software", + } + for i in range(count) + ] + + +def _write_constituents(path, rows): + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=["symbol", "security", "sector", "sub_industry"]) + writer.writeheader() + writer.writerows(rows) + + +def _read_symbols(path): + with path.open(newline="", encoding="utf-8") as fh: + return [row["symbol"] for row in csv.DictReader(fh)] diff --git a/web/__init__.py b/web/__init__.py index 6eeabcf..d8133b0 100644 --- a/web/__init__.py +++ b/web/__init__.py @@ -95,6 +95,7 @@ def inject_status_bar(): from web.routes.sti_screener import bp as sti_screener_bp from web.routes.hsi_screener import bp as hsi_screener_bp from web.routes.autonomous_trading import bp as autonomous_trading_bp + from web.routes.maintenance import bp as maintenance_bp app.register_blueprint(dashboard_bp) app.register_blueprint(strategies_bp) @@ -112,6 +113,7 @@ def inject_status_bar(): app.register_blueprint(sti_screener_bp) app.register_blueprint(hsi_screener_bp) app.register_blueprint(autonomous_trading_bp) + app.register_blueprint(maintenance_bp) # ---- JSON API blueprints ---- # Session-authenticated API requests remain CSRF-protected and the web @@ -139,6 +141,7 @@ def inject_status_bar(): from web.routes.api_autonomous import bp as api_autonomous_bp from web.routes.api_trading_readiness import bp as api_trading_readiness_bp from web.routes.api_autonomous_evidence import bp as api_autonomous_evidence_bp + from web.routes.api_maintenance import bp as api_maintenance_bp # Patch the default autonomous market provider so the existing SPY gate # receives VIX values as an additional regime/sizing safeguard. Operator @@ -154,6 +157,7 @@ def inject_status_bar(): api_market_events_bp, api_stock_analysis_bp, api_sp500_screener_bp, api_sti_screener_bp, api_hsi_screener_bp, api_autonomous_bp, api_trading_readiness_bp, api_autonomous_evidence_bp, + api_maintenance_bp, ] for api_bp in api_blueprints: app.register_blueprint(api_bp) diff --git a/web/maintenance/__init__.py b/web/maintenance/__init__.py new file mode 100644 index 0000000..2d7c838 --- /dev/null +++ b/web/maintenance/__init__.py @@ -0,0 +1,3 @@ +"""System maintenance package for market metadata refresh and hygiene.""" + +__all__ = [] diff --git a/web/maintenance/__main__.py b/web/maintenance/__main__.py new file mode 100644 index 0000000..a7b7248 --- /dev/null +++ b/web/maintenance/__main__.py @@ -0,0 +1,64 @@ +"""CLI entrypoint for system maintenance. + +Usage examples: + + python -m web.maintenance run --dry-run + python -m web.maintenance run --task hsi_constituents --apply + python -m web.maintenance validate +""" + +from __future__ import annotations + +import argparse +import json +import sys + +from web.maintenance.runner import MaintenanceRunner +from web.maintenance.tasks import STATUS_FAILED + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(prog="python -m web.maintenance") + subparsers = parser.add_subparsers(dest="command", required=True) + + run_parser = subparsers.add_parser("run", help="Run maintenance tasks") + run_parser.add_argument("--task", action="append", dest="tasks", help="Task to run; can be repeated") + run_parser.add_argument("--dry-run", action="store_true", default=False, help="Preview changes without writes") + run_parser.add_argument("--apply", action="store_true", default=False, help="Apply validated changes") + run_parser.add_argument("--symbol", action="append", dest="symbols", help="Event symbol to refresh; can be repeated") + run_parser.add_argument("--days", type=int, default=28, help="Market-events days ahead") + run_parser.add_argument("--allow-large-change", action="store_true", help="Allow >25%% constituent count change") + + validate_parser = subparsers.add_parser("validate", help="Validate local metadata files only") + validate_parser.add_argument("--json", action="store_true", dest="json_output", help="Print full JSON report") + + args = parser.parse_args(argv) + runner = MaintenanceRunner() + + if args.command == "validate": + report = runner.run(tasks=["metadata_validation"], dry_run=True) + if args.json_output: + print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) + else: + print(f"{report.report_id}: {report.status}") + return 1 if report.status == STATUS_FAILED else 0 + + dry_run = True + if args.apply: + dry_run = False + elif args.dry_run: + dry_run = True + + report = runner.run( + tasks=args.tasks, + dry_run=dry_run, + event_symbols=args.symbols, + days_ahead=args.days, + allow_large_change=args.allow_large_change, + ) + print(json.dumps(report.to_dict(), indent=2, sort_keys=True)) + return 1 if report.status == STATUS_FAILED else 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/web/maintenance/reports.py b/web/maintenance/reports.py new file mode 100644 index 0000000..1084e8a --- /dev/null +++ b/web/maintenance/reports.py @@ -0,0 +1,117 @@ +"""JSON and Markdown report helpers for maintenance runs.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any, Dict, List, Optional + +from web.maintenance.tasks import MaintenanceRunReport + + +DEFAULT_REPORT_DIR = Path("reports") / "maintenance" + + +def write_report(report: MaintenanceRunReport, report_dir: Path) -> tuple[Path, Path]: + """Write JSON and Markdown reports and return their paths.""" + report_dir.mkdir(parents=True, exist_ok=True) + json_path = report_dir / f"{report.report_id}.json" + md_path = report_dir / f"{report.report_id}.md" + + report.report_json_path = str(json_path) + report.report_markdown_path = str(md_path) + + json_path.write_text( + json.dumps(report.to_dict(), indent=2, sort_keys=True, default=str), + encoding="utf-8", + ) + md_path.write_text(render_markdown_report(report), encoding="utf-8") + return json_path, md_path + + +def render_markdown_report(report: MaintenanceRunReport) -> str: + """Render a concise human-readable Markdown summary.""" + lines = [ + f"# Maintenance Report: {report.report_id}", + "", + f"- Status: `{report.status}`", + f"- Dry run: `{report.dry_run}`", + f"- Started: `{report.started_at}`", + f"- Finished: `{report.finished_at}`", + f"- Duration: `{report.duration_seconds}s`", + "", + "## Task Summary", + "", + "| Task | Status | Before | After | Added | Removed | Warnings | Errors |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", + ] + for result in report.results: + lines.append( + "| {task} | `{status}` | {before} | {after} | {added} | {removed} | {warnings} | {errors} |".format( + task=result.task, + status=result.status, + before=_fmt_count(result.before_count), + after=_fmt_count(result.after_count), + added=len(result.added), + removed=len(result.removed), + warnings=len(result.warnings), + errors=len(result.errors) + len(result.validation.errors), + ) + ) + + for result in report.results: + lines.extend(["", f"## {result.task}", ""]) + lines.append(f"- Source: `{result.source or 'n/a'}`") + lines.append(f"- Status: `{result.status}`") + if result.added: + lines.append("- Added: " + ", ".join(result.added[:50])) + if result.removed: + lines.append("- Removed: " + ", ".join(result.removed[:50])) + for warning in result.warnings: + lines.append(f"- Warning: {warning}") + for error in result.validation.errors + result.errors: + lines.append(f"- Error: {error}") + lines.append("") + return "\n".join(lines) + + +def list_reports(report_dir: Path, *, limit: int = 20) -> List[Dict[str, Any]]: + """Return recent report summaries from newest to oldest.""" + if not report_dir.exists(): + return [] + reports = [] + for path in sorted(report_dir.glob("maintenance_*.json"), reverse=True)[:limit]: + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except Exception: + continue + reports.append({ + "report_id": payload.get("report_id") or path.stem, + "status": payload.get("status"), + "dry_run": payload.get("dry_run"), + "started_at": payload.get("started_at"), + "finished_at": payload.get("finished_at"), + "duration_seconds": payload.get("duration_seconds"), + "warnings_count": len(payload.get("warnings") or []), + "errors_count": len(payload.get("errors") or []), + "path": str(path), + }) + return reports + + +def read_report(report_dir: Path, report_id: str) -> Optional[Dict[str, Any]]: + """Read one JSON report by report id.""" + safe_id = "".join(ch for ch in report_id if ch.isalnum() or ch in {"_", "-"}) + if not safe_id: + return None + path = report_dir / f"{safe_id}.json" + if not path.exists(): + return None + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + +def _fmt_count(value: Any) -> str: + return "-" if value is None else str(value) diff --git a/web/maintenance/runner.py b/web/maintenance/runner.py new file mode 100644 index 0000000..e07e3ce --- /dev/null +++ b/web/maintenance/runner.py @@ -0,0 +1,371 @@ +"""System maintenance runner for market metadata refresh and hygiene. + +This module is deliberately metadata-only. It reads/writes constituent CSVs, +backup files, maintenance reports, and delegates market event refresh to the +existing market-events service. It must not import order placement or execution +adapters. +""" + +from __future__ import annotations + +import csv +import logging +import os +import shutil +import tempfile +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Callable, Dict, Iterable, List, Mapping, Optional, Sequence + +from web.maintenance import reports +from web.maintenance.tasks import ( + STATUS_FAILED, + STATUS_PARTIAL_FAILURE, + STATUS_SKIPPED, + STATUS_SUCCESS, + MaintenanceRunReport, + MaintenanceTaskResult, + ValidationResult, +) +from web.maintenance.validators import validate_constituent_rows + +logger = logging.getLogger(__name__) + +ConstituentLoader = Callable[[], List[Dict[str, str]]] + +_DEFAULT_TASKS = ("sp500_constituents", "sti_constituents", "hsi_constituents", "market_events") + +_CONSTITUENT_TASKS = { + "sp500_constituents": { + "market": "sp500", + "filename": "sp500_constituents.csv", + "source_module": "web.maintenance.sources.sp500", + "source_url_attr": "SOURCE_URL", + }, + "sti_constituents": { + "market": "sti", + "filename": "sti_constituents.csv", + "source_module": "web.maintenance.sources.sti", + "source_url_attr": "SOURCE_URL", + }, + "hsi_constituents": { + "market": "hsi", + "filename": "hsi_constituents.csv", + "source_module": "web.maintenance.sources.hsi", + "source_url_attr": "SOURCE_URL", + }, +} + + +class MaintenanceRunner: + """Run dry-run/apply maintenance tasks and write audit reports.""" + + def __init__( + self, + *, + repo_root: Optional[Path] = None, + source_loaders: Optional[Dict[str, ConstituentLoader]] = None, + report_dir: Optional[Path] = None, + ) -> None: + self.repo_root = repo_root or Path(__file__).resolve().parents[2] + self.data_dir = self.repo_root / "data" + self.backup_root = self.data_dir / "backups" / "constituents" + self.report_dir = report_dir or (self.repo_root / reports.DEFAULT_REPORT_DIR) + self._source_loaders = source_loaders or {} + + def run( + self, + *, + tasks: Optional[Sequence[str]] = None, + dry_run: bool = True, + event_symbols: Optional[Sequence[str]] = None, + days_ahead: int = 28, + allow_large_change: bool = False, + ) -> MaintenanceRunReport: + """Run selected maintenance tasks and write reports. + + Dry-run is the safe default. Apply mode is required to write refreshed + constituent files or update market-event DB rows. + """ + started = time.monotonic() + task_names = list(tasks or _DEFAULT_TASKS) + report = MaintenanceRunReport( + report_id=_new_report_id(), + dry_run=dry_run, + ) + + for task_name in task_names: + if task_name in _CONSTITUENT_TASKS: + result = self._run_constituent_task( + task_name, + dry_run=dry_run, + allow_large_change=allow_large_change, + ) + elif task_name == "market_events": + result = self._run_market_events_task( + dry_run=dry_run, + event_symbols=event_symbols, + days_ahead=days_ahead, + ) + elif task_name == "metadata_validation": + result = self._run_metadata_validation_task(dry_run=dry_run) + else: + task_started = time.monotonic() + result = MaintenanceTaskResult(task=task_name, status=STATUS_FAILED, dry_run=dry_run) + result.errors.append(f"Unknown maintenance task: {task_name}") + result.finish(started_monotonic=task_started, now_monotonic=time.monotonic()) + report.results.append(result) + + report.finalize(started_monotonic=started, now_monotonic=time.monotonic()) + reports.write_report(report, self.report_dir) + return report + + def get_status(self) -> Dict[str, object]: + """Return current maintenance status for the dashboard.""" + constituent_status = [] + for task_name, cfg in _CONSTITUENT_TASKS.items(): + path = self.data_dir / str(cfg["filename"]) + rows = _read_csv_rows(path) if path.exists() else [] + row_count = len(rows) + mtime = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc).isoformat() if path.exists() else None + validation = validate_constituent_rows( + rows, + market=str(cfg["market"]), + before_count=row_count, + ) if path.exists() else ValidationResult(status=STATUS_FAILED, errors=["File not found"]) + constituent_status.append({ + "task": task_name, + "file": str(path), + "row_count": row_count, + "last_modified_at": mtime, + "validation": validation.to_dict(), + }) + return { + "constituents": constituent_status, + "reports": reports.list_reports(self.report_dir, limit=5), + } + + def list_reports(self, *, limit: int = 20) -> List[Dict[str, object]]: + return reports.list_reports(self.report_dir, limit=limit) + + def read_report(self, report_id: str) -> Optional[Dict[str, object]]: + return reports.read_report(self.report_dir, report_id) + + # ------------------------------------------------------------------ + # Task implementations + # ------------------------------------------------------------------ + + def _run_constituent_task( + self, + task_name: str, + *, + dry_run: bool, + allow_large_change: bool, + ) -> MaintenanceTaskResult: + started = time.monotonic() + cfg = _CONSTITUENT_TASKS[task_name] + result = MaintenanceTaskResult(task=task_name, dry_run=dry_run, source=self._source_url(task_name)) + output_path = self.data_dir / str(cfg["filename"]) + current_rows = _read_csv_rows(output_path) + result.before_count = len(current_rows) + + try: + proposed_rows = self._fetch_constituent_rows(task_name) + result.after_count = len(proposed_rows) + result.added, result.removed = _symbol_diff(current_rows, proposed_rows) + result.validation = validate_constituent_rows( + proposed_rows, + market=str(cfg["market"]), + before_count=result.before_count, + allow_large_change=allow_large_change, + ) + if not result.validation.passed: + result.status = STATUS_FAILED + result.errors.append("Validation failed; existing file preserved") + return result + if dry_run: + result.detail["would_write"] = str(output_path) + return result + + backup_path = self._backup_existing_file(output_path) + self._write_csv_atomically(output_path, proposed_rows) + result.detail["backup_path"] = str(backup_path) if backup_path else None + result.detail["written_path"] = str(output_path) + self._invalidate_screener_cache(task_name, result) + return result + except Exception as exc: + logger.error("Constituent task %s failed: %s", task_name, exc, exc_info=True) + result.status = STATUS_FAILED + result.errors.append(str(exc)) + return result + finally: + result.finish(started_monotonic=started, now_monotonic=time.monotonic()) + + def _run_metadata_validation_task(self, *, dry_run: bool) -> MaintenanceTaskResult: + started = time.monotonic() + result = MaintenanceTaskResult(task="metadata_validation", dry_run=dry_run, source="local-files") + try: + for task_name, cfg in _CONSTITUENT_TASKS.items(): + path = self.data_dir / str(cfg["filename"]) + rows = _read_csv_rows(path) + validation = validate_constituent_rows(rows, market=str(cfg["market"]), before_count=len(rows)) + detail_key = str(cfg["market"]) + result.detail[detail_key] = validation.to_dict() + if validation.errors: + result.errors.extend(f"{task_name}: {err}" for err in validation.errors) + if validation.warnings: + result.warnings.extend(f"{task_name}: {warn}" for warn in validation.warnings) + result.status = STATUS_FAILED if result.errors else STATUS_SUCCESS + return result + finally: + result.finish(started_monotonic=started, now_monotonic=time.monotonic()) + + def _run_market_events_task( + self, + *, + dry_run: bool, + event_symbols: Optional[Sequence[str]], + days_ahead: int, + ) -> MaintenanceTaskResult: + started = time.monotonic() + result = MaintenanceTaskResult(task="market_events", dry_run=dry_run, source="data.market_events") + symbols = sorted({str(s).upper() for s in (event_symbols or []) if s}) + result.detail["symbols"] = symbols + result.detail["days_ahead"] = days_ahead + try: + if dry_run: + result.status = STATUS_SKIPPED + result.warnings.append("Dry-run does not update market-event DB rows; run apply mode to refresh events") + return result + + from data.market_events import get_market_events_service + + svc = get_market_events_service() + summary = svc.refresh(portfolio_symbols=symbols, force=True, days_ahead=days_ahead) + result.detail["summary"] = summary + result.after_count = int(summary.get("total_upserted") or 0) + total_errors = int(summary.get("total_errors") or 0) + if summary.get("status") == "failed": + result.status = STATUS_FAILED + result.errors.append(str(summary.get("error") or "Market events refresh failed")) + elif summary.get("status") == "partial_failure" or total_errors: + result.status = STATUS_PARTIAL_FAILURE + result.warnings.append("One or more market-event providers failed; see provider_results") + return result + except Exception as exc: + logger.error("Market events task failed: %s", exc, exc_info=True) + result.status = STATUS_FAILED + result.errors.append(str(exc)) + return result + finally: + result.finish(started_monotonic=started, now_monotonic=time.monotonic()) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _fetch_constituent_rows(self, task_name: str) -> List[Dict[str, str]]: + if task_name in self._source_loaders: + return _normalise_rows(self._source_loaders[task_name]()) + module = _import_module(str(_CONSTITUENT_TASKS[task_name]["source_module"])) + return _normalise_rows(module.fetch_constituents()) + + def _source_url(self, task_name: str) -> Optional[str]: + try: + module = _import_module(str(_CONSTITUENT_TASKS[task_name]["source_module"])) + return str(getattr(module, str(_CONSTITUENT_TASKS[task_name]["source_url_attr"]))) + except Exception: + return None + + def _backup_existing_file(self, output_path: Path) -> Optional[Path]: + if not output_path.exists(): + return None + timestamped_backup_dir = self.backup_root / datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") + timestamped_backup_dir.mkdir(parents=True, exist_ok=True) + backup_path = timestamped_backup_dir / output_path.name + shutil.copy2(output_path, backup_path) + return backup_path + + def _write_csv_atomically(self, output_path: Path, rows: Sequence[Mapping[str, object]]) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + fieldnames = _fieldnames(rows) + fd, tmp_name = tempfile.mkstemp(prefix=output_path.name + ".", suffix=".tmp", dir=str(output_path.parent)) + tmp_path = Path(tmp_name) + try: + with os.fdopen(fd, "w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow({field: row.get(field, "") for field in fieldnames}) + os.replace(tmp_path, output_path) + finally: + if tmp_path.exists(): + tmp_path.unlink() + + def _invalidate_screener_cache(self, task_name: str, result: MaintenanceTaskResult) -> None: + try: + if task_name == "sp500_constituents": + from web.sp500_screener_service import sp500_screener_service + sp500_screener_service.invalidate_cache() + elif task_name == "sti_constituents": + from web.sti_screener_service import sti_screener_service + sti_screener_service.invalidate_cache() + elif task_name == "hsi_constituents": + from web.hsi_screener_service import hsi_screener_service + hsi_screener_service.invalidate_cache() + result.detail.setdefault("cache_invalidated", True) + except Exception as exc: + result.warnings.append(f"Cache invalidation failed: {exc}") + + +# ---------------------------------------------------------------------- +# Module helpers +# ---------------------------------------------------------------------- + + +def _new_report_id() -> str: + return "maintenance_" + datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _read_csv_rows(path: Path) -> List[Dict[str, str]]: + if not path.exists(): + return [] + with path.open(newline="", encoding="utf-8") as fh: + return [dict(row) for row in csv.DictReader(fh)] + + +def _normalise_rows(rows: Iterable[Mapping[str, object]]) -> List[Dict[str, str]]: + normalized: List[Dict[str, str]] = [] + for row in rows: + item = {str(k): "" if v is None else str(v).strip() for k, v in row.items()} + if item.get("symbol"): + item["symbol"] = item["symbol"].upper() + for col in ("security", "sector", "sub_industry"): + item.setdefault(col, "") + normalized.append(item) + return normalized + + +def _fieldnames(rows: Sequence[Mapping[str, object]]) -> List[str]: + base = ["symbol"] + if any("display_symbol" in row for row in rows): + base.append("display_symbol") + base.extend(["security", "sector", "sub_industry"]) + extras = sorted({str(k) for row in rows for k in row.keys()} - set(base)) + return base + extras + + +def _symbol_diff(before: Sequence[Mapping[str, object]], after: Sequence[Mapping[str, object]]) -> tuple[List[str], List[str]]: + before_symbols = {str(row.get("symbol") or "").strip().upper() for row in before if row.get("symbol")} + after_symbols = {str(row.get("symbol") or "").strip().upper() for row in after if row.get("symbol")} + return sorted(after_symbols - before_symbols), sorted(before_symbols - after_symbols) + + +def _import_module(module_name: str): + import importlib + return importlib.import_module(module_name) diff --git a/web/maintenance/sources/__init__.py b/web/maintenance/sources/__init__.py new file mode 100644 index 0000000..33e21b6 --- /dev/null +++ b/web/maintenance/sources/__init__.py @@ -0,0 +1 @@ +"""External metadata sources used by the maintenance runner.""" diff --git a/web/maintenance/sources/hsi.py b/web/maintenance/sources/hsi.py new file mode 100644 index 0000000..ea961b7 --- /dev/null +++ b/web/maintenance/sources/hsi.py @@ -0,0 +1,65 @@ +"""Hang Seng Index constituent source adapter.""" + +from __future__ import annotations + +import re +from io import StringIO +from typing import Dict, List, Optional, Tuple + +SOURCE_URL = "https://en.wikipedia.org/wiki/Hang_Seng_Index" + + +def normalise_hk_ticker(raw: object) -> Optional[Tuple[str, str]]: + """Convert a source ticker cell into (symbol, display_symbol).""" + text = str(raw or "").strip().upper() + if not text: + return None + match = re.search(r"(\d{1,5})", text) + if not match: + return None + code = match.group(1).zfill(4) + return f"{code}.HK", code + + +def fetch_constituents() -> List[Dict[str, str]]: + """Fetch and normalize HSI constituents for app-compatible CSV output.""" + try: + import pandas as pd + import requests + except ImportError as exc: + raise RuntimeError("pandas and requests are required to refresh HSI constituents") from exc + + resp = requests.get(SOURCE_URL, headers={"User-Agent": "tws_robot/1.0"}, timeout=30) + resp.raise_for_status() + tables = pd.read_html(StringIO(resp.text)) + + source_df = None + for df in tables: + cols = {str(c).strip().lower(): c for c in df.columns} + if "ticker" in cols and "name" in cols: + source_df = df + break + if source_df is None: + raise RuntimeError("Could not find expected HSI constituents table with Ticker/Name columns") + + cols = {str(c).strip().lower(): c for c in source_df.columns} + ticker_col = cols["ticker"] + name_col = cols["name"] + sub_index_col = cols.get("sub-index") + + rows: List[Dict[str, str]] = [] + for _, rec in source_df.iterrows(): + normalised = normalise_hk_ticker(rec.get(ticker_col)) + if normalised is None: + continue + symbol, display_symbol = normalised + rows.append({ + "symbol": symbol, + "display_symbol": display_symbol, + "security": str(rec.get(name_col) or "").strip(), + "sector": str(rec.get(sub_index_col) or "").strip() if sub_index_col is not None else "", + "sub_industry": "", + }) + + rows_by_symbol = {row["symbol"]: row for row in rows} + return [rows_by_symbol[symbol] for symbol in sorted(rows_by_symbol)] diff --git a/web/maintenance/sources/sp500.py b/web/maintenance/sources/sp500.py new file mode 100644 index 0000000..59402ad --- /dev/null +++ b/web/maintenance/sources/sp500.py @@ -0,0 +1,34 @@ +"""S&P 500 constituent source adapter.""" + +from __future__ import annotations + +from typing import Dict, List + +SOURCE_URL = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" + + +def fetch_constituents() -> List[Dict[str, str]]: + """Fetch and normalize S&P 500 constituents for app-compatible CSV output.""" + try: + import pandas as pd + except ImportError as exc: + raise RuntimeError("pandas is required to refresh S&P 500 constituents") from exc + + tables = pd.read_html(SOURCE_URL) + if not tables: + raise RuntimeError("No tables found on S&P 500 source page") + + df = tables[0] + required = ["Symbol", "Security", "GICS Sector", "GICS Sub-Industry"] + missing = [col for col in required if col not in df.columns] + if missing: + raise RuntimeError("S&P 500 source table missing columns: " + ", ".join(missing)) + + out = df[required].copy() + out.columns = ["symbol", "security", "sector", "sub_industry"] + out["symbol"] = out["symbol"].astype(str).str.strip().str.upper().str.replace(".", "-", regex=False) + out["security"] = out["security"].astype(str).str.strip() + out["sector"] = out["sector"].astype(str).str.strip() + out["sub_industry"] = out["sub_industry"].astype(str).str.strip() + out = out.drop_duplicates(subset=["symbol"]).sort_values(by=["symbol"]).reset_index(drop=True) + return out.to_dict(orient="records") diff --git a/web/maintenance/sources/sti.py b/web/maintenance/sources/sti.py new file mode 100644 index 0000000..4021011 --- /dev/null +++ b/web/maintenance/sources/sti.py @@ -0,0 +1,70 @@ +"""Straits Times Index constituent source adapter.""" + +from __future__ import annotations + +import re +from typing import Dict, List, Optional + +SOURCE_URL = "https://en.wikipedia.org/wiki/Straits_Times_Index" + + +def _normalise_sgx_symbol(raw: object) -> Optional[tuple[str, str]]: + text = str(raw or "").strip().upper() + if not text: + return None + # Common source cells may contain "D05", "D05.SI", or extra footnote text. + match = re.search(r"\b([A-Z0-9]{1,10})(?:\.SI)?\b", text) + if not match: + return None + display = match.group(1) + return f"{display}.SI", display + + +def fetch_constituents() -> List[Dict[str, str]]: + """Fetch and normalize STI constituents for app-compatible CSV output. + + Wikipedia table layouts change from time to time, so this adapter searches + for the first table with recognizable symbol and company/name columns. + """ + try: + import pandas as pd + except ImportError as exc: + raise RuntimeError("pandas is required to refresh STI constituents") from exc + + tables = pd.read_html(SOURCE_URL) + for df in tables: + cols = {str(c).strip().lower(): c for c in df.columns} + symbol_col = _first_col(cols, ("ticker", "symbol", "stock code", "code")) + name_col = _first_col(cols, ("company", "name", "constituent")) + sector_col = _first_col(cols, ("sector", "industry")) + if symbol_col is None or name_col is None: + continue + + rows: List[Dict[str, str]] = [] + for _, rec in df.iterrows(): + normalised = _normalise_sgx_symbol(rec.get(symbol_col)) + if normalised is None: + continue + symbol, display_symbol = normalised + rows.append({ + "symbol": symbol, + "display_symbol": display_symbol, + "security": str(rec.get(name_col) or "").strip(), + "sector": str(rec.get(sector_col) or "").strip() if sector_col is not None else "", + "sub_industry": "", + }) + if rows: + rows_by_symbol = {row["symbol"]: row for row in rows} + return [rows_by_symbol[symbol] for symbol in sorted(rows_by_symbol)] + + raise RuntimeError("Could not find expected STI constituents table") + + +def _first_col(cols: Dict[str, object], candidates: tuple[str, ...]) -> object | None: + for candidate in candidates: + if candidate in cols: + return cols[candidate] + for name, original in cols.items(): + if any(candidate in name for candidate in candidates): + return original + return None diff --git a/web/maintenance/tasks.py b/web/maintenance/tasks.py new file mode 100644 index 0000000..2468a6b --- /dev/null +++ b/web/maintenance/tasks.py @@ -0,0 +1,132 @@ +"""Structured result models for the system maintenance module. + +The maintenance subsystem is intentionally metadata-only. These lightweight +models make task output easy to serialize into API responses, JSON reports, and +Markdown summaries without coupling the runner to Flask or trading services. +""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + + +STATUS_SUCCESS = "success" +STATUS_FAILED = "failed" +STATUS_SKIPPED = "skipped" +STATUS_WARNING = "warning" +STATUS_PARTIAL_FAILURE = "partial_failure" + + +def utc_now_iso() -> str: + """Return a timezone-aware UTC timestamp suitable for reports.""" + return datetime.now(timezone.utc).isoformat() + + +@dataclass +class ValidationResult: + """Validation outcome for one maintenance artifact.""" + + status: str = STATUS_SUCCESS + warnings: List[str] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + detail: Dict[str, Any] = field(default_factory=dict) + + @property + def passed(self) -> bool: + return not self.errors + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +@dataclass +class MaintenanceTaskResult: + """Result returned by each maintenance task. + + Keep this object deliberately explicit so future UI/API code can display + counts, diffs, sources, warnings, and errors without parsing logs. + """ + + task: str + status: str = STATUS_SUCCESS + dry_run: bool = True + source: Optional[str] = None + started_at: str = field(default_factory=utc_now_iso) + finished_at: Optional[str] = None + duration_seconds: float = 0.0 + before_count: Optional[int] = None + after_count: Optional[int] = None + added: List[str] = field(default_factory=list) + removed: List[str] = field(default_factory=list) + validation: ValidationResult = field(default_factory=ValidationResult) + warnings: List[str] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + detail: Dict[str, Any] = field(default_factory=dict) + + @property + def ok(self) -> bool: + return self.status == STATUS_SUCCESS and not self.errors and self.validation.passed + + def finish(self, *, started_monotonic: float, now_monotonic: float) -> None: + self.finished_at = utc_now_iso() + self.duration_seconds = round(max(0.0, now_monotonic - started_monotonic), 3) + if self.validation.errors and self.status == STATUS_SUCCESS: + self.status = STATUS_FAILED + if self.errors and self.status == STATUS_SUCCESS: + self.status = STATUS_FAILED + if self.validation.warnings: + self.warnings.extend(w for w in self.validation.warnings if w not in self.warnings) + + def to_dict(self) -> Dict[str, Any]: + data = asdict(self) + data["validation"] = self.validation.to_dict() + data["ok"] = self.ok + return data + + +@dataclass +class MaintenanceRunReport: + """Top-level report produced by a maintenance run.""" + + report_id: str + dry_run: bool + status: str = STATUS_SUCCESS + started_at: str = field(default_factory=utc_now_iso) + finished_at: Optional[str] = None + duration_seconds: float = 0.0 + results: List[MaintenanceTaskResult] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + report_json_path: Optional[str] = None + report_markdown_path: Optional[str] = None + + def finalize(self, *, started_monotonic: float, now_monotonic: float) -> None: + self.finished_at = utc_now_iso() + self.duration_seconds = round(max(0.0, now_monotonic - started_monotonic), 3) + self.warnings = [w for r in self.results for w in r.warnings] + self.errors = [e for r in self.results for e in list(r.errors) + list(r.validation.errors)] + if any(r.status == STATUS_FAILED for r in self.results): + self.status = STATUS_FAILED + elif any(r.status == STATUS_PARTIAL_FAILURE for r in self.results): + self.status = STATUS_PARTIAL_FAILURE + elif any(r.warnings for r in self.results): + self.status = STATUS_WARNING + else: + self.status = STATUS_SUCCESS + + def to_dict(self) -> Dict[str, Any]: + return { + "report_id": self.report_id, + "dry_run": self.dry_run, + "status": self.status, + "started_at": self.started_at, + "finished_at": self.finished_at, + "duration_seconds": self.duration_seconds, + "results": [r.to_dict() for r in self.results], + "warnings": list(self.warnings), + "errors": list(self.errors), + "report_json_path": self.report_json_path, + "report_markdown_path": self.report_markdown_path, + } diff --git a/web/maintenance/validators.py b/web/maintenance/validators.py new file mode 100644 index 0000000..44ce8f5 --- /dev/null +++ b/web/maintenance/validators.py @@ -0,0 +1,126 @@ +"""Validation helpers for system maintenance artifacts.""" + +from __future__ import annotations + +import re +from typing import Dict, Iterable, List, Mapping, Optional, Sequence + +from web.maintenance.tasks import STATUS_FAILED, STATUS_SUCCESS, ValidationResult + + +REQUIRED_CONSTITUENT_COLUMNS = ("symbol", "security", "sector", "sub_industry") + +_MARKET_RULES = { + "sp500": { + "min_count": 450, + "symbol_re": re.compile(r"^[A-Z0-9]{1,10}(-[A-Z0-9]{1,5})?$"), + "label": "S&P 500", + }, + "sti": { + "min_count": 25, + "symbol_re": re.compile(r"^[A-Z0-9]{1,10}\.SI$"), + "label": "STI", + }, + "hsi": { + "min_count": 70, + "symbol_re": re.compile(r"^\d{4,5}\.HK$"), + "label": "HSI", + }, +} + + +def validate_constituent_rows( + rows: Sequence[Mapping[str, object]], + *, + market: str, + before_count: Optional[int] = None, + allow_large_change: bool = False, +) -> ValidationResult: + """Validate normalized constituent rows before they are applied. + + The function deliberately returns all errors/warnings instead of raising so + the Maintenance UI can surface a useful operator-facing report. + """ + rules = _MARKET_RULES.get(market) + if rules is None: + return ValidationResult( + status=STATUS_FAILED, + errors=[f"Unknown constituent market: {market}"], + ) + + warnings: List[str] = [] + errors: List[str] = [] + after_count = len(rows) + + if after_count == 0: + errors.append(f"{rules['label']} source produced no rows") + + if after_count < int(rules["min_count"]): + errors.append( + f"{rules['label']} row count {after_count} is below minimum threshold {rules['min_count']}" + ) + + missing_columns = _missing_required_columns(rows, REQUIRED_CONSTITUENT_COLUMNS) + if missing_columns: + errors.append("Missing required columns: " + ", ".join(sorted(missing_columns))) + + symbols = [str(row.get("symbol") or "").strip().upper() for row in rows] + blank_symbols = [idx + 1 for idx, sym in enumerate(symbols) if not sym] + if blank_symbols: + errors.append(f"Blank symbols found at rows: {blank_symbols[:10]}") + + duplicates = sorted(_duplicates(sym for sym in symbols if sym)) + if duplicates: + errors.append("Duplicate symbols found: " + ", ".join(duplicates[:20])) + + symbol_re = rules["symbol_re"] + invalid_symbols = [sym for sym in symbols if sym and not symbol_re.match(sym)] + if invalid_symbols: + errors.append("Invalid symbol format: " + ", ".join(invalid_symbols[:20])) + + if before_count and before_count > 0: + pct_change = abs(after_count - before_count) / before_count + if pct_change > 0.25 and not allow_large_change: + errors.append( + f"Suspicious count change: {before_count} -> {after_count} ({pct_change:.1%})" + ) + elif pct_change > 0.10: + warnings.append( + f"Large count change: {before_count} -> {after_count} ({pct_change:.1%})" + ) + + status = STATUS_SUCCESS if not errors else STATUS_FAILED + return ValidationResult( + status=status, + warnings=warnings, + errors=errors, + detail={ + "market": market, + "before_count": before_count, + "after_count": after_count, + "minimum_count": rules["min_count"], + "required_columns": list(REQUIRED_CONSTITUENT_COLUMNS), + }, + ) + + +def _missing_required_columns( + rows: Sequence[Mapping[str, object]], + required_columns: Iterable[str], +) -> List[str]: + if not rows: + return list(required_columns) + seen = set() + for row in rows: + seen.update(str(key) for key in row.keys()) + return [col for col in required_columns if col not in seen] + + +def _duplicates(values: Iterable[str]) -> List[str]: + seen = set() + duplicates = set() + for value in values: + if value in seen: + duplicates.add(value) + seen.add(value) + return sorted(duplicates) diff --git a/web/routes/api_maintenance.py b/web/routes/api_maintenance.py new file mode 100644 index 0000000..ba06593 --- /dev/null +++ b/web/routes/api_maintenance.py @@ -0,0 +1,127 @@ +"""System Maintenance API. + +GET /api/maintenance/status +POST /api/maintenance/run +GET /api/maintenance/reports +GET /api/maintenance/reports/ +""" + +from __future__ import annotations + +import logging +from typing import Iterable, List + +from flask import Blueprint, jsonify, request + +from web.maintenance.runner import MaintenanceRunner +from web.services import get_services + +logger = logging.getLogger(__name__) + +bp = Blueprint("api_maintenance", __name__, url_prefix="/api/maintenance") + + +def _runner() -> MaintenanceRunner: + return MaintenanceRunner() + + +@bp.route("/status", methods=["GET"]) +def status(): + """Return constituent file health and recent report summaries.""" + try: + return jsonify(_runner().get_status()) + except Exception as exc: + logger.error("Maintenance status failed: %s", exc, exc_info=True) + return jsonify({"error": "Maintenance status could not be loaded."}), 500 + + +@bp.route("/run", methods=["POST"]) +def run(): + """Run selected maintenance tasks. + + Request JSON: + tasks: list[str] + dry_run: bool, default True + event_symbols: list[str], optional + days_ahead: int, default 28 + allow_large_change: bool, default False + include_portfolio_symbols: bool, default True: when True and the market_events + task is included, portfolio and strategy symbols are appended to event_symbols + """ + payload = request.get_json(silent=True) or {} + tasks = payload.get("tasks") or None + if isinstance(tasks, str): + tasks = [tasks] + if tasks is not None and not isinstance(tasks, list): + return jsonify({"error": "tasks must be a string or list of strings"}), 400 + + dry_run = bool(payload.get("dry_run", True)) + days_ahead = _bounded_int(payload.get("days_ahead"), default=28, minimum=1, maximum=90) + event_symbols = _csv_or_list(payload.get("event_symbols")) + include_portfolio = bool(payload.get("include_portfolio_symbols", True)) + if include_portfolio and (tasks is None or "market_events" in tasks): + event_symbols.extend(_portfolio_symbols()) + + try: + report = _runner().run( + tasks=tasks, + dry_run=dry_run, + event_symbols=sorted(set(event_symbols)), + days_ahead=days_ahead, + allow_large_change=bool(payload.get("allow_large_change", False)), + ) + status_code = 200 if report.status != "failed" else 422 + return jsonify(report.to_dict()), status_code + except Exception as exc: + logger.error("Maintenance run failed: %s", exc, exc_info=True) + return jsonify({"error": "Maintenance run could not be completed."}), 500 + + +@bp.route("/reports", methods=["GET"]) +def reports(): + """Return recent maintenance report summaries.""" + limit = _bounded_int(request.args.get("limit"), default=20, minimum=1, maximum=100) + return jsonify({"reports": _runner().list_reports(limit=limit)}) + + +@bp.route("/reports/", methods=["GET"]) +def report_detail(report_id: str): + """Return one maintenance report by id.""" + report = _runner().read_report(report_id) + if report is None: + return jsonify({"error": "Report not found"}), 404 + return jsonify(report) + + +def _bounded_int(value, *, default: int, minimum: int, maximum: int) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + parsed = default + return max(minimum, min(parsed, maximum)) + + +def _csv_or_list(value) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + raw_items: Iterable[str] = value.split(",") + elif isinstance(value, list): + raw_items = (str(item) for item in value) + else: + return [] + return [item.strip().upper() for item in raw_items if item and item.strip()] + + +def _portfolio_symbols() -> List[str]: + symbols = set() + try: + app_svc = get_services() + for sym in app_svc.get_positions(): + symbols.add(str(sym).upper()) + for strategy in app_svc.strategy_registry.get_all_strategies(): + for sym in (strategy.config.symbols or []): + symbols.add(str(sym).upper()) + except Exception: + pass + return sorted(symbols) diff --git a/web/routes/maintenance.py b/web/routes/maintenance.py new file mode 100644 index 0000000..93f3ec4 --- /dev/null +++ b/web/routes/maintenance.py @@ -0,0 +1,17 @@ +"""System Maintenance page route.""" + +from __future__ import annotations + +from flask import Blueprint, render_template + +bp = Blueprint("maintenance", __name__, url_prefix="/maintenance") + + +@bp.route("") +def index(): + """Render the System Maintenance console.""" + return render_template( + "maintenance/index.html", + title="System Maintenance", + active_page="maintenance", + ) diff --git a/web/templates/maintenance/index.html b/web/templates/maintenance/index.html new file mode 100644 index 0000000..bb014cc --- /dev/null +++ b/web/templates/maintenance/index.html @@ -0,0 +1,133 @@ +{% extends "base.html" %} +{% block title %}{{ title }}{% endblock %} +{% block content %} +

🛠️ System Maintenance

+

+ Refresh market metadata safely. Dry-run previews changes; apply mode writes only validated metadata and reports. +

+ +
+ + + + + +
+ +
+

Metadata Status

+
Loading maintenance status…
+
+ +
+

Latest Run Result

+
No run yet.
+
+ +
+

Recent Reports

+
Loading reports…
+
+{% endblock %} + +{% block extra_scripts %} + +{% endblock %}