Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions roar/execution/framework/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
"roar.backends.osmo.plugin",
"roar.backends.local.plugin",
)
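# Backend plugins skipped during discovery because importing them raised
# ImportError (e.g. a wrong-ABI wheel under a cross-Python `roar run`).
# Despite the name, this records both builtin plugin modules (keyed by module
# name) and entry-point backends (keyed by the entry point's value); each key
# maps to the import error string. Diagnostics only.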
_skipped_builtin_backend_imports: dict[str, str] = {}
# Well-known job-environment markers for built-in backends. Each marker
# is also declared on its backend's ``ExecutionPolicyAdapter`` (single
# source of truth at the policy level — see consistency tests in
Expand Down Expand Up @@ -290,7 +294,16 @@ def _ensure_execution_backends_discovered() -> None:

def _load_builtin_execution_backends() -> None:
for module_name in _BUILTIN_EXECUTION_BACKEND_MODULES:
module = importlib.import_module(module_name)
try:
module = importlib.import_module(module_name)
except ImportError as exc:
# A builtin backend whose compiled deps can't import — e.g. a
# wrong-ABI wheel (pydantic_core, cryptography, ...) under a
# cross-Python `roar run` — must not take down discovery for the
# other backends or crash the traced workload. Skip it; it's simply
# absent from the registry. Recorded for diagnostics/tests.
_skipped_builtin_backend_imports[module_name] = str(exc)
continue
register = getattr(module, "register", None)
if not callable(register):
raise TypeError(
Expand All @@ -302,7 +315,15 @@ def _load_builtin_execution_backends() -> None:

def _load_entrypoint_execution_backends() -> None:
for entry_point in _iter_execution_backend_entrypoints():
payload = entry_point.load()
try:
payload = entry_point.load()
except ImportError as exc:
# Same resilience as the builtin loader: an entry-point backend
# whose compiled deps can't import (wrong-ABI wheel under a
# cross-Python `roar run`) is skipped rather than crashing
# discovery and the traced workload.
_skipped_builtin_backend_imports[entry_point.value] = str(exc)
continue
_register_entrypoint_payload(payload)


Expand Down
10 changes: 10 additions & 0 deletions roar/execution/framework/runtime_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def disable_backend_dispatch(self) -> None:
"""
self._backend_dispatch_disabled = True

def enable_backend_dispatch(self) -> None:
    """Turn backend dispatch back on, undoing :meth:`disable_backend_dispatch`.

    The sitecustomize runtime gate switches dispatch off on an ABI mismatch
    *before* it attempts an in-process runtime repair, so the repair's own
    imports cannot trigger a wrong-ABI backend load. It calls this method
    afterwards, once an ABI-matched runtime has been made reachable.
    """
    # Only clears the flag; initializing the selected backend is a separate call.
    self._backend_dispatch_disabled = False

def resolve_selected_backend(self) -> ExecutionBackend | None:
backend_name = str(self._environ.get(ROAR_EXECUTION_BACKEND_ENV) or "").strip()
if not backend_name:
Expand Down
86 changes: 70 additions & 16 deletions roar/execution/runtime/inject/sitecustomize.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ def _prepend_roar_runtime_pythonpath() -> None:
_prepend_roar_runtime_pythonpath()

from roar.execution.framework.runtime_imports import RuntimeImportController
from roar.execution.runtime.inject.support import matching_compiled_pydantic_core
from roar.execution.runtime.inject.support import (
SuppressTracking,
apply_runtime_gate,
matching_compiled_pydantic_core,
)
from roar.execution.runtime.inject.tracker import RuntimeInjectionTracker

LOG_FILE = os.environ.get("ROAR_LOG_FILE")
Expand All @@ -57,24 +61,74 @@ def _prepend_roar_runtime_pythonpath() -> None:
_runtime_tracker.install()


def _repair_runtime_in_process(expected_soabi: str) -> bool:
    """Make an ABI-matched runtime tree importable from the running interpreter.

    Asks ``ensure_runtime`` for a tree built for ``sys.executable`` and, if one
    comes back, puts it at the front of ``sys.path``. The result is True only
    when a matching compiled ``pydantic_core`` can be found on ``sys.path``
    afterwards.

    Because this executes inside the real traced process, the ABI is known for
    certain regardless of how Python was launched. The launch-time prewarm only
    fires for a direct ``python`` target, so wrapper launches (torchrun,
    ``uv run``, shell scripts) are repaired here instead. ``ensure_runtime``
    honors ``ROAR_RUNTIME_INSTALL=skip``. Any failure degrades quietly to
    False, and the caller then leaves backend dispatch disabled.

    Precondition: backend dispatch must already be disabled by the caller. The
    imports below go through the tracker's patched ``__import__``, which would
    otherwise start backend discovery (Ray/OSMO plugin -> wrong-ABI
    ``pydantic_core``) and reproduce the crash being repaired. Tracking is
    suppressed so the repair's own file I/O stays out of workload lineage.
    """
    try:
        with SuppressTracking():
            from roar import __version__ as roar_version
            from roar.execution.runtime.lazy_install import ensure_runtime

            runtime_tree = ensure_runtime(
                target_python=sys.executable,
                target_abi=sys.implementation.cache_tag,
                bundled_abi=None,
                roar_version=roar_version,
            )
    except Exception:
        # Deliberate best-effort: a failed repair must never break the workload.
        return False

    if runtime_tree is not None:
        runtime_entry = str(runtime_tree)
        if runtime_entry not in sys.path:
            sys.path.insert(0, runtime_entry)
        return matching_compiled_pydantic_core(sys.path, expected_soabi)
    return False


def _runtime_gate_degrade_message(running_abi: tuple[int, int]) -> str:
    """Return the notice shown when no ABI-matched runtime is available.

    ``running_abi`` is the ``(major, minor)`` version of the running Python. It
    appears in the headline and in the suggested ``uv tool install`` command.
    """
    version = f"{running_abi[0]}.{running_abi[1]}"
    lines = (
        f"roar: no ABI-matched runtime found for Python {version}.\n",
        " Backend integrations (Ray, OSMO) are disabled for this run.\n",
        " File I/O is still captured.\n",
        " Fix one of:\n",
        " - Install roar in this Python: pip install roar-cli\n",
        " - Reinstall roar-cli under matching Python:\n",
        f" uv tool install --python python{version} roar-cli --reinstall\n",
    )
    return "".join(lines)


if os.environ.get("ROAR_WRAP") == "1":
_running_abi = (sys.version_info.major, sys.version_info.minor)
_expected_soabi = f"cpython-{_running_abi[0]}{_running_abi[1]}"
if not matching_compiled_pydantic_core(sys.path, _expected_soabi):
sys.stderr.write(
f"roar: no ABI-matched runtime found for Python "
f"{_running_abi[0]}.{_running_abi[1]}.\n"
f" Backend integrations (Ray, OSMO) are disabled for this run.\n"
f" File I/O is still captured.\n"
f" Fix one of:\n"
f" - Install roar in this Python: pip install roar-cli\n"
f" - Reinstall roar-cli under matching Python:\n"
f" uv tool install --python python{_running_abi[0]}.{_running_abi[1]} "
f"roar-cli --reinstall\n"
)
_runtime_import_controller.disable_backend_dispatch()
else:
_runtime_import_controller.initialize_selected_backend()

def _emit_runtime_gate_degrade() -> None:
sys.stderr.write(_runtime_gate_degrade_message(_running_abi))

# Backend interception (Ray, OSMO) is a primary job of the injection, so on
# an ABI mismatch try to repair in-process before giving up on it. The
# disable-before-repair / enable-only-on-success ordering lives in
# apply_runtime_gate (unit-tested in test_inject_support.py).
apply_runtime_gate(
_runtime_import_controller,
matched=matching_compiled_pydantic_core(sys.path, _expected_soabi),
repair=lambda: _repair_runtime_in_process(_expected_soabi),
on_degrade=_emit_runtime_gate_degrade,
)


atexit.register(_runtime_tracker.write_log)
39 changes: 39 additions & 0 deletions roar/execution/runtime/inject/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,49 @@
import shutil
import sys
import threading
from collections.abc import Callable
from typing import Protocol

_roar_suppress = threading.local()


class _BackendDispatchController(Protocol):
    """Structural type for the part of ``RuntimeImportController`` the runtime gate uses."""

    def disable_backend_dispatch(self) -> None:
        """Switch backend dispatch off."""

    def enable_backend_dispatch(self) -> None:
        """Switch backend dispatch back on."""

    def initialize_selected_backend(self) -> None:
        """Initialize the selected backend."""


def apply_runtime_gate(
    controller: _BackendDispatchController,
    *,
    matched: bool,
    repair: Callable[[], bool],
    on_degrade: Callable[[], None],
) -> None:
    """Choose the backend-dispatch outcome for a traced process from its ABI match.

    Args:
        controller: Object whose dispatch switch and backend initialization are driven.
        matched: True when the bundled/co-installed deps already have the right ABI.
        repair: Called only on a mismatch; returns True if it made ABI-matched
            deps reachable.
        on_degrade: Called only when ``repair`` reports failure.

    On a match the selected backend is initialized straight away. On a mismatch
    dispatch is disabled **first**, so the repair's own imports cannot trigger a
    wrong-ABI backend load (the original cross-Python crash). Dispatch is then
    re-enabled and the backend initialized **only** if ``repair`` succeeds;
    otherwise ``on_degrade`` runs and dispatch stays disabled.

    Kept outside ``sitecustomize`` so this ordering can be unit-tested without
    importing that module's body.
    """
    if not matched:
        # Order matters: dispatch goes off before repair gets a chance to import.
        controller.disable_backend_dispatch()
        if not repair():
            on_degrade()
            return
        controller.enable_backend_dispatch()
    controller.initialize_selected_backend()


def bundled_abi_tag(inject_dir: str) -> str | None:
"""Return the cpython ABI tag of roar's bundled compiled deps, or None.

Expand Down
101 changes: 99 additions & 2 deletions roar/execution/runtime/lazy_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,24 @@
import sys
import tempfile
import time
from collections.abc import Iterator
from pathlib import Path

try:
import fcntl
except ImportError: # non-POSIX; locking degrades to best-effort (no lock)
fcntl = None # type: ignore[assignment]

# Deps backend dispatch needs in the traced Python. Kept short — pip/uv
# resolves transitive deps. Unpinned: roar tolerates any pydantic 2.x.
_RUNTIME_DEPS: tuple[str, ...] = ("pydantic", "blake3")

_STAMP_FILENAME = "roar_runtime.json"
_INSTALL_TIMEOUT_SECONDS = 180
# A waiter must outlast a winner that is mid-install, so the lock timeout sits
# above the install timeout — otherwise a waiter could give up while the winner
# is still legitimately working.
_LOCK_TIMEOUT_SECONDS = _INSTALL_TIMEOUT_SECONDS + 30


def runtime_cache_root() -> Path:
Expand Down Expand Up @@ -100,6 +110,7 @@ def install_runtime(
text=True,
timeout=_INSTALL_TIMEOUT_SECONDS,
check=False,
env=_clean_subprocess_env(),
)
except (OSError, subprocess.SubprocessError) as exc:
sys.stderr.write(f"🦖 install failed: {exc}\n")
Expand Down Expand Up @@ -133,6 +144,82 @@ def install_runtime(
return is_runtime_cached(abi_tag, roar_version)


def _clean_subprocess_env() -> dict[str, str]:
    """Return a copy of the environment with roar's injection removed.

    The in-process repair path (``sitecustomize``) can reach ``install_runtime``
    from *inside* a traced process, where ``ROAR_WRAP=1`` is set and roar's
    inject dir is on ``PYTHONPATH``. If the installer inherited that, its own
    Python would inject roar into itself: recursion and polluted lineage. The
    copy returned here drops the wrap flag, the runtime-pythonpath variables
    and the inject dir, so the installer runs as a plain, untraced process. It
    is harmless on the already-clean launch-time path. ``os.environ`` itself is
    not modified.
    """
    env = os.environ.copy()
    for injected_var in (
        "ROAR_WRAP",
        "ROAR_RUNTIME_PYTHONPATH",
        "ROAR_RUNTIME_PYTHONPATH_ACTIVE",
    ):
        env.pop(injected_var, None)

    inject_dir = os.path.realpath(str(Path(__file__).resolve().parent / "inject"))
    search_path = env.get("PYTHONPATH")
    if not search_path:
        # Unset or empty PYTHONPATH: nothing to filter, leave it as found.
        return env

    surviving = []
    for entry in search_path.split(os.pathsep):
        if not entry:
            continue
        # Compare resolved paths so symlinked spellings of the inject dir match.
        if os.path.realpath(entry) == inject_dir:
            continue
        surviving.append(entry)

    if surviving:
        env["PYTHONPATH"] = os.pathsep.join(surviving)
    else:
        del env["PYTHONPATH"]
    return env


@contextlib.contextmanager
def _install_lock(abi_tag: str, timeout: float = _LOCK_TIMEOUT_SECONDS) -> Iterator[bool]:
    """Hold an exclusive cross-process lock for installing one ABI tree.

    Args:
        abi_tag: ABI the install is for; the lock file is ``.<abi_tag>.lock``
            under the runtime cache root.
        timeout: Seconds to keep retrying before giving up on the lock.

    Yields:
        ``True`` once the lock is held, or ``False`` if it could not be
        acquired within ``timeout`` (the caller should then degrade to the gate
        rather than block forever).

    Best-effort: the lock is a no-op that yields ``True`` when ``fcntl`` is
    unavailable (non-POSIX), and also when the cache directory cannot be
    created or the lock file cannot be opened. In those cases the install is
    attempted unlocked and ``install_runtime`` surfaces the underlying error.

    Used to collapse the torchrun "thundering herd": one worker per GPU can
    reach the installer at the same instant on a cold cache. The lock lets the
    winner install while the rest wait, then re-check the cache and reuse it.
    """
    if fcntl is None:
        yield True
        return

    fd: int | None
    try:
        cache_root = runtime_cache_root()
        cache_root.mkdir(parents=True, exist_ok=True)
        lock_path = cache_root / f".{abi_tag}.lock"
        # Opening the lock file can fail the same way mkdir can (read-only or
        # unwritable cache dir), so both failures share the no-lock fallback.
        fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644)
    except OSError:
        fd = None

    if fd is None:
        # Yield outside the handler so an error raised by the caller's block
        # is not chained onto this unrelated OSError.
        yield True
        return

    acquired = False
    deadline = time.monotonic() + timeout
    try:
        while True:
            try:
                # Non-blocking attempt, so the deadline is checked between tries.
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                acquired = True
                break
            except OSError:
                if time.monotonic() >= deadline:
                    break
                time.sleep(0.2)
        yield acquired
    finally:
        if acquired:
            with contextlib.suppress(OSError):
                fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


def _select_installer(
target_python: str, target_dir: Path, deps: tuple[str, ...]
) -> list[str] | None:
Expand Down Expand Up @@ -212,6 +299,16 @@ def ensure_runtime(
if is_runtime_cached(target_abi, roar_version):
return runtime_site_packages(target_abi)

if install_runtime(target_abi, target_python, roar_version):
return runtime_site_packages(target_abi)
# Cold cache. Under a wrapper launch (torchrun, accelerate, ...) every
# worker process reaches here at once, so serialize on a per-ABI lock and
# re-check the cache after acquiring it: the winner installs once, the rest
# find the freshly-installed tree and reuse it instead of racing N installs.
with _install_lock(target_abi) as acquired:
if is_runtime_cached(target_abi, roar_version):
return runtime_site_packages(target_abi)
if not acquired:
# Timed out waiting on another installer; degrade to the gate.
return None
if install_runtime(target_abi, target_python, roar_version):
return runtime_site_packages(target_abi)
return None
37 changes: 37 additions & 0 deletions tests/execution/framework/test_execution_planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,40 @@ def _register():
assert loads == ["load"]
assert backends_first == (fake_backend,)
assert backends_second == (fake_backend,)


def test_builtin_backend_discovery_skips_unimportable_plugin(monkeypatch) -> None:
    """A builtin backend whose compiled deps can't import — e.g. a wrong-ABI
    wheel (pydantic_core, cryptography, ...) under a cross-Python ``roar run`` —
    is skipped rather than crashing discovery for the other backends or the
    traced workload.
    """
    import roar.execution.framework.registry as module

    # Start from a pristine, undiscovered registry limited to two builtins.
    monkeypatch.setattr(module, "_registered_execution_backends", [])
    monkeypatch.setattr(module, "_execution_backends_discovered", False)
    monkeypatch.setattr(module, "_execution_backends_discovering", False)
    monkeypatch.setattr(module, "_skipped_builtin_backend_imports", {})
    monkeypatch.setattr(
        module,
        "_BUILTIN_EXECUTION_BACKEND_MODULES",
        ("roar.backends.ray.plugin", "roar.backends.local.plugin"),
    )
    monkeypatch.setattr(module, "_iter_execution_backend_entrypoints", lambda: ())

    real_import = module.importlib.import_module

    def fake_import(name: str, package=None):
        # ``module.importlib`` is the shared stdlib module, so this replacement
        # is seen by every caller for the duration of the test. Keep the real
        # ``import_module(name, package=None)`` signature so unrelated callers
        # that pass ``package`` keep working.
        if name == "roar.backends.ray.plugin":
            raise ImportError("_rust.abi3.so: undefined symbol: PyType_GetName")
        return real_import(name, package)

    monkeypatch.setattr(module.importlib, "import_module", fake_import)

    module._ensure_execution_backends_discovered()  # must not raise

    assert module._execution_backends_discovered is True
    assert "roar.backends.ray.plugin" in module._skipped_builtin_backend_imports
    registered = {backend.name for backend in module._registered_execution_backends}
    assert "ray" not in registered  # unimportable plugin skipped
    assert "local" in registered  # importable plugin still registered
Loading
Loading