diff --git a/roar/execution/framework/registry.py b/roar/execution/framework/registry.py index 5f70b164..5c0d584b 100644 --- a/roar/execution/framework/registry.py +++ b/roar/execution/framework/registry.py @@ -17,6 +17,10 @@ "roar.backends.osmo.plugin", "roar.backends.local.plugin", ) +# Builtin backend plugin modules skipped during discovery because their +# compiled deps failed to import (e.g. a wrong-ABI wheel under a cross-Python +# `roar run`). Maps module name -> import error string. Diagnostics only. +_skipped_builtin_backend_imports: dict[str, str] = {} # Well-known job-environment markers for built-in backends. Each marker # is also declared on its backend's ``ExecutionPolicyAdapter`` (single # source of truth at the policy level — see consistency tests in @@ -290,7 +294,16 @@ def _ensure_execution_backends_discovered() -> None: def _load_builtin_execution_backends() -> None: for module_name in _BUILTIN_EXECUTION_BACKEND_MODULES: - module = importlib.import_module(module_name) + try: + module = importlib.import_module(module_name) + except ImportError as exc: + # A builtin backend whose compiled deps can't import — e.g. a + # wrong-ABI wheel (pydantic_core, cryptography, ...) under a + # cross-Python `roar run` — must not take down discovery for the + # other backends or crash the traced workload. Skip it; it's simply + # absent from the registry. Recorded for diagnostics/tests. + _skipped_builtin_backend_imports[module_name] = str(exc) + continue register = getattr(module, "register", None) if not callable(register): raise TypeError( @@ -302,7 +315,15 @@ def _load_builtin_execution_backends() -> None: def _load_entrypoint_execution_backends() -> None: for entry_point in _iter_execution_backend_entrypoints(): - payload = entry_point.load() + try: + payload = entry_point.load() + except ImportError as exc: + # Same resilience as the builtin loader: an entry-point backend + # whose compiled deps can't import (wrong-ABI wheel under a + # cross-Python `roar run`) is skipped rather than crashing + # discovery and the traced workload. + _skipped_builtin_backend_imports[entry_point.value] = str(exc) + continue _register_entrypoint_payload(payload) diff --git a/roar/execution/framework/runtime_imports.py b/roar/execution/framework/runtime_imports.py index caca8e14..5ca66155 100644 --- a/roar/execution/framework/runtime_imports.py +++ b/roar/execution/framework/runtime_imports.py @@ -34,6 +34,16 @@ def disable_backend_dispatch(self) -> None: """ self._backend_dispatch_disabled = True + def enable_backend_dispatch(self) -> None: + """Re-enable backend dispatch after a prior :meth:`disable_backend_dispatch`. + + Used by the sitecustomize gate: on an ABI mismatch it disables dispatch + *before* attempting an in-process runtime repair (so the repair's own + imports don't trigger a wrong-ABI backend load), then re-enables here + once an ABI-matched runtime has been made reachable. + """ + self._backend_dispatch_disabled = False + def resolve_selected_backend(self) -> ExecutionBackend | None: backend_name = str(self._environ.get(ROAR_EXECUTION_BACKEND_ENV) or "").strip() if not backend_name: diff --git a/roar/execution/runtime/inject/sitecustomize.py b/roar/execution/runtime/inject/sitecustomize.py index db3b2c74..11846089 100644 --- a/roar/execution/runtime/inject/sitecustomize.py +++ b/roar/execution/runtime/inject/sitecustomize.py @@ -35,7 +35,11 @@ def _prepend_roar_runtime_pythonpath() -> None: _prepend_roar_runtime_pythonpath() from roar.execution.framework.runtime_imports import RuntimeImportController -from roar.execution.runtime.inject.support import matching_compiled_pydantic_core +from roar.execution.runtime.inject.support import ( + SuppressTracking, + apply_runtime_gate, + matching_compiled_pydantic_core, +) from roar.execution.runtime.inject.tracker import RuntimeInjectionTracker LOG_FILE = os.environ.get("ROAR_LOG_FILE") @@ -57,24 +61,74 @@ def _prepend_roar_runtime_pythonpath() -> None: _runtime_tracker.install() +def _repair_runtime_in_process(expected_soabi: str) -> bool: + """Install + prepend an ABI-matched runtime tree for *this* interpreter. + + Returns True if a matching compiled ``pydantic_core`` is reachable + afterwards. This runs inside the real traced process, so the ABI is known + for certain no matter how Python was launched — the launch-time prewarm + only fires for a direct ``python`` target, so wrapper launches (torchrun, + ``uv run``, shell scripts) repair here instead. Honors + ``ROAR_RUNTIME_INSTALL=skip`` (via ``ensure_runtime``) and degrades quietly + on any failure; the caller then leaves backend dispatch disabled. + + The caller must disable backend dispatch *before* invoking this: the imports + below pass through the tracker's patched ``__import__``, which would + otherwise trigger backend discovery (loading the Ray/OSMO plugin → importing + the wrong-ABI ``pydantic_core`` → the very crash we are repairing). Tracking + is suppressed so the repair's own file I/O doesn't land in workload lineage. + """ + try: + with SuppressTracking(): + from roar import __version__ as roar_version + from roar.execution.runtime.lazy_install import ensure_runtime + + tree = ensure_runtime( + target_python=sys.executable, + target_abi=sys.implementation.cache_tag, + bundled_abi=None, + roar_version=roar_version, + ) + except Exception: + return False + if tree is None: + return False + tree_str = str(tree) + if tree_str not in sys.path: + sys.path.insert(0, tree_str) + return matching_compiled_pydantic_core(sys.path, expected_soabi) + + +def _runtime_gate_degrade_message(running_abi: tuple[int, int]) -> str: + return ( + f"roar: no ABI-matched runtime found for Python {running_abi[0]}.{running_abi[1]}.\n" + f" Backend integrations (Ray, OSMO) are disabled for this run.\n" + f" File I/O is still captured.\n" + f" Fix one of:\n" + f" - Install roar in this Python: pip install roar-cli\n" + f" - Reinstall roar-cli under matching Python:\n" + f" uv tool install --python python{running_abi[0]}.{running_abi[1]} " + f"roar-cli --reinstall\n" + ) + + if os.environ.get("ROAR_WRAP") == "1": _running_abi = (sys.version_info.major, sys.version_info.minor) _expected_soabi = f"cpython-{_running_abi[0]}{_running_abi[1]}" - if not matching_compiled_pydantic_core(sys.path, _expected_soabi): - sys.stderr.write( - f"roar: no ABI-matched runtime found for Python " - f"{_running_abi[0]}.{_running_abi[1]}.\n" - f" Backend integrations (Ray, OSMO) are disabled for this run.\n" - f" File I/O is still captured.\n" - f" Fix one of:\n" - f" - Install roar in this Python: pip install roar-cli\n" - f" - Reinstall roar-cli under matching Python:\n" - f" uv tool install --python python{_running_abi[0]}.{_running_abi[1]} " - f"roar-cli --reinstall\n" - ) - _runtime_import_controller.disable_backend_dispatch() - else: - _runtime_import_controller.initialize_selected_backend() + + def _emit_runtime_gate_degrade() -> None: + sys.stderr.write(_runtime_gate_degrade_message(_running_abi)) + + # Backend interception (Ray, OSMO) is a primary job of the injection, so on + # an ABI mismatch try to repair in-process before giving up on it. The + # disable-before-repair / enable-only-on-success ordering lives in + # apply_runtime_gate (unit-tested in test_inject_support.py). + apply_runtime_gate( + _runtime_import_controller, + matched=matching_compiled_pydantic_core(sys.path, _expected_soabi), + repair=lambda: _repair_runtime_in_process(_expected_soabi), + on_degrade=_emit_runtime_gate_degrade, + ) atexit.register(_runtime_tracker.write_log) diff --git a/roar/execution/runtime/inject/support.py b/roar/execution/runtime/inject/support.py index b6a8c3e5..c6426ae3 100644 --- a/roar/execution/runtime/inject/support.py +++ b/roar/execution/runtime/inject/support.py @@ -6,10 +6,49 @@ import shutil import sys import threading +from collections.abc import Callable +from typing import Protocol _roar_suppress = threading.local() +class _BackendDispatchController(Protocol): + """The slice of ``RuntimeImportController`` the runtime gate drives.""" + + def disable_backend_dispatch(self) -> None: ... + def enable_backend_dispatch(self) -> None: ... + def initialize_selected_backend(self) -> None: ... + + +def apply_runtime_gate( + controller: _BackendDispatchController, + *, + matched: bool, + repair: Callable[[], bool], + on_degrade: Callable[[], None], +) -> None: + """Decide backend dispatch for a traced process based on ABI match. + + - ``matched`` (bundled/co-installed deps are the right ABI): enable backends. + - mismatch: disable dispatch **first** — so the repair's own imports can't + trigger a wrong-ABI backend load (the original cross-Python crash) — then + attempt ``repair``; re-enable and initialize **only** if repair made + ABI-matched deps reachable, otherwise ``on_degrade``. + + Extracted from ``sitecustomize`` so the ordering (disable-before-repair, + enable-only-on-success) is unit-testable without importing the module body. + """ + if matched: + controller.initialize_selected_backend() + return + controller.disable_backend_dispatch() + if repair(): + controller.enable_backend_dispatch() + controller.initialize_selected_backend() + else: + on_degrade() + + def bundled_abi_tag(inject_dir: str) -> str | None: """Return the cpython ABI tag of roar's bundled compiled deps, or None. diff --git a/roar/execution/runtime/lazy_install.py b/roar/execution/runtime/lazy_install.py index a41108a6..b5afc2f0 100644 --- a/roar/execution/runtime/lazy_install.py +++ b/roar/execution/runtime/lazy_install.py @@ -21,14 +21,24 @@ import sys import tempfile import time +from collections.abc import Iterator from pathlib import Path +try: + import fcntl +except ImportError: # non-POSIX; locking degrades to best-effort (no lock) + fcntl = None # type: ignore[assignment] + # Deps backend dispatch needs in the traced Python. Kept short — pip/uv # resolves transitive deps. Unpinned: roar tolerates any pydantic 2.x. _RUNTIME_DEPS: tuple[str, ...] = ("pydantic", "blake3") _STAMP_FILENAME = "roar_runtime.json" _INSTALL_TIMEOUT_SECONDS = 180 +# A waiter must outlast a winner that is mid-install, so the lock timeout sits +# above the install timeout — otherwise a waiter could give up while the winner +# is still legitimately working. +_LOCK_TIMEOUT_SECONDS = _INSTALL_TIMEOUT_SECONDS + 30 def runtime_cache_root() -> Path: @@ -100,6 +110,7 @@ def install_runtime( text=True, timeout=_INSTALL_TIMEOUT_SECONDS, check=False, + env=_clean_subprocess_env(), ) except (OSError, subprocess.SubprocessError) as exc: sys.stderr.write(f"🦖 install failed: {exc}\n") @@ -133,6 +144,82 @@ def install_runtime( return is_runtime_cached(abi_tag, roar_version) +def _clean_subprocess_env() -> dict[str, str]: + """Env for the installer subprocess with roar's injection stripped out. + + The in-process repair path (``sitecustomize``) can call ``install_runtime`` + from *inside* a traced process, where ``ROAR_WRAP=1`` and roar's inject dir + is on ``PYTHONPATH``. Left in place, the installer's own Python would + re-inject roar into itself — recursion and polluted lineage. Dropping the + wrap flag and the inject dir makes the installer run as a plain, untraced + process. Harmless when called from the (already-clean) launch-time path. + """ + env = dict(os.environ) + env.pop("ROAR_WRAP", None) + env.pop("ROAR_RUNTIME_PYTHONPATH", None) + env.pop("ROAR_RUNTIME_PYTHONPATH_ACTIVE", None) + + inject_dir = os.path.realpath(str(Path(__file__).resolve().parent / "inject")) + pythonpath = env.get("PYTHONPATH") + if pythonpath: + kept = [ + entry + for entry in pythonpath.split(os.pathsep) + if entry and os.path.realpath(entry) != inject_dir + ] + if kept: + env["PYTHONPATH"] = os.pathsep.join(kept) + else: + env.pop("PYTHONPATH", None) + return env + + +@contextlib.contextmanager +def _install_lock(abi_tag: str, timeout: float = _LOCK_TIMEOUT_SECONDS) -> Iterator[bool]: + """Hold an exclusive cross-process lock for installing one ABI tree. + + Yields ``True`` once the lock is held, or ``False`` if it could not be + acquired within ``timeout`` (the caller should then degrade to the gate + rather than block forever). Best-effort: with no ``fcntl`` (non-POSIX) the + lock is a no-op and we yield ``True``. + + Used to collapse the torchrun "thundering herd": one worker per GPU can + reach the installer at the same instant on a cold cache; the lock lets the + winner install while the rest wait, then re-check the cache and reuse it. + """ + if fcntl is None: + yield True + return + try: + cache_root = runtime_cache_root() + cache_root.mkdir(parents=True, exist_ok=True) + except OSError: + # Can't even make the cache dir; let install_runtime surface the error. + yield True + return + + lock_path = cache_root / f".{abi_tag}.lock" + fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) + acquired = False + deadline = time.monotonic() + timeout + try: + while True: + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + acquired = True + break + except OSError: + if time.monotonic() >= deadline: + break + time.sleep(0.2) + yield acquired + finally: + if acquired: + with contextlib.suppress(OSError): + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + def _select_installer( target_python: str, target_dir: Path, deps: tuple[str, ...] ) -> list[str] | None: @@ -212,6 +299,16 @@ def ensure_runtime( if is_runtime_cached(target_abi, roar_version): return runtime_site_packages(target_abi) - if install_runtime(target_abi, target_python, roar_version): - return runtime_site_packages(target_abi) + # Cold cache. Under a wrapper launch (torchrun, accelerate, ...) every + # worker process reaches here at once, so serialize on a per-ABI lock and + # re-check the cache after acquiring it: the winner installs once, the rest + # find the freshly-installed tree and reuse it instead of racing N installs. + with _install_lock(target_abi) as acquired: + if is_runtime_cached(target_abi, roar_version): + return runtime_site_packages(target_abi) + if not acquired: + # Timed out waiting on another installer; degrade to the gate. + return None + if install_runtime(target_abi, target_python, roar_version): + return runtime_site_packages(target_abi) return None diff --git a/tests/execution/framework/test_execution_planning.py b/tests/execution/framework/test_execution_planning.py index c0d0556c..4aa724c2 100644 --- a/tests/execution/framework/test_execution_planning.py +++ b/tests/execution/framework/test_execution_planning.py @@ -411,3 +411,40 @@ def _register(): assert loads == ["load"] assert backends_first == (fake_backend,) assert backends_second == (fake_backend,) + + +def test_builtin_backend_discovery_skips_unimportable_plugin(monkeypatch) -> None: + """A builtin backend whose compiled deps can't import — e.g. a wrong-ABI + wheel (pydantic_core, cryptography, ...) under a cross-Python ``roar run`` — + is skipped rather than crashing discovery for the other backends or the + traced workload. + """ + import roar.execution.framework.registry as module + + monkeypatch.setattr(module, "_registered_execution_backends", []) + monkeypatch.setattr(module, "_execution_backends_discovered", False) + monkeypatch.setattr(module, "_execution_backends_discovering", False) + monkeypatch.setattr(module, "_skipped_builtin_backend_imports", {}) + monkeypatch.setattr( + module, + "_BUILTIN_EXECUTION_BACKEND_MODULES", + ("roar.backends.ray.plugin", "roar.backends.local.plugin"), + ) + monkeypatch.setattr(module, "_iter_execution_backend_entrypoints", lambda: ()) + + real_import = module.importlib.import_module + + def fake_import(name: str): + if name == "roar.backends.ray.plugin": + raise ImportError("_rust.abi3.so: undefined symbol: PyType_GetName") + return real_import(name) + + monkeypatch.setattr(module.importlib, "import_module", fake_import) + + module._ensure_execution_backends_discovered() # must not raise + + assert module._execution_backends_discovered is True + assert "roar.backends.ray.plugin" in module._skipped_builtin_backend_imports + registered = {backend.name for backend in module._registered_execution_backends} + assert "ray" not in registered # unimportable plugin skipped + assert "local" in registered # importable plugin still registered diff --git a/tests/execution/runtime/test_inject_support.py b/tests/execution/runtime/test_inject_support.py index ff3fa906..903dd326 100644 --- a/tests/execution/runtime/test_inject_support.py +++ b/tests/execution/runtime/test_inject_support.py @@ -4,11 +4,87 @@ from roar.execution.runtime.inject.support import ( abi_minor_version, + apply_runtime_gate, bundled_abi_tag, matching_compiled_pydantic_core, ) +class _RecordingController: + """Records backend-dispatch lifecycle calls into a shared event list.""" + + def __init__(self, events: list[str]) -> None: + self._events = events + + def disable_backend_dispatch(self) -> None: + self._events.append("disable") + + def enable_backend_dispatch(self) -> None: + self._events.append("enable") + + def initialize_selected_backend(self) -> None: + self._events.append("init") + + +def test_apply_runtime_gate_matched_initializes_without_repair() -> None: + events: list[str] = [] + repair_called: list[bool] = [] + degraded: list[bool] = [] + + apply_runtime_gate( + _RecordingController(events), + matched=True, + repair=lambda: bool(repair_called.append(True)) or True, + on_degrade=lambda: degraded.append(True), + ) + + assert events == ["init"] # matched ABI → backends straight on + assert repair_called == [] # no repair attempted + assert degraded == [] + + +def test_apply_runtime_gate_disables_before_repair_then_reenables_on_success() -> None: + events: list[str] = [] + degraded: list[bool] = [] + + def repair() -> bool: + events.append("repair") + return True + + apply_runtime_gate( + _RecordingController(events), + matched=False, + repair=repair, + on_degrade=lambda: degraded.append(True), + ) + + # Critical ordering: dispatch is disabled BEFORE repair runs (so repair's + # own imports can't trigger a wrong-ABI backend load), then re-enabled and + # initialized only after a successful repair. + assert events == ["disable", "repair", "enable", "init"] + assert degraded == [] + + +def test_apply_runtime_gate_degrades_when_repair_fails() -> None: + events: list[str] = [] + degraded: list[bool] = [] + + def repair() -> bool: + events.append("repair") + return False + + apply_runtime_gate( + _RecordingController(events), + matched=False, + repair=repair, + on_degrade=lambda: degraded.append(True), + ) + + # Repair failed: dispatch stays disabled (no enable/init), degrade fires. + assert events == ["disable", "repair"] + assert degraded == [True] + + def _make_inject_layout(root: Path, compiled_pkg: str, so_filename: str) -> Path: """Create a fake site-packages tree and return the simulated inject dir.""" site_pkg = root / "site-packages" diff --git a/tests/execution/runtime/test_lazy_install.py b/tests/execution/runtime/test_lazy_install.py index 7da192d0..95341382 100644 --- a/tests/execution/runtime/test_lazy_install.py +++ b/tests/execution/runtime/test_lazy_install.py @@ -1,7 +1,10 @@ from __future__ import annotations import json +import os import subprocess +import threading +import time from pathlib import Path from unittest.mock import MagicMock @@ -275,3 +278,88 @@ def test_ensure_runtime_returns_none_on_install_failure( mode="auto", ) assert result is None + + +# --------------------------------------------------------------------------- +# Concurrency — the torchrun "thundering herd" (one worker per GPU, all hitting +# a cold cache at once must collapse to a single install, not N). +# --------------------------------------------------------------------------- + + +def test_ensure_runtime_serializes_concurrent_installs( + cache_root: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + install_calls: list[str] = [] + install_calls_lock = threading.Lock() + + def fake_install(abi_tag: str, target_python: str, roar_version: str, *_a, **_kw) -> bool: + with install_calls_lock: + install_calls.append(abi_tag) + # Widen the race window so a missing lock reliably lets every worker in. + time.sleep(0.2) + abi_dir = cache_root / abi_tag + abi_dir.mkdir(parents=True, exist_ok=True) + (abi_dir / "site-packages").mkdir(exist_ok=True) + (abi_dir / "roar_runtime.json").write_text(json.dumps({"roar_version": roar_version})) + return True + + monkeypatch.setattr(lazy_install, "install_runtime", fake_install) + + workers = 8 + start = threading.Barrier(workers) + results: list[Path | None] = [None] * workers + + def worker(idx: int) -> None: + start.wait() # release all workers into ensure_runtime simultaneously + results[idx] = lazy_install.ensure_runtime( + target_python="/usr/bin/python3.10", + target_abi="cp310", + bundled_abi="cp313", + roar_version="0.3.0", + mode="auto", + ) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(workers)] + for t in threads: + t.start() + for t in threads: + t.join() + + # The lock winner installs once; the other 7 re-check the cache and reuse it. + assert install_calls == ["cp310"] + expected = cache_root / "cp310" / "site-packages" + assert all(r == expected for r in results) + + +# --------------------------------------------------------------------------- +# Installer subprocess env — must not re-inject roar into itself when called +# from inside a traced process (the in-process repair path). +# --------------------------------------------------------------------------- + + +def test_clean_subprocess_env_strips_roar_injection(monkeypatch: pytest.MonkeyPatch) -> None: + inject_dir = str(Path(lazy_install.__file__).resolve().parent / "inject") + monkeypatch.setenv("ROAR_WRAP", "1") + monkeypatch.setenv("ROAR_RUNTIME_PYTHONPATH", "/cache/cp310/site-packages") + monkeypatch.setenv("ROAR_RUNTIME_PYTHONPATH_ACTIVE", "/cache/cp310/site-packages") + monkeypatch.setenv("PYTHONPATH", os.pathsep.join([inject_dir, "/keep/me"])) + monkeypatch.setenv("PATH", "/usr/bin") # unrelated vars survive + + env = lazy_install._clean_subprocess_env() + + assert "ROAR_WRAP" not in env + assert "ROAR_RUNTIME_PYTHONPATH" not in env + assert "ROAR_RUNTIME_PYTHONPATH_ACTIVE" not in env + assert env["PYTHONPATH"] == "/keep/me" # inject dir dropped, rest kept + assert env["PATH"] == "/usr/bin" + + +def test_clean_subprocess_env_drops_pythonpath_when_only_inject( + monkeypatch: pytest.MonkeyPatch, +) -> None: + inject_dir = str(Path(lazy_install.__file__).resolve().parent / "inject") + monkeypatch.setenv("PYTHONPATH", inject_dir) + + env = lazy_install._clean_subprocess_env() + + assert "PYTHONPATH" not in env diff --git a/tests/execution/runtime/test_runtime_imports.py b/tests/execution/runtime/test_runtime_imports.py index b22f8646..8914767b 100644 --- a/tests/execution/runtime/test_runtime_imports.py +++ b/tests/execution/runtime/test_runtime_imports.py @@ -167,3 +167,29 @@ def test_disable_backend_dispatch_short_circuits_handle_import( assert result is None assert calls == [] assert ROAR_EXECUTION_BACKEND_ENV not in controller._environ + + +def test_enable_backend_dispatch_reenables_after_disable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The sitecustomize gate disables dispatch while it repairs a cross-ABI + runtime, then re-enables it once matched deps are reachable. + """ + import roar.execution.framework.runtime_imports as runtime_imports + + calls: list[str] = [] + fake_backend = _fake_backend(calls) + controller = RuntimeImportController({ROAR_EXECUTION_BACKEND_ENV: "fake"}) + monkeypatch.setattr( + runtime_imports, + "get_execution_backend", + lambda backend_name: fake_backend if backend_name == "fake" else None, + ) + + controller.disable_backend_dispatch() + controller.initialize_selected_backend() + assert calls == [] # disabled → no init + + controller.enable_backend_dispatch() + controller.initialize_selected_backend() + assert calls == ["initialize"] # re-enabled → backend initializes diff --git a/tests/integration/test_cross_python_runtime_repair.py b/tests/integration/test_cross_python_runtime_repair.py new file mode 100644 index 00000000..addce649 --- /dev/null +++ b/tests/integration/test_cross_python_runtime_repair.py @@ -0,0 +1,138 @@ +"""Integration coverage for the cross-Python in-process runtime repair. + +Reproduces the real failure mode the unit tests can only approximate: a +NON-python launcher (a shell script, standing in for ``torchrun`` / ``uv run``) +spawns several workers on a *different* CPython than the one roar's bundled deps +were built for. Each worker's ``sitecustomize`` gate must repair its own runtime +in-process (so wrapper launches work at all), and N concurrent workers must +collapse to a single install (the per-ABI lock), not a thundering herd. + +Skipped unless ``uv`` and a second CPython minor are available — real coverage +where the environment supports it (dev boxes, the integration CI lane), silent +elsewhere. The unit test ``test_apply_runtime_gate_*`` guards the gate ordering +in every CI job regardless. +""" + +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import sys +import textwrap +from pathlib import Path + +import pydantic_core +import pytest + +import roar + +pytestmark = pytest.mark.integration + +_INJECT_DIR = str(Path(roar.__file__).resolve().parent / "execution" / "runtime" / "inject") +# Roar's importable root + the site-packages carrying the *current* (and so, +# for a different-ABI worker, wrong-ABI) pydantic_core — mirrors what the tracer +# puts on ROAR_RUNTIME_PYTHONPATH for a cross-Python child. +_SOURCE_ROOT = str(Path(roar.__file__).resolve().parent.parent) +_CURRENT_SITE_PACKAGES = str(Path(pydantic_core.__file__).resolve().parent.parent) + +_WORKER_PAYLOAD = textwrap.dedent( + """ + import json, sys + try: + import pydantic_core + print(json.dumps({"ok": True, "file": pydantic_core.__file__, + "tag": sys.implementation.cache_tag})) + except Exception as exc: # noqa: BLE001 + print(json.dumps({"ok": False, "err": repr(exc), + "tag": sys.implementation.cache_tag})) + """ +) + + +def _find_other_interpreter() -> str | None: + """An installed CPython whose minor differs from the test interpreter's.""" + uv = shutil.which("uv") + if not uv: + return None + current = f"{sys.version_info.major}.{sys.version_info.minor}" + for minor in ("3.10", "3.11", "3.12", "3.13"): + if minor == current: + continue + proc = subprocess.run( + [uv, "python", "find", minor], capture_output=True, text=True, check=False + ) + path = proc.stdout.strip() + if proc.returncode == 0 and path and Path(path).exists(): + return path + return None + + +def _cache_tag(interpreter: str) -> str: + proc = subprocess.run( + [interpreter, "-c", "import sys; print(sys.implementation.cache_tag)"], + capture_output=True, + text=True, + check=True, + ) + return proc.stdout.strip() + + +def test_wrapper_launch_repairs_runtime_in_process_and_installs_once(tmp_path: Path) -> None: + other = _find_other_interpreter() + if other is None: + pytest.skip("needs uv and a second installed CPython minor") + + worker_count = 4 + out_dir = tmp_path / "out" + out_dir.mkdir() + launcher = tmp_path / "launcher.sh" + # argv[0] is `bash` (the launcher), NOT python — so the launch-time ABI + # probe bails and repair must happen in-process inside each worker. + launcher.write_text( + "#!/usr/bin/env bash\nset -u\n" + 'PY="$1"; OUT="$2"; N="$3"\n' + "pids=()\n" + 'for i in $(seq 1 "$N"); do\n' + ' "$PY" -c "$PAYLOAD" >"$OUT/w$i.json" 2>"$OUT/w$i.err" &\n' + " pids+=($!)\n" + "done\n" + 'for p in "${pids[@]}"; do wait "$p"; done\n' + ) + + env = { + **os.environ, + "PYTHONPATH": os.pathsep.join([_INJECT_DIR, _SOURCE_ROOT, _CURRENT_SITE_PACKAGES]), + "ROAR_WRAP": "1", + "XDG_CACHE_HOME": str(tmp_path / "xdg"), + "PAYLOAD": _WORKER_PAYLOAD, + } + + subprocess.run( + ["bash", str(launcher), other, str(out_dir), str(worker_count)], + env=env, + check=True, + timeout=300, + ) + + results = [json.loads(p.read_text() or "{}") for p in sorted(out_dir.glob("w*.json"))] + errs = "\n".join(p.read_text() for p in sorted(out_dir.glob("w*.err"))) + + # If the install couldn't run (offline box), don't fail the suite for it. + if any( + "install failed" in (out_dir / f"w{i + 1}.err").read_text() for i in range(worker_count) + ): + pytest.skip(f"runtime install unavailable in this environment:\n{errs}") + + assert len(results) == worker_count + # Every worker imported an ABI-matched pydantic_core (not roar's wrong-ABI + # bundled copy) — i.e. the in-process repair fired despite the wrapper. + other_tag = _cache_tag(other) + for result in results: + assert result.get("ok") is True, f"worker crashed: {result}\n{errs}" + assert f"/roar/runtime/{other_tag}/" in result["file"], result + + # The per-ABI lock collapsed N concurrent installs into exactly one. + install_lines = errs.count("installing roar runtime") + assert install_lines == 1, f"expected a single install across workers, got {install_lines}"