diff --git a/.claude/plans/docs/05-config-reference.md b/.claude/plans/docs/05-config-reference.md new file mode 100644 index 000000000..2ec297e88 --- /dev/null +++ b/.claude/plans/docs/05-config-reference.md @@ -0,0 +1,28 @@ +# 05 — OPAL Config Reference (private) + +Private, internal supplement to the public operator docs under `documentation/docs/`. Tracks +`OPAL_*` env vars added or clarified by the OPAL Server Git Fixes work, with the declaring +`file:line` so contributors can jump straight to the `Confi` declaration. Each key maps to an +`OPAL_` env var (the `OPAL_` prefix is added once by the component's `Confi(prefix="OPAL_")` +instantiation — the declaration uses the bare name; the table lists the full, prefixed env var). + +## 4. opal-server keys + +| Env var | Type | Default | Purpose | Declared at | +|---|---|---|---|---| +| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Soft timeout for a single scope git clone/fetch (the waiter is released, the git call itself is not killed — see the caveat below). On timeout the operation is logged and skipped (retried next cycle), so one unreachable repo can never block boot or other scopes *indefinitely*. `0` = no timeout. | `packages/opal-server/opal_server/config.py:196-203` | +| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations, which also bounds how many scopes are synced concurrently. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:204-211` | + +> **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via +> `asyncio.wait`, which unblocks the event loop and the awaiting coroutine — but the underlying +> pygit2 call keeps running on its pool thread until the OS network timeout. 
The dedicated pool +> (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering threads so they cannot affect +> bundle serving or other scopes, and a per-repo in-flight guard prevents a second git op from +> touching the same (non-thread-safe) pygit2 repo while the first is still lingering. Hard-kill via +> subprocess is out of scope. See spec §6. +> +> **Boot / concurrency.** Scope syncs run concurrently, bounded by `OPAL_SCOPES_GIT_MAX_WORKERS`, so +> a slow/offline repo only holds its own slot for up to the timeout rather than serially delaying the +> whole pass. With more offline repos than workers, boot/poll can still take +> `ceil(offline / workers) × timeout`; the pool workers are daemon threads so a lingering op never +> blocks process shutdown. diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 58341ff37..1e1d47f11 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -193,6 +193,22 @@ class OpalServerConfig(Confi): 0, description="The timeout for cloning the policy repository (0 means wait forever)", ) + SCOPES_GIT_FETCH_TIMEOUT = confi.float( + "SCOPES_GIT_FETCH_TIMEOUT", + 120.0, + description="Hard timeout in seconds for a single scope git clone/fetch. " + "On timeout the operation is logged and skipped (retried next cycle), so " + "one unreachable repo can never block boot or other scopes indefinitely " + "(0 = no timeout).", + ) + SCOPES_GIT_MAX_WORKERS = confi.int( + "SCOPES_GIT_MAX_WORKERS", + 10, + description="Size of the dedicated thread pool for scope git operations, " + "which also bounds how many scopes are synced concurrently. 
# Lazily created, process-wide pool for scope git operations (see
# ``_get_git_executor``). ``None`` until first use and again after a reset.
_git_executor: Optional[ThreadPoolExecutor] = None

# Source ids whose scope git op (clone/fetch) is still running on a pool thread
# — including one that already exceeded its timeout but whose blocking pygit2
# call has not yet returned. Guarded by a lock because it is cleared from the
# pool thread (see ``run_in_git_executor``) and read/written from the event
# loop. Used to guarantee at most one live git op per repository, since pygit2
# ``Repository`` objects are not thread-safe.
_git_busy: set = set()
_git_busy_lock = threading.Lock()


class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
    """A ``ThreadPoolExecutor`` whose worker threads are daemon threads.

    A scope git op can stay blocked in a libgit2 network call well past our
    soft timeout. With the stdlib's non-daemon workers, interpreter shutdown
    would ``join()`` such a thread and hang until the OS network timeout
    fires. Daemon workers let the process exit promptly; abandoning an
    in-flight fetch at exit is safe (libgit2 stages objects in a temp pack and
    swaps refs atomically under lockfiles, and a half-written clone dir is
    detected as invalid and re-cloned on next boot).

    Only thread creation is customised, mirroring CPython's
    ``_adjust_thread_count`` with one deliberate difference: the worker is
    NOT registered in ``concurrent.futures.thread._threads_queues``. That
    registry is what the ``concurrent.futures`` exit hook iterates to
    ``join()`` every worker, daemon or not, so registering would re-introduce
    exactly the shutdown hang this class exists to avoid.

    If the running CPython does not expose the internals we rely on, we fall
    back to the stdlib (non-daemon) behaviour instead of raising.
    """

    # Private ``ThreadPoolExecutor`` instance attributes used by the override.
    _REQUIRED_ATTRS = (
        "_idle_semaphore",
        "_work_queue",
        "_threads",
        "_max_workers",
        "_thread_name_prefix",
        "_initializer",
        "_initargs",
    )

    def _can_spawn_daemon_workers(self) -> bool:
        """Return True when the stdlib internals match what we mirror."""
        worker = getattr(cf_thread, "_worker", None)
        code = getattr(worker, "__code__", None)
        # We call ``_worker(executor_ref, work_queue, initializer, initargs)``;
        # a different arity means the private API changed underneath us.
        if code is None or code.co_argcount != 4:
            return False
        return all(hasattr(self, attr) for attr in self._REQUIRED_ATTRS)

    def _adjust_thread_count(self) -> None:  # pragma: no cover - thread mgmt
        if not self._can_spawn_daemon_workers():
            return super()._adjust_thread_count()
        # If idle threads are available, don't spin up new ones.
        if self._idle_semaphore.acquire(timeout=0):
            return

        def weakref_cb(_, q=self._work_queue):
            # Wake a worker when the executor is garbage collected.
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
            t = threading.Thread(
                name=thread_name,
                target=cf_thread._worker,
                args=(
                    weakref.ref(self, weakref_cb),
                    self._work_queue,
                    self._initializer,
                    self._initargs,
                ),
                daemon=True,
            )
            t.start()
            self._threads.add(t)
            # Intentionally no ``cf_thread._threads_queues[t] = ...`` here:
            # see the class docstring.


def _get_git_executor() -> ThreadPoolExecutor:
    """Lazily build the dedicated pool for scope git operations.

    Isolated from the default executor so a hung clone/fetch can never starve
    bundle serving or other server work. Workers are daemon threads so a
    lingering (timed-out) git op cannot block interpreter shutdown.

    ``SCOPES_GIT_MAX_WORKERS`` is read once on first use; the executor is then
    cached until ``shutdown_git_executor`` or the after-fork reset drops it.
    """
    global _git_executor
    if _git_executor is None:
        _git_executor = _DaemonThreadPoolExecutor(
            max_workers=opal_server_config.SCOPES_GIT_MAX_WORKERS,
            thread_name_prefix="opal-git",
        )
    return _git_executor


def shutdown_git_executor() -> None:
    """Drop the dedicated git pool and clear in-flight state.

    Called at the end of the pre-fork ``preload_scopes`` so the gunicorn master
    does not carry idle git-pool threads (or stale in-flight markers) into the
    forked workers. Workers lazily rebuild their own pool on first use.
    Queued (not yet started) tasks are cancelled; running ones are not waited
    for.
    """
    global _git_executor
    executor, _git_executor = _git_executor, None
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)
    with _git_busy_lock:
        _git_busy.clear()


