diff --git a/.github/workflows/theseus-engine.yml b/.github/workflows/theseus-engine.yml index fa2a444..b433db7 100644 --- a/.github/workflows/theseus-engine.yml +++ b/.github/workflows/theseus-engine.yml @@ -18,18 +18,14 @@ jobs: - uses: actions/checkout@v4 - id: extract run: | - REPOS=$(python -c ' - import json - with open("theseus.config.json") as f: - config = json.load(f) - names = [r["name"] for r in config.get("repositories", [])] - print(json.dumps(names)) - ') + REPOS=$(python scripts/workflow.py discover-repos) echo "repos=$REPOS" >> "$GITHUB_OUTPUT" analyze: needs: discover-repos runs-on: ubuntu-latest + permissions: + contents: write strategy: fail-fast: false matrix: @@ -78,6 +74,7 @@ jobs: > /tmp/data-save/status.json git reset --hard HEAD 2>/dev/null || true + # Discard generated data but keep gitignored files (-fd, no -x) git clean -fd 2>/dev/null || true git fetch origin chore/monthly-data-update 2>/dev/null || true @@ -143,6 +140,33 @@ jobs: echo "has_branch=false" >> "$GITHUB_OUTPUT" fi + - name: Fix shared branch ancestry + if: steps.shared.outputs.has_branch == 'true' + run: | + if git merge-base --is-ancestor origin/main HEAD 2>/dev/null; then + echo "Shared branch has common history with main." + exit 0 + fi + echo "Shared branch has orphaned history. Rebasing onto main..." + SAVE_DIR=$(mktemp -d) + cp -r data/* "$SAVE_DIR"/ 2>/dev/null || true + git checkout origin/main + # Full reset: remove everything from index and all untracked/ignored files + git rm -rf --cached . >/dev/null 2>&1 || true + git clean -fdx >/dev/null 2>&1 || true + mkdir -p data/raw data/processed data/.status + cp -r "$SAVE_DIR"/* data/ 2>/dev/null || true + rm -rf "$SAVE_DIR" + git add data/ + git -c user.name="github-actions[bot]" \ + -c user.email="41898282+github-actions[bot]@users.noreply.github.com" \ + commit -m "fix: rebase shared data onto main" + # Force-push is safe here: create-pr is the only job at this point, + # and the orphaned branch must be replaced to share history with main + git push origin HEAD:chore/monthly-data-update --force + git fetch origin chore/monthly-data-update + git checkout -B chore/monthly-data-update origin/chore/monthly-data-update + - name: Check for status markers id: check if: steps.shared.outputs.has_branch == 'true' @@ -155,67 +179,11 @@ jobs: - name: Build PR body if: steps.check.outputs.has_data == 'true' - run: | - python << 'PYEOF' - import json, os, glob - - status_dir = "data/.status" - statuses = {} - for f in sorted(glob.glob(os.path.join(status_dir, "*.json"))): - with open(f) as fh: - s = json.load(fh) - statuses[s["repo"]] = s["status"] - - total = len(statuses) - passed = sum(1 for v in statuses.values() if v == "success") - - rows = "\n".join( - f"| {repo} | {'✅' if s == 'success' else '❌'} |" - for repo, s in sorted(statuses.items()) - ) - - header = "## Automated Theseus Data Engine Run\n\n" - table = "| Repo | Status |\n|------|--------|\n" - total_row = f"| **Total** | **{passed}/{total} completed** |\n\n" - footer = ("This pull request contains the latest pre-computed " - "persistence data for the tracked repositories.\n\n" - "**Trigger:** Monthly Schedule / Workflow Dispatch") - body = header + table + rows + "\n" + total_row + footer - with open("pr-body.md", "w") as f: - f.write(body) - PYEOF + run: python scripts/workflow.py build-pr-body - name: Validate graph files if: steps.check.outputs.has_data == 'true' - run: | - python << 'PYEOF' - import json, glob, sys - - files = sorted(glob.glob("data/processed/*.json")) - if not files: - print("No processed files found to validate.") - sys.exit(1) - - errors = 0 - for f in files: - try: - with open(f) as fh: - data = json.load(fh) - assert "snapshots" in data, f"Missing snapshots in {f}" - assert "fossils" in data, f"Missing fossils in {f}" - for snap in data["snapshots"]: - assert "snapshot_date" in snap, f"Missing snapshot_date in {f}" - assert "composition" in snap, f"Missing composition in {f}" - print(f" ✓ {f}") - except (json.JSONDecodeError, AssertionError, KeyError) as e: - print(f" ✗ {f}: {e}") - errors += 1 - - if errors: - print(f"Validation failed: {errors} error(s)") - sys.exit(1) - print("All graph files validated.") - PYEOF + run: python scripts/workflow.py validate-graph-files - name: Create or update pull request if: steps.check.outputs.has_data == 'true' diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 48a706f..3268c4a 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -12,13 +12,23 @@ jobs: runs-on: ubuntu-latest steps: - - name: Checkout code - uses: actions/checkout@v4 + - uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + with: + persist-credentials: false - - name: Setup Python and Poetry - uses: ./.github/actions/setup-python-poetry + - name: Set up Python + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5 with: - poetry-install-args: --with dev + python-version: "3.12" + cache: pip + + - name: Install Poetry + run: pipx install poetry + shell: bash + + - name: Install dependencies + run: poetry install --with dev + shell: bash - name: Run linter run: poetry run pylint scripts/ --output-format=colorized diff --git a/pyproject.toml b/pyproject.toml index b30ea22..83114ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,9 @@ dev = [ "pylint>=4.0.5,<5.0.0" ] +[tool.pylint] +init-hook = "import sys; sys.path.insert(0, 'scripts')" + [tool.pylint.format] max-line-length = 120 diff --git a/scripts/__init__.py b/scripts/__init__.py index e69de29..7009efe 100644 --- a/scripts/__init__.py +++ b/scripts/__init__.py @@ -0,0 +1,14 @@ +""" +Ensure the scripts directory is on sys.path for sibling imports. + +When ``scripts`` is imported as a package (e.g. ``from scripts._blame import ...`` +from tests), this adds the package directory to ``sys.path`` so that subsequent +``import _path_guard`` calls from sibling modules resolve correctly. +""" + +import sys +from pathlib import Path + +_SCRIPTS_DIR = str(Path(__file__).resolve().parent) +if _SCRIPTS_DIR not in sys.path: + sys.path.insert(0, _SCRIPTS_DIR) diff --git a/scripts/_blame.py b/scripts/_blame.py index 9fb1c46..6c04314 100644 --- a/scripts/_blame.py +++ b/scripts/_blame.py @@ -42,17 +42,12 @@ import concurrent.futures import logging -import os -import sys import threading from collections import defaultdict from datetime import datetime, timezone from pathlib import Path -# Ensure sibling imports work in all invocation contexts -_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) -if _SCRIPTS_DIR not in sys.path: - sys.path.insert(0, _SCRIPTS_DIR) +import _path_guard # noqa: F401 # pylint: disable=unused-import from _utils import run_command @@ -269,7 +264,7 @@ def blame_year_counts(self, files: list[str]) -> dict[str, int]: logger.info(" Blaming %d files (%d workers)...", len(files), self.max_workers) age_distribution: dict[str, int] = defaultdict(int) - def _accumulate(file_path: str, raw_output: str) -> None: + def _accumulate(_file_path: str, raw_output: str) -> None: for year, count in parse_blame_year_counts(raw_output).items(): age_distribution[year] += count diff --git a/scripts/_data_io.py b/scripts/_data_io.py index ba2c21f..f0f7482 100644 --- a/scripts/_data_io.py +++ b/scripts/_data_io.py @@ -43,13 +43,12 @@ import json import logging -import os +from pathlib import Path logger = logging.getLogger(__name__) -# TODO: Move away from OS to Pathlib -def load_snapshot_data(file_path: str) -> dict: +def load_snapshot_data(file_path: str | Path) -> dict: """ Load snapshot data from a JSON file, normalising to ``{snapshots, fossils}``. @@ -59,12 +58,12 @@ def load_snapshot_data(file_path: str) -> dict: :param file_path: Path to the JSON data file. :return: Dictionary with ``snapshots`` (list) and ``fossils`` (dict) keys. """ - if not os.path.exists(file_path): + path = Path(file_path) + if not path.exists(): return {"snapshots": [], "fossils": {}} try: - with open(file_path, "r", encoding="utf-8") as f: - data = json.load(f) + data = json.loads(path.read_text(encoding="utf-8")) if isinstance(data, list): return {"snapshots": data, "fossils": {}} if isinstance(data, dict): @@ -81,20 +80,19 @@ def load_snapshot_data(file_path: str) -> dict: return {"snapshots": [], "fossils": {}} -# TODO: Move away from OS to Pathlib -def save_snapshot_data(file_path: str, snapshots: list[dict], fossils: dict) -> None: +def save_snapshot_data(file_path: str | Path, snapshots: list[dict], fossils: dict) -> None: """ Atomically write snapshot data to a minified JSON file. Writes to a ``.tmp`` sibling first, then atomically replaces the target - via ``os.replace`` to prevent file corruption on crash. + to prevent file corruption on crash. :param file_path: Destination path. :param snapshots: List of snapshot objects. :param fossils: Fossil dictionary (``genesis`` + ``survivor`` keys). """ - tmp_path = file_path + ".tmp" + path = Path(file_path) + tmp_path = path.with_suffix(path.suffix + ".tmp") data = {"snapshots": snapshots, "fossils": fossils} - with open(tmp_path, "w", encoding="utf-8") as f: - json.dump(data, f, separators=(",", ":")) - os.replace(tmp_path, file_path) + tmp_path.write_text(json.dumps(data, separators=(",", ":")), encoding="utf-8") + tmp_path.replace(path) diff --git a/scripts/_path_guard.py b/scripts/_path_guard.py new file mode 100644 index 0000000..89348a4 --- /dev/null +++ b/scripts/_path_guard.py @@ -0,0 +1,17 @@ +""" +Ensure the scripts directory is on sys.path for sibling imports. + +Every script in this directory should ``import _path_guard`` (with a +``# noqa: F401`` comment if the linter complains) before importing any +sibling module. This is a no-op when the script's directory is already +on ``sys.path`` (the normal case for ``python scripts/foo.py``), but +guarantees correctness for ``python -m scripts.foo`` and test-runner +invocations. +""" + +import sys +from pathlib import Path + +_SCRIPTS_DIR = str(Path(__file__).resolve().parent) +if _SCRIPTS_DIR not in sys.path: + sys.path.insert(0, _SCRIPTS_DIR) diff --git a/scripts/_utils.py b/scripts/_utils.py index 2a1beab..dda0dd0 100644 --- a/scripts/_utils.py +++ b/scripts/_utils.py @@ -215,6 +215,7 @@ def remove_path(path: str) -> None: ["cmd", "/c", "rd", "/s", "/q", path], capture_output=True, timeout=30, + check=False, ) if not os.path.exists(path): return @@ -226,6 +227,7 @@ def remove_path(path: str) -> None: ["rm", "-rf", path], capture_output=True, timeout=30, + check=False, ) if not os.path.exists(path): return @@ -233,7 +235,7 @@ def remove_path(path: str) -> None: pass # Fallback: retry with shutil.rmtree, fixing permissions on each retry - def handle_remove_readonly(func, path, _exc_info): + def _handle_remove_readonly(func, path, _exc): try: current_mode = os.stat(path).st_mode os.chmod( @@ -248,7 +250,7 @@ def handle_remove_readonly(func, path, _exc_info): for attempt in range(3): try: - shutil.rmtree(path, onerror=handle_remove_readonly) + shutil.rmtree(path, onexc=_handle_remove_readonly) break except Exception: # noqa: BLE001 if attempt < 2: diff --git a/scripts/add_fossils.py b/scripts/add_fossils.py index f08bf33..ce6c29e 100644 --- a/scripts/add_fossils.py +++ b/scripts/add_fossils.py @@ -51,14 +51,10 @@ import argparse import logging -import os import sys from pathlib import Path -# Ensure sibling imports work in all invocation contexts -_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) -if _SCRIPTS_DIR not in sys.path: - sys.path.insert(0, _SCRIPTS_DIR) +import _path_guard # noqa: F401 # pylint: disable=unused-import from _blame import BlameRunner, _blank_fossil from _data_io import load_snapshot_data, save_snapshot_data @@ -273,21 +269,29 @@ def get_survivor_fossil(repo_path: str | Path) -> dict: # --------------------------------------------------------------------------- -# Full backfill driver +# Shared repo-iteration helper # --------------------------------------------------------------------------- -def backfill_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: +def _process_each_repo( + data_dir: str, + repo_urls: dict[str, str], + process_fn, + log_prefix: str = "Processing", +) -> bool: """ - For every repo JSON in ``data_dir``, recompute both fossils without - touching snapshot data. + Iterate over every JSON file in ``data_dir/raw/``, clone/fetch each repo, + call *process_fn* with ``(json_file, snapshots, existing_fossils, local_repo, repo_name)``, + then clean up the temp directory. - Always forces a fresh recompute of both genesis and survivor for every - repository. + Handles iteration, cloning, fetch, temp-dir cleanup, and error logging. + *process_fn* should return ``None`` for success or a string error message. :param data_dir: Path to the ``data/`` directory. :param repo_urls: ``{repo_name: clone_url}`` mapping. - :return: ``True`` if any errors occurred, ``False`` otherwise. + :param process_fn: Callback taking ``(json_file, snapshots, existing_fossils, local_repo, repo_name)``. + :param log_prefix: Prefix for log messages (e.g. ``"Processing"``, ``"Checking survivor for"``). + :return: ``True`` if any errors occurred. """ data_path = Path(data_dir) / "raw" had_failures = False @@ -302,17 +306,19 @@ def backfill_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: logger.warning("No URL found for '%s', skipping.", repo_name) continue - logger.info("━━━ Processing: %s ━━━", repo_name) + logger.info("━━━ %s: %s ━━━", log_prefix, repo_name) data = load_snapshot_data(str(json_file)) snapshots = data["snapshots"] + existing_fossils = data.get("fossils", {}) + if not snapshots: logger.warning(" No snapshots found in %s, skipping.", json_file.name) continue - temp_dir = Path(f"./temp_fossil_repos_{repo_name}") - temp_dir.mkdir(exist_ok=True) - local_repo = temp_dir + base_temp = Path("./temp_fossil_repos") + base_temp.mkdir(exist_ok=True) + local_repo = base_temp / repo_name if not local_repo.exists(): logger.info(" Cloning %s...", repo_url) @@ -325,153 +331,131 @@ def backfill_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: logger.warning(" Fetch failed (continuing with local): %s", e) try: - genesis = get_genesis_fossil(local_repo) - survivor = get_survivor_fossil(local_repo) - fossils = {"genesis": genesis, "survivor": survivor} - - if not genesis.get("file"): - logger.warning(" ⚠ Genesis fossil is empty for %s", repo_name) - if not survivor.get("file"): - logger.warning(" ⚠ Survivor fossil is empty for %s", repo_name) - if genesis.get("commit") == survivor.get("commit") and genesis.get("file"): - logger.warning( - " ⚠ Genesis and Survivor share the same commit (%s) " - "— may indicate the repo was never fully rewritten.", - genesis["commit"], - ) - - logger.info( - " Genesis → %s | %s:%s | %s", - genesis.get("year"), - genesis.get("file"), - genesis.get("line"), - genesis.get("commit"), - ) - logger.info( - " Survivor → %s | %s:%s | %s", - survivor.get("year"), - survivor.get("file"), - survivor.get("line"), - survivor.get("commit"), - ) - - save_snapshot_data(str(json_file), snapshots, fossils) - logger.info(" ✓ Successfully wrote fossils for %s", repo_name) - + error = process_fn(json_file, snapshots, existing_fossils, local_repo, repo_name) + if error: + logger.error(" ✗ %s", error) + had_failures = True + else: + logger.info(" ✓ %s", repo_name) except Exception as e: # noqa: BLE001 - logger.error(" ✗ Error computing fossils for %s: %s", repo_name, e) + logger.error(" ✗ Error processing %s: %s", repo_name, e) had_failures = True - if temp_dir.exists(): - remove_path(str(temp_dir)) + if local_repo.exists(): + remove_path(str(local_repo)) return had_failures # --------------------------------------------------------------------------- -# Incremental survivor-only update (used by GitHub Actions) +# Full backfill driver # --------------------------------------------------------------------------- -def update_survivor_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: +def _backfill_one( + json_file: Path, snapshots: list, _fossils: dict, local_repo: Path, repo_name: str +) -> str | None: + """Compute both fossils for a single repo; return error string or ``None``.""" + genesis = get_genesis_fossil(local_repo) + survivor = get_survivor_fossil(local_repo) + new_fossils = {"genesis": genesis, "survivor": survivor} + + if not genesis.get("file"): + logger.warning(" ⚠ Genesis fossil is empty for %s", repo_name) + if not survivor.get("file"): + logger.warning(" ⚠ Survivor fossil is empty for %s", repo_name) + if genesis.get("commit") == survivor.get("commit") and genesis.get("file"): + logger.warning( + " ⚠ Genesis and Survivor share the same commit (%s) " + "— may indicate the repo was never fully rewritten.", + genesis["commit"], + ) + + logger.info( + " Genesis → %s | %s:%s | %s", + genesis.get("year"), genesis.get("file"), genesis.get("line"), genesis.get("commit"), + ) + logger.info( + " Survivor → %s | %s:%s | %s", + survivor.get("year"), survivor.get("file"), survivor.get("line"), survivor.get("commit"), + ) + + save_snapshot_data(str(json_file), snapshots, new_fossils) + + +def backfill_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: """ - Refresh only the Survivor (Living) fossil for each repo. + For every repo JSON in ``data_dir``, recompute both fossils without + touching snapshot data. - Skips writing to disk if the fossil's ``file:line:commit`` has not changed. - Designed to run on every monthly cron tick so the living fossil stays - current even when no new snapshots are being added. + Always forces a fresh recompute of both genesis and survivor for every + repository. :param data_dir: Path to the ``data/`` directory. :param repo_urls: ``{repo_name: clone_url}`` mapping. :return: ``True`` if any errors occurred, ``False`` otherwise. """ - data_path = Path(data_dir) / "raw" - - updated_count = 0 - had_failures = False - - for json_file in sorted(data_path.glob("*.json")): - if json_file.name == "manifest.json": - continue - - repo_name = json_file.stem.replace("_data", "") - repo_url = repo_urls.get(repo_name) - if not repo_url: - logger.warning("No URL found for '%s', skipping.", repo_name) - continue - - logger.info("━━━ Checking survivor for: %s ━━━", repo_name) - - data = load_snapshot_data(str(json_file)) - snapshots = data["snapshots"] - existing_fossils = data.get("fossils", {}) - - if not snapshots: - logger.warning(" No snapshots found in %s, skipping.", json_file.name) - continue + return _process_each_repo(data_dir, repo_urls, _backfill_one, log_prefix="Processing") - existing_survivor = existing_fossils.get("survivor", {}) - temp_dir = Path(f"./temp_fossil_repos_{repo_name}") - temp_dir.mkdir(exist_ok=True) - local_repo = temp_dir +# --------------------------------------------------------------------------- +# Incremental survivor-only update (used by GitHub Actions) +# --------------------------------------------------------------------------- - if not local_repo.exists(): - logger.info(" Cloning %s...", repo_url) - run_command(["git", "clone", repo_url, str(local_repo)]) - else: - logger.info(" Fetching latest...") - try: - run_command(["git", "fetch", "--all"], cwd=str(local_repo)) - except RuntimeError as e: - logger.warning(" Fetch failed (continuing with local): %s", e) - try: - new_survivor = get_survivor_fossil(local_repo) +def _update_survivor_one( + json_file: Path, snapshots: list, existing_fossils: dict, + local_repo: Path, repo_name: str, +) -> str | None: + """Update survivor fossil for a single repo; return error string or ``None``.""" + existing_survivor = existing_fossils.get("survivor", {}) + new_survivor = get_survivor_fossil(local_repo) - old_identity = _fossil_identity(existing_survivor) - new_identity = _fossil_identity(new_survivor) - metadata_changed = existing_survivor.get("view_commit") != new_survivor.get( - "view_commit" - ) + old_identity = _fossil_identity(existing_survivor) + new_identity = _fossil_identity(new_survivor) + metadata_changed = existing_survivor.get("view_commit") != new_survivor.get("view_commit") - if old_identity == new_identity and not metadata_changed: - logger.info( - " ✓ Survivor unchanged: %s:%s (commit %s) — skipping write.", - new_survivor.get("file"), - new_survivor.get("line"), - new_survivor.get("commit"), - ) - continue + if old_identity == new_identity and not metadata_changed: + logger.info( + " ✓ Survivor unchanged: %s:%s (commit %s) — skipping write.", + new_survivor.get("file"), + new_survivor.get("line"), + new_survivor.get("commit"), + ) + return None + + logger.info(" ↻ Survivor updated for %s:", repo_name) + logger.info( + " OLD: %s:%s @ %s", + existing_survivor.get("file"), + existing_survivor.get("line"), + existing_survivor.get("commit"), + ) + logger.info( + " NEW: %s:%s @ %s", + new_survivor.get("file"), + new_survivor.get("line"), + new_survivor.get("commit"), + ) - logger.info(" ↻ Survivor updated for %s:", repo_name) - logger.info( - " OLD: %s:%s @ %s", - existing_survivor.get("file"), - existing_survivor.get("line"), - existing_survivor.get("commit"), - ) - logger.info( - " NEW: %s:%s @ %s", - new_survivor.get("file"), - new_survivor.get("line"), - new_survivor.get("commit"), - ) + updated_fossils = {**existing_fossils, "survivor": new_survivor} + save_snapshot_data(str(json_file), snapshots, updated_fossils) + return None - updated_fossils = {**existing_fossils, "survivor": new_survivor} - save_snapshot_data(str(json_file), snapshots, updated_fossils) - logger.info(" ✓ Wrote updated survivor for %s", repo_name) - updated_count += 1 - except Exception as e: # noqa: BLE001 - logger.error(" ✗ Error updating survivor for %s: %s", repo_name, e) - had_failures = True +def update_survivor_fossils(data_dir: str, repo_urls: dict[str, str]) -> bool: + """ + Refresh only the Survivor (Living) fossil for each repo. - if temp_dir.exists(): - remove_path(str(temp_dir)) + Skips writing to disk if the fossil's ``file:line:commit`` has not changed. + Designed to run on every monthly cron tick so the living fossil stays + current even when no new snapshots are being added. - logger.info("\nSurvivor update complete. %d repo(s) updated.", updated_count) - return had_failures + :param data_dir: Path to the ``data/`` directory. + :param repo_urls: ``{repo_name: clone_url}`` mapping. + :return: ``True`` if any errors occurred, ``False`` otherwise. + """ + return _process_each_repo(data_dir, repo_urls, _update_survivor_one, log_prefix="Checking survivor for") # --------------------------------------------------------------------------- diff --git a/scripts/analyse_repository.py b/scripts/analyse_repository.py index c8bd371..3a8dff1 100644 --- a/scripts/analyse_repository.py +++ b/scripts/analyse_repository.py @@ -24,6 +24,7 @@ """ import argparse +import concurrent.futures import logging import os import sys @@ -31,10 +32,7 @@ from collections import defaultdict from itertools import groupby -# Ensure sibling imports work in all invocation contexts -_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) -if _SCRIPTS_DIR not in sys.path: - sys.path.insert(0, _SCRIPTS_DIR) +import _path_guard # noqa: F401 # pylint: disable=unused-import from _blame import BlameRunner from _data_io import load_snapshot_data, save_snapshot_data @@ -44,6 +42,7 @@ get_default_branch, get_tracked_files, load_config, + remove_path, run_command, ) @@ -285,6 +284,141 @@ def _filter_snapshots( return result +def _ensure_repo_ready(repo_slug: str, repo_name: str, temp_repo_path: str) -> None: + """Clone or fetch the repository so it's ready for analysis.""" + if not os.path.exists(temp_repo_path): + clone_repository(repo_slug, temp_repo_path) + return + + logger.info("Repository %s already exists locally. Fetching latest...", repo_name) + run_command(["git", "fetch", "--all"], cwd=temp_repo_path) + default_branch = get_default_branch(temp_repo_path) + if default_branch == "HEAD": + raise RuntimeError( + f"[{repo_name}] Cannot determine default branch after fetch. " + "Tried: main, master, develop, origin/HEAD." + ) + run_command( + ["git", "checkout", "-B", default_branch, f"origin/{default_branch}"], + cwd=temp_repo_path, + ) + run_command(["git", "pull"], cwd=temp_repo_path) + + +def _find_baseline( + historical_snapshots: list[dict], first_new_period: str | None, +) -> tuple[str, dict[str, dict[str, int]]] | None: + """ + Find the best incremental-blame baseline from historical snapshots. + + Uses the most recent snapshot whose ``snapshot_date`` is strictly earlier + than *first_new_period*. Falls back to the newest snapshot when no + earlier snapshot exists or when *first_new_period* is ``None``. + + :param historical_snapshots: Existing snapshots on disk. + :param first_new_period: The ``YYYY-MM`` period of the first new snapshot. + :return: ``(commit_hash, file_compositions)`` tuple or ``None``. + """ + if not historical_snapshots: + return None + + if first_new_period: + for snap in reversed(historical_snapshots): + if snap["snapshot_date"] < first_new_period: + commit = snap.get("commit_hash", "") + comp = snap.get("file_compositions") + if commit and comp: + return (commit, comp) + + last_hist = historical_snapshots[-1] + commit = last_hist.get("commit_hash", "") + comp = last_hist.get("file_compositions") + if commit and comp: + return (commit, comp) + return None + + +def _process_snapshots_by_year( + repo_name: str, + temp_repo_path: str, + new_snapshots: list[tuple[str, str]], + historical_snapshots: list[dict], + output_json_path: str, + existing_fossils: dict, +) -> None: + """ + Process new snapshots year-by-year, writing intermediate results after + each year to prevent data loss on crash. + """ + snapshots_by_year = groupby(new_snapshots, key=lambda x: x[0][:4]) + total_new_data = [] + + first_new_period = new_snapshots[0][0] if new_snapshots else None + baseline = _find_baseline(historical_snapshots, first_new_period) + prev_file_data = baseline + if prev_file_data: + logger.info( + "[%s] Using incremental blame from %s", + repo_name, + historical_snapshots[-1]["snapshot_date"], + ) + + for year, year_snapshots in snapshots_by_year: + year_snapshots_list = list(year_snapshots) + year_data = [] + year_start = time.perf_counter() + + logger.info( + "[%s] Processing year %s: %d snapshots", + repo_name, year, len(year_snapshots_list), + ) + + for idx, (period, commit) in enumerate(year_snapshots_list, 1): + logger.info( + "[%s] [%s] Processing %s (%d/%d) — Commit: %s", + repo_name, year, period, idx, len(year_snapshots_list), commit[:7], + ) + + snapshot_start = time.perf_counter() + distribution, file_compositions = analyze_single_snapshot( + temp_repo_path, commit, prev_file_data + ) + snapshot_elapsed = time.perf_counter() - snapshot_start + + prev_file_data = (commit, file_compositions) + + logger.info( + "[%s] [%s] Completed %s in %.2f seconds (%d total lines)", + repo_name, year, period, snapshot_elapsed, + sum(distribution.values()), + ) + + year_data.append({ + "snapshot_date": period, + "commit_hash": commit, + "composition": distribution, + "file_compositions": file_compositions, + }) + + total_new_data.extend(year_data) + year_elapsed = time.perf_counter() - year_start + + new_periods = {s["snapshot_date"] for s in total_new_data} + existing_filtered = [ + s for s in historical_snapshots + if s["snapshot_date"] not in new_periods + ] + final_snapshots = existing_filtered + total_new_data + final_snapshots.sort(key=lambda x: x["snapshot_date"]) + + save_snapshot_data(output_json_path, final_snapshots, existing_fossils) + + logger.info( + "[%s] Completed year %s in %.2f seconds. Wrote %d snapshots.", + repo_name, year, year_elapsed, len(final_snapshots), + ) + + def process_repository( repo_slug: str, data_dir: str, reprocess: str | None = None ) -> None: @@ -304,24 +438,7 @@ def process_repository( output_json_path = os.path.join(data_dir, "raw", f"{repo_name}_data.json") try: - if not os.path.exists(temp_repo_path): - clone_repository(repo_slug, temp_repo_path) - else: - logger.info( - "Repository %s already exists locally. Fetching latest...", repo_name - ) - run_command(["git", "fetch", "--all"], cwd=temp_repo_path) - default_branch = get_default_branch(temp_repo_path) - if default_branch == "HEAD": - raise RuntimeError( - f"[{repo_name}] Cannot determine default branch after fetch. " - "Tried: main, master, develop, origin/HEAD." - ) - run_command( - ["git", "checkout", "-B", default_branch, f"origin/{default_branch}"], - cwd=temp_repo_path, - ) - run_command(["git", "pull"], cwd=temp_repo_path) + _ensure_repo_ready(repo_slug, repo_name, temp_repo_path) state = load_snapshot_data(output_json_path) historical_snapshots = state["snapshots"] @@ -344,92 +461,12 @@ def process_repository( len(new_snapshots), ) - snapshots_by_year = groupby(new_snapshots, key=lambda x: x[0][:4]) - total_new_data = [] - - # Find the previous snapshot for incremental blame baseline - prev_file_data: tuple[str, dict[str, dict[str, int]]] | None = None - if historical_snapshots: - last_hist = historical_snapshots[-1] - hist_commit = last_hist.get("commit_hash", "") - hist_compositions = last_hist.get("file_compositions") - if hist_commit and hist_compositions: - prev_file_data = (hist_commit, hist_compositions) - logger.info( - "[%s] Using incremental blame from %s", - repo_name, - last_hist["snapshot_date"], - ) - - for year, year_snapshots in snapshots_by_year: - year_snapshots_list = list(year_snapshots) - year_data = [] - year_start = time.perf_counter() - - logger.info( - "[%s] Processing year %s: %d snapshots", - repo_name, - year, - len(year_snapshots_list), - ) - - for idx, (period, commit) in enumerate(year_snapshots_list, 1): - logger.info( - "[%s] [%s] Processing %s (%d/%d) — Commit: %s", - repo_name, - year, - period, - idx, - len(year_snapshots_list), - commit[:7], - ) - - snapshot_start = time.perf_counter() - distribution, file_compositions = analyze_single_snapshot( - temp_repo_path, commit, prev_file_data - ) - snapshot_elapsed = time.perf_counter() - snapshot_start - - # Prepare prev_file_data for the next iteration - prev_file_data = (commit, file_compositions) - - logger.info( - "[%s] [%s] Completed %s in %.2f seconds (%d total lines)", - repo_name, - year, - period, - snapshot_elapsed, - sum(distribution.values()), - ) - - year_data.append( - { - "snapshot_date": period, - "commit_hash": commit, - "composition": distribution, - "file_compositions": file_compositions, - } - ) - - total_new_data.extend(year_data) - year_elapsed = time.perf_counter() - year_start - - final_snapshots = historical_snapshots + total_new_data - final_snapshots.sort(key=lambda x: x["snapshot_date"]) - - save_snapshot_data(output_json_path, final_snapshots, existing_fossils) - - logger.info( - "[%s] Completed year %s in %.2f seconds. Wrote %d snapshots.", - repo_name, - year, - year_elapsed, - len(final_snapshots), - ) + _process_snapshots_by_year( + repo_name, temp_repo_path, new_snapshots, + historical_snapshots, output_json_path, existing_fossils, + ) finally: - from _utils import remove_path - remove_path(temp_repo_path) @@ -496,8 +533,6 @@ def main() -> None: if args.reprocess: logger.info("Re-processing period: %s", args.reprocess) - import concurrent.futures - max_top_level_workers = min( len(selected_targets), int(os.getenv("MAX_TOP_LEVEL_WORKERS", os.cpu_count() or 1)), diff --git a/scripts/cleanup_data.py b/scripts/cleanup_data.py index b3e3a4a..05b5b09 100644 --- a/scripts/cleanup_data.py +++ b/scripts/cleanup_data.py @@ -9,14 +9,10 @@ import json import logging -import os import sys from pathlib import Path -# Ensure sibling imports work in all invocation contexts -_SCRIPTS_DIR = Path(__file__).resolve().parent -if str(_SCRIPTS_DIR) not in sys.path: - sys.path.insert(0, str(_SCRIPTS_DIR)) +import _path_guard # noqa: F401 # pylint: disable=unused-import from _data_io import load_snapshot_data, save_snapshot_data from _utils import load_config diff --git a/scripts/run_pipeline.py b/scripts/run_pipeline.py index 338fc09..4924845 100644 --- a/scripts/run_pipeline.py +++ b/scripts/run_pipeline.py @@ -30,10 +30,7 @@ import sys import time -# Ensure sibling imports work in all invocation contexts -_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) -if _SCRIPTS_DIR not in sys.path: - sys.path.insert(0, _SCRIPTS_DIR) +import _path_guard # noqa: F401 # pylint: disable=unused-import from _utils import load_config from cleanup_data import cleanup_data as run_cleanup diff --git a/scripts/workflow.py b/scripts/workflow.py new file mode 100644 index 0000000..26caefe --- /dev/null +++ b/scripts/workflow.py @@ -0,0 +1,112 @@ +""" +CLI helpers for theseus-engine.yml workflow steps. + +Usage: + python scripts/workflow.py discover-repos + poetry run python scripts/workflow.py build-pr-body + poetry run python scripts/workflow.py validate-graph-files +""" + +import json +import sys +from pathlib import Path + +import _path_guard # noqa: F401 # pylint: disable=unused-import + + +def discover_repos(config_path: str = "theseus.config.json") -> list[str]: + """Return list of repository names from the config file.""" + with open(config_path, encoding="utf-8") as f: + config = json.load(f) + return [r["name"] for r in config.get("repositories", [])] + + +def build_pr_body( + status_dir: str = "data/.status", out_file: str = "pr-body.md" +) -> None: + """Read status markers and write the PR summary markdown body.""" + status_dir_path = Path(status_dir) + if not status_dir_path.is_dir(): + return + + statuses: dict[str, str] = {} + for f in sorted(status_dir_path.glob("*.json")): + with open(f, encoding="utf-8") as fh: + s = json.load(fh) + statuses[s["repo"]] = s["status"] + + total = len(statuses) + passed = sum(1 for v in statuses.values() if v == "success") + + rows = "\n".join( + f"| {repo} | {'✅' if s == 'success' else '❌'} |" + for repo, s in sorted(statuses.items()) + ) + + header = "## Automated Theseus Data Engine Run\n\n" + table = "| Repo | Status |\n|------|--------|\n" + total_row = f"| **Total** | **{passed}/{total} completed** |\n\n" + footer = ( + "This pull request contains the latest pre-computed " + "persistence data for the tracked repositories.\n\n" + "**Trigger:** Monthly Schedule / Workflow Dispatch" + ) + body = header + table + rows + "\n" + total_row + footer + Path(out_file).write_text(body, encoding="utf-8") + + +def validate_graph_files(data_dir: str = "data/processed") -> None: + """Validate all graph JSON files. Exits non-zero on failure.""" + processed_path = Path(data_dir) + files = sorted(processed_path.glob("*.json")) + + if not files: + print("No processed files found to validate.") + sys.exit(1) + + errors = 0 + for f in files: + try: + data = json.loads(f.read_text(encoding="utf-8")) + if "snapshots" not in data: + raise ValueError(f"Missing snapshots in {f}") + if "fossils" not in data: + raise ValueError(f"Missing fossils in {f}") + for snap in data["snapshots"]: + if "snapshot_date" not in snap: + raise ValueError(f"Missing snapshot_date in {f}") + if "composition" not in snap: + raise ValueError(f"Missing composition in {f}") + print(f" {f.name}") + except (json.JSONDecodeError, ValueError, KeyError) as e: + print(f" {f.name}: {e}") + errors += 1 + + if errors: + print(f"Validation failed: {errors} error(s)") + sys.exit(1) + print("All graph files validated.") + + +def main() -> None: + """CLI entry point: dispatch to subcommand.""" + command = sys.argv[1] if len(sys.argv) > 1 else "" + + if command == "discover-repos": + names = discover_repos() + print(json.dumps(names)) + elif command == "build-pr-body": + build_pr_body() + elif command == "validate-graph-files": + validate_graph_files() + else: + print(f"Usage: python {sys.argv[0]} ", file=sys.stderr) + print( + "Commands: discover-repos, build-pr-body, validate-graph-files", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main()