def _reset_git_executor_after_fork() -> None:
    """Reset the git pool + in-flight state in a freshly forked child.

    A ``ThreadPoolExecutor`` created in the parent is inherited *broken* by the
    child: its worker threads do not survive ``fork``, yet its bookkeeping makes
    ``_adjust_thread_count`` skip spawning live workers, so every submitted task
    would sit queued forever. Dropping the reference forces the child to build
    its own working pool. The in-flight markers are stale in the child too (no
    thread will ever clear them), so clear them.
    """
    global _git_executor, _git_busy_lock
    _git_executor = None
    # The lock may have been copied in the *held* state if a pool thread was
    # inside a critical section when the parent forked. That thread does not
    # exist in the child, so acquiring the inherited lock could block forever.
    # Only the forking thread survives, so swapping in a fresh lock and
    # clearing without it is safe.
    _git_busy_lock = threading.Lock()
    _git_busy.clear()


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reset_git_executor_after_fork)


def _mark_git_op_started(key: str) -> None:
    """Record ``key`` as having a git op in flight."""
    with _git_busy_lock:
        _git_busy.add(key)


def _mark_git_op_done(key: str) -> None:
    """Clear the in-flight marker for ``key`` (no-op if it is not set)."""
    with _git_busy_lock:
        _git_busy.discard(key)


def git_op_in_flight(key: str) -> bool:
    """True while a git op for ``key`` is still running on a pool thread.

    Stays True during the "lingering" window after a timeout, until the
    blocking pygit2 call actually returns.
    """
    with _git_busy_lock:
        return key in _git_busy


def _consume_future_result(fut) -> None:
    """Done-callback: mark the future's outcome as retrieved.

    A future left running after its awaiter timed out (or was cancelled) is
    never awaited again; reading its exception here keeps asyncio from logging
    "exception was never retrieved". A cancelled future has nothing to read.
    """
    if not fut.cancelled():
        fut.exception()


async def run_in_git_executor(func, *args, timeout: float, busy_key=None, **kwargs):
    """Run a blocking git call on the dedicated pool with a soft timeout.

    Args:
        func: blocking callable to run on the git pool.
        *args, **kwargs: forwarded to ``func``.
        timeout: seconds to wait for the call; ``timeout <= 0`` means no limit.
        busy_key: optional repository key to mark in-flight (see below).

    Returns:
        Whatever ``func`` returns.

    Raises:
        TimeoutError: (builtin) when the call exceeds ``timeout`` seconds.
        Exception: anything ``func`` raises is re-raised to the caller.

    NOTE: the timeout unblocks the event loop and the awaiting coroutine, but
    the underlying pygit2 call keeps running on its pool thread until the OS
    network timeout; the dedicated pool keeps that lingering thread isolated.

    When ``busy_key`` is given it is marked in-flight for the *entire real
    duration* of the call — including any lingering time after a timeout — and
    cleared only when the blocking call actually returns (on the pool thread).
    Callers use ``git_op_in_flight`` to avoid starting a second git op against
    the same repository while a timed-out one is still running.
    """
    loop = asyncio.get_running_loop()

    def _runner():
        try:
            return func(*args, **kwargs)
        finally:
            if busy_key is not None:
                _mark_git_op_done(busy_key)

    if busy_key is not None:
        _mark_git_op_started(busy_key)
    try:
        fut = loop.run_in_executor(_get_git_executor(), _runner)
    except BaseException:
        # Submission failed, so ``_runner`` will never run to clear the marker.
        if busy_key is not None:
            _mark_git_op_done(busy_key)
        raise

    # The future may outlive this coroutine (timeout or cancellation), so make
    # sure its eventual outcome is always retrieved.
    fut.add_done_callback(_consume_future_result)

    # Always go through asyncio.wait (never ``await fut`` / ``wait_for``):
    # neither a timeout nor cancellation of this coroutine may cancel the
    # future. Cancelling a still-queued pool task would mean ``_runner`` never
    # runs and ``busy_key`` would stay in-flight forever.
    limit = timeout if (timeout and timeout > 0) else None
    done, _pending = await asyncio.wait({fut}, timeout=limit)
    if not done:
        raise TimeoutError(f"git operation exceeded {timeout}s")
    return fut.result()
+ logger.error( + "Timed out fetching {url}, skipping: {err}", + url=self._source.url, + err=repr(exc), + ) + return + # Record the fetch *start* time, but only now that it + # has succeeded: a completion timestamp could wrongly + # suppress a force_fetch whose req_time falls within an + # in-flight fetch (see _was_fetched_after). GitPolicyFetcher.repos_last_fetched[ self._source_id - ] = datetime.datetime.now() - await run_sync( - repo.remotes[self._remote].fetch, - callbacks=self._auth_callbacks, - ) + ] = started logger.debug(f"Fetch completed: {self._source.url}") # New commits might be present because of a previous fetch made by another scope @@ -204,7 +430,10 @@ async def fetch_and_notify_on_changes( logger.warning( "Deleting invalid repo: {path}", path=self._repo_path ) - shutil.rmtree(self._repo_path) + # ignore_errors: a partial dir left by an abandoned + # (timed-out) clone may still be written to; a failed + # delete self-heals via the clone below (or next cycle). + shutil.rmtree(self._repo_path, ignore_errors=True) else: logger.info("Repo not found at {path}", path=self._repo_path) @@ -222,14 +451,20 @@ async def _clone(self): path=self._repo_path, ) try: - repo: Repository = await run_sync( + repo: Repository = await run_in_git_executor( clone_repository, self._source.url, str(self._repo_path), callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + busy_key=self._source_id, + ) + except (pygit2.GitError, TimeoutError) as exc: + logger.error( + "Could not clone repo at {url}: {err}", + url=self._source.url, + err=repr(exc), ) - except pygit2.GitError: - logger.exception(f"Could not clone repo at {self._source.url}") else: logger.info(f"Clone completed: {self._source.url}") await self._notify_on_changes(repo) diff --git a/packages/opal-server/opal_server/scopes/service.py b/packages/opal-server/opal_server/scopes/service.py index d3df4972f..8aef8d630 100644 --- a/packages/opal-server/opal_server/scopes/service.py +++ 
b/packages/opal-server/opal_server/scopes/service.py @@ -1,3 +1,4 @@ +import asyncio import datetime import shutil from functools import partial @@ -12,6 +13,7 @@ from opal_common.schemas.policy import PolicyUpdateMessageNotification from opal_common.schemas.policy_source import GitPolicyScopeSource from opal_common.topics.publisher import ScopedServerSideTopicPublisher +from opal_server.config import opal_server_config from opal_server.git_fetcher import GitPolicyFetcher, PolicyFetcherCallbacks from opal_server.policy.watcher.callbacks import ( create_policy_update, @@ -194,34 +196,58 @@ async def sync_scopes(self, only_poll_updates=False, notify_on_changes=True): f"OPAL Scopes: syncing {len(scopes)} scopes in the background (polling updates: {only_poll_updates})" ) - fetched_source_ids = set() - skipped_scopes = [] + # Partition into distinct repos (cloned/fetched once, with priority + # so every repo is pulled asap) and the scopes that merely reuse an + # already-handled repo (checked for changes only). + unique_scopes = [] + duplicate_scopes = [] + seen_source_ids = set() for scope in scopes: src_id = GitPolicyFetcher.source_id(scope.policy) + if src_id in seen_source_ids: + duplicate_scopes.append(scope) + else: + seen_source_ids.add(src_id) + unique_scopes.append(scope) + + # Bound concurrency to the dedicated git pool so one unreachable repo + # only stalls its own slot (for the fetch timeout), not the whole + # boot/poll pass. Phase 1 clones/fetches every distinct repo; phase 2 + # then checks the duplicates against those now-present repos. 
async def _sync_scopes_concurrently(
    self, scopes, semaphore, *, force_fetch, notify_on_changes
):
    """Run ``sync_scope`` for every scope in ``scopes`` at the same time.

    At most as many syncs run at once as ``semaphore`` allows. A scope
    whose sync raises is logged and otherwise ignored, so a single
    failing repo does not abort the rest of the pass.
    """
    shared_options = {
        "force_fetch": force_fetch,
        "notify_on_changes": notify_on_changes,
    }

    async def _guarded_sync(target):
        # Hold one semaphore slot for the whole duration of this sync.
        async with semaphore:
            try:
                await self.sync_scope(scope=target, **shared_options)
            except Exception:
                logger.exception(f"sync_scope failed for {target.scope_id}")

    pending = [_guarded_sync(item) for item in scopes]
    await asyncio.gather(*pending)
# --- fetch_timeout_test.py ---------------------------------------------------


@pytest.mark.asyncio
async def test_hanging_git_op_raises_timeout(monkeypatch):
    """A clone/fetch that hangs must surface TimeoutError, not block
    forever."""
    monkeypatch.setattr(opal_server_config, "SCOPES_GIT_FETCH_TIMEOUT", 0.2)

    def _hang():
        # Short enough that the lingering pool thread doesn't delay teardown,
        # but well above the 0.2s timeout under test.
        time.sleep(1)

    start = time.monotonic()
    with pytest.raises(TimeoutError):
        await run_in_git_executor(
            _hang, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT
        )
    # The bound must stay below the 1s hang above, otherwise it could not
    # tell "returned at the timeout" apart from "waited for the call to end".
    assert (
        time.monotonic() - start < 1
    ), "run_in_git_executor did not unblock promptly after the timeout"


# --- git_executor_test.py ----------------------------------------------------


def test_git_resilience_config_defaults(monkeypatch):
    # Don't let an ambient OPAL_* env var in CI/dev shadow the declared defaults.
    monkeypatch.delenv("OPAL_SCOPES_GIT_FETCH_TIMEOUT", raising=False)
    monkeypatch.delenv("OPAL_SCOPES_GIT_MAX_WORKERS", raising=False)
    clean = OpalServerConfig(prefix="OPAL_")
    assert clean.SCOPES_GIT_FETCH_TIMEOUT == 120.0
    assert clean.SCOPES_GIT_MAX_WORKERS == 10


@pytest.mark.asyncio
async def test_run_in_git_executor_returns_value():
    result = await run_in_git_executor(lambda: 21 * 2, timeout=5)
    assert result == 42


@pytest.mark.asyncio
async def test_run_in_git_executor_times_out():
    with pytest.raises(TimeoutError):
        await run_in_git_executor(lambda: time.sleep(1), timeout=0.1)


@pytest.mark.asyncio
async def test_zero_timeout_means_no_limit():
    result = await run_in_git_executor(lambda: "ok", timeout=0)
    assert result == "ok"


def test_git_op_in_flight_false_for_unknown_key():
    assert git_op_in_flight("no-such-source") is False


@pytest.mark.asyncio
async def test_busy_key_stays_in_flight_until_call_returns():
    """A timed-out op must remain 'in flight' until its blocking call actually
    returns, so a second op for the same repo is not started concurrently."""
    started = threading.Event()
    release = threading.Event()
    key = "busy-source-id"

    def _block():
        started.set()
        release.wait(5)

    try:
        # The op exceeds its timeout but keeps running on the pool thread.
        with pytest.raises(TimeoutError):
            await run_in_git_executor(_block, timeout=0.1, busy_key=key)

        assert started.wait(2)
        # Still lingering on the pool thread -> guarded as in-flight.
        assert git_op_in_flight(key) is True
    finally:
        # Always unblock the worker, even when an assertion above fails, so a
        # failing run does not leave a pool thread parked for 5 seconds.
        release.set()

    # Once the blocking call returns, the marker clears (from the pool thread).
    for _ in range(200):
        if not git_op_in_flight(key):
            break
        await asyncio.sleep(0.01)
    assert git_op_in_flight(key) is False


@pytest.mark.asyncio
async def test_busy_key_cleared_after_success():
    key = "ok-source-id"
    assert await run_in_git_executor(lambda: 1, timeout=5, busy_key=key) == 1
    assert git_op_in_flight(key) is False