From 245f80ba6c06a7e52449dd35cc9dfa482a297378 Mon Sep 17 00:00:00 2001 From: David Shoen Date: Tue, 23 Jun 2026 14:19:46 +0300 Subject: [PATCH 1/8] feat(opal-server): add scope git fetch timeout + dedicated pool config Co-Authored-By: Claude Opus 4.8 (1M context) --- .claude/plans/docs/05-config-reference.md | 21 +++++++++++++++++++ packages/opal-server/opal_server/config.py | 14 +++++++++++++ .../opal_server/tests/git_executor_test.py | 7 +++++++ 3 files changed, 42 insertions(+) create mode 100644 .claude/plans/docs/05-config-reference.md create mode 100644 packages/opal-server/opal_server/tests/git_executor_test.py diff --git a/.claude/plans/docs/05-config-reference.md b/.claude/plans/docs/05-config-reference.md new file mode 100644 index 000000000..19caea4c4 --- /dev/null +++ b/.claude/plans/docs/05-config-reference.md @@ -0,0 +1,21 @@ +# 05 — OPAL Config Reference (private) + +Private, internal supplement to the public operator docs under `documentation/docs/`. Tracks +`OPAL_*` env vars added or clarified by the OPAL Server Git Fixes work, with the declaring +`file:line` so contributors can jump straight to the `Confi` declaration. Each key maps to an +`OPAL_` env var (the `OPAL_` prefix is added once by the component's `Confi(prefix="OPAL_")` +instantiation — the bare name is what appears in the table). + +## 4. opal-server keys + +| Env var | Type | Default | Purpose | Declared at | +|---|---|---|---|---| +| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is abandoned and the scope is marked failed, so one unreachable repo can never block boot or other scopes. `0` = no timeout. | `packages/opal-server/opal_server/config.py:150-156` | +| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:157-163` | + +> **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via +> `asyncio.wait_for`, which cancels the *await* — unblocking the event loop and the awaiting +> coroutine — but the underlying pygit2 call keeps running on its pool thread until the OS network +> timeout. The dedicated pool (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering +> threads so they cannot affect bundle serving or other scopes. Hard-kill via subprocess is out of +> scope. See spec §6. diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 9faac9be4..82ffc305c 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -147,6 +147,20 @@ class OpalServerConfig(Confi): 0, description="The timeout for cloning the policy repository (0 means wait forever)", ) + SCOPES_GIT_FETCH_TIMEOUT = confi.float( + "SCOPES_GIT_FETCH_TIMEOUT", + 120.0, + description="Hard timeout in seconds for a single scope git clone/fetch. " + "On timeout the operation is abandoned and the scope is marked failed so " + "one unreachable repo can never block boot or other scopes (0 = no timeout).", + ) + SCOPES_GIT_MAX_WORKERS = confi.int( + "SCOPES_GIT_MAX_WORKERS", + 10, + description="Size of the dedicated thread pool for scope git operations. " + "Isolating git work keeps a hung fetch from starving bundle serving and " + "other server work that uses the default executor.", + ) LEADER_LOCK_FILE_PATH = confi.str( "LEADER_LOCK_FILE_PATH", "/tmp/opal_server_leader.lock", diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py new file mode 100644 index 000000000..cfc250f75 --- /dev/null +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -0,0 +1,7 @@ +from opal_server.config import OpalServerConfig + + +def test_git_resilience_config_defaults(): + clean = OpalServerConfig(prefix="OPAL_") + assert clean.SCOPES_GIT_FETCH_TIMEOUT == 120.0 + assert clean.SCOPES_GIT_MAX_WORKERS == 10 From 6abbc2a74bfaf54f6b32ad51e0d096892997420b Mon Sep 17 00:00:00 2001 From: David Shoen Date: Tue, 23 Jun 2026 14:21:19 +0300 Subject: [PATCH 2/8] feat(opal-server): dedicated git thread pool with hard per-op timeout Co-Authored-By: Claude Opus 4.8 (1M context) --- .../opal-server/opal_server/git_fetcher.py | 36 +++++++++++++++++++ .../opal_server/tests/git_executor_test.py | 23 ++++++++++++ 2 files changed, 59 insertions(+) diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index e52083c98..9d3f7d70f 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -4,6 +4,8 @@ import hashlib import shutil import time +from concurrent.futures import ThreadPoolExecutor +from functools import partial from pathlib import Path from typing import Optional, cast @@ -34,6 +36,40 @@ ) +_git_executor: Optional[ThreadPoolExecutor] = None + + +def _get_git_executor() -> ThreadPoolExecutor: + """Lazily build the dedicated pool for scope git operations. + + Isolated from the default executor so a hung clone/fetch can never starve + bundle serving or other server work. + """ + global _git_executor + if _git_executor is None: + _git_executor = ThreadPoolExecutor( + max_workers=opal_server_config.SCOPES_GIT_MAX_WORKERS, + thread_name_prefix="opal-git", + ) + return _git_executor + + +async def run_in_git_executor(func, *args, timeout: float, **kwargs): + """Run a blocking git call on the dedicated pool with a hard timeout. + + Raises ``TimeoutError`` (via ``asyncio.wait_for``) when the call exceeds + ``timeout`` seconds. ``timeout <= 0`` means no limit. NOTE: the timeout + unblocks the event loop and the awaiting coroutine, but the underlying + pygit2 call keeps running on its pool thread until the OS network timeout; + the dedicated pool keeps that lingering thread isolated. + """ + loop = asyncio.get_event_loop() + fut = loop.run_in_executor(_get_git_executor(), partial(func, *args, **kwargs)) + if timeout and timeout > 0: + return await asyncio.wait_for(fut, timeout=timeout) + return await fut + + class PolicyFetcherCallbacks: async def on_update(self, old_head: Optional[str], head: str): pass diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py index cfc250f75..9a36c624f 100644 --- a/packages/opal-server/opal_server/tests/git_executor_test.py +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -1,7 +1,30 @@ +import time + +import pytest + from opal_server.config import OpalServerConfig +from opal_server.git_fetcher import run_in_git_executor def test_git_resilience_config_defaults(): clean = OpalServerConfig(prefix="OPAL_") assert clean.SCOPES_GIT_FETCH_TIMEOUT == 120.0 assert clean.SCOPES_GIT_MAX_WORKERS == 10 + + +@pytest.mark.asyncio +async def test_run_in_git_executor_returns_value(): + result = await run_in_git_executor(lambda: 21 * 2, timeout=5) + assert result == 42 + + +@pytest.mark.asyncio +async def test_run_in_git_executor_times_out(): + with pytest.raises(TimeoutError): + await run_in_git_executor(lambda: time.sleep(2), timeout=0.1) + + +@pytest.mark.asyncio +async def test_zero_timeout_means_no_limit(): + result = await run_in_git_executor(lambda: "ok", timeout=0) + assert result == "ok" From ed86a9c286d8e865df12c30c108a100d37875761 Mon Sep 17 00:00:00 2001 From: David Shoen Date: Tue, 23 Jun 2026 14:24:10 +0300 Subject: [PATCH 3/8] fix(opal-server): bound scope clone/fetch with timeout, skip on hang Wire scope clone/fetch through run_in_git_executor with SCOPES_GIT_FETCH_TIMEOUT, and broaden the _clone except to catch asyncio.TimeoutError so a hung clone is logged and the scope skipped instead of crashing the caller. Drop the now-unused run_sync import. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../opal-server/opal_server/git_fetcher.py | 15 ++++++++----- .../opal_server/tests/fetch_timeout_test.py | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) create mode 100644 packages/opal-server/opal_server/tests/fetch_timeout_test.py diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index 9d3f7d70f..c8dee329a 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -13,7 +13,6 @@ import pygit2 from ddtrace import tracer from git import Repo -from opal_common.async_utils import run_sync from opal_common.git_utils.bundle_maker import BundleMaker from opal_common.logger import logger from opal_common.schemas.policy import PolicyBundle @@ -226,9 +225,10 @@ async def fetch_and_notify_on_changes( GitPolicyFetcher.repos_last_fetched[ self._source_id ] = datetime.datetime.now() - await run_sync( + await run_in_git_executor( repo.remotes[self._remote].fetch, callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, ) logger.debug(f"Fetch completed: {self._source.url}") @@ -258,14 +258,19 @@ async def _clone(self): path=self._repo_path, ) try: - repo: Repository = await run_sync( + repo: Repository = await run_in_git_executor( clone_repository, self._source.url, str(self._repo_path), callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + ) + except (pygit2.GitError, asyncio.TimeoutError) as exc: + logger.error( + "Could not clone repo at {url}: {err}", + url=self._source.url, + err=repr(exc), ) - except pygit2.GitError: - logger.exception(f"Could not clone repo at {self._source.url}") else: logger.info(f"Clone completed: {self._source.url}") await self._notify_on_changes(repo) diff --git a/packages/opal-server/opal_server/tests/fetch_timeout_test.py b/packages/opal-server/opal_server/tests/fetch_timeout_test.py new file mode 100644 index 000000000..58376ff44 --- /dev/null +++ b/packages/opal-server/opal_server/tests/fetch_timeout_test.py @@ -0,0 +1,22 @@ +import time + +import pytest + +from opal_server.config import opal_server_config +from opal_server.git_fetcher import run_in_git_executor + + +@pytest.mark.asyncio +async def test_hanging_git_op_raises_timeout(monkeypatch): + """A clone/fetch that hangs must surface TimeoutError, not block forever.""" + monkeypatch.setattr(opal_server_config, "SCOPES_GIT_FETCH_TIMEOUT", 0.2) + + def _hang(): + time.sleep(5) + + start = time.time() + with pytest.raises(TimeoutError): + await run_in_git_executor( + _hang, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT + ) + assert time.time() - start < 2, "wait_for did not unblock promptly" From c043e88a10e2d1cb0180e2fbcc2a20c2bb97436c Mon Sep 17 00:00:00 2001 From: David Shoen Date: Wed, 24 Jun 2026 20:09:56 +0300 Subject: [PATCH 4/8] fix(opal-server): normalize asyncio.TimeoutError to builtin + format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On Python < 3.11 asyncio.TimeoutError is a distinct class from the builtin TimeoutError, so run_in_git_executor's wait_for timeout was not caught by `pytest.raises(TimeoutError)` — failing build (3.9)/(3.10). Normalize to the builtin TimeoutError so the documented contract holds on every supported Python, and update the _clone catch site to match. Also apply black/isort/docformatter formatting to satisfy pre-commit. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/opal-server/opal_server/git_fetcher.py | 15 ++++++++++----- .../opal_server/tests/fetch_timeout_test.py | 4 ++-- .../opal_server/tests/git_executor_test.py | 1 - 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index c8dee329a..c549e04dd 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -34,15 +34,14 @@ reference_is_valid_name, ) - _git_executor: Optional[ThreadPoolExecutor] = None def _get_git_executor() -> ThreadPoolExecutor: """Lazily build the dedicated pool for scope git operations. - Isolated from the default executor so a hung clone/fetch can never starve - bundle serving or other server work. + Isolated from the default executor so a hung clone/fetch can never + starve bundle serving or other server work. """ global _git_executor if _git_executor is None: @@ -65,7 +64,13 @@ async def run_in_git_executor(func, *args, timeout: float, **kwargs): loop = asyncio.get_event_loop() fut = loop.run_in_executor(_get_git_executor(), partial(func, *args, **kwargs)) if timeout and timeout > 0: - return await asyncio.wait_for(fut, timeout=timeout) + try: + return await asyncio.wait_for(fut, timeout=timeout) + except asyncio.TimeoutError as exc: + # On Python < 3.11 ``asyncio.TimeoutError`` is a distinct class from + # the builtin ``TimeoutError``; normalize so callers (and the + # documented contract) can rely on the builtin everywhere. + raise TimeoutError() from exc return await fut @@ -265,7 +270,7 @@ async def _clone(self): callbacks=self._auth_callbacks, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, ) - except (pygit2.GitError, asyncio.TimeoutError) as exc: + except (pygit2.GitError, TimeoutError) as exc: logger.error( "Could not clone repo at {url}: {err}", url=self._source.url, diff --git a/packages/opal-server/opal_server/tests/fetch_timeout_test.py b/packages/opal-server/opal_server/tests/fetch_timeout_test.py index 58376ff44..b658cf45a 100644 --- a/packages/opal-server/opal_server/tests/fetch_timeout_test.py +++ b/packages/opal-server/opal_server/tests/fetch_timeout_test.py @@ -1,14 +1,14 @@ import time import pytest - from opal_server.config import opal_server_config from opal_server.git_fetcher import run_in_git_executor @pytest.mark.asyncio async def test_hanging_git_op_raises_timeout(monkeypatch): - """A clone/fetch that hangs must surface TimeoutError, not block forever.""" + """A clone/fetch that hangs must surface TimeoutError, not block + forever.""" monkeypatch.setattr(opal_server_config, "SCOPES_GIT_FETCH_TIMEOUT", 0.2) def _hang(): diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py index 9a36c624f..8dfaf93c7 100644 --- a/packages/opal-server/opal_server/tests/git_executor_test.py +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -1,7 +1,6 @@ import time import pytest - from opal_server.config import OpalServerConfig from opal_server.git_fetcher import run_in_git_executor From 5dd53a2b290de7636531f4d996d75a3d9d354db9 Mon Sep 17 00:00:00 2001 From: David Shoen Date: Wed, 24 Jun 2026 20:13:14 +0300 Subject: [PATCH 5/8] fix(opal-server): address Copilot review on git resilience - run_in_git_executor: use asyncio.get_running_loop() instead of the deprecated get_event_loop() inside an async function - fetch_and_notify_on_changes: set repos_last_fetched only after a successful fetch so a timeout/error does not wrongly suppress a later force_fetch via _was_fetched_after - fetch_timeout_test: measure elapsed time with time.monotonic() Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/opal-server/opal_server/git_fetcher.py | 12 ++++++++---- .../opal_server/tests/fetch_timeout_test.py | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index c549e04dd..3202ced81 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -61,7 +61,7 @@ async def run_in_git_executor(func, *args, timeout: float, **kwargs): pygit2 call keeps running on its pool thread until the OS network timeout; the dedicated pool keeps that lingering thread isolated. """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() fut = loop.run_in_executor(_get_git_executor(), partial(func, *args, **kwargs)) if timeout and timeout > 0: try: @@ -227,14 +227,18 @@ async def fetch_and_notify_on_changes( logger.debug( f"Fetching remote (force_fetch={force_fetch}): {self._remote} ({self._source.url})" ) - GitPolicyFetcher.repos_last_fetched[ - self._source_id - ] = datetime.datetime.now() await run_in_git_executor( repo.remotes[self._remote].fetch, callbacks=self._auth_callbacks, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, ) + # Only mark as fetched after the fetch actually + # succeeds. On timeout/error this stays stale so a + # later force_fetch is not wrongly suppressed by + # _was_fetched_after. + GitPolicyFetcher.repos_last_fetched[ + self._source_id + ] = datetime.datetime.now() logger.debug(f"Fetch completed: {self._source.url}") # New commits might be present because of a previous fetch made by another scope diff --git a/packages/opal-server/opal_server/tests/fetch_timeout_test.py b/packages/opal-server/opal_server/tests/fetch_timeout_test.py index b658cf45a..f2c4b8f9d 100644 --- a/packages/opal-server/opal_server/tests/fetch_timeout_test.py +++ b/packages/opal-server/opal_server/tests/fetch_timeout_test.py @@ -14,9 +14,9 @@ async def test_hanging_git_op_raises_timeout(monkeypatch): def _hang(): time.sleep(5) - start = time.time() + start = time.monotonic() with pytest.raises(TimeoutError): await run_in_git_executor( _hang, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT ) - assert time.time() - start < 2, "wait_for did not unblock promptly" + assert time.monotonic() - start < 2, "wait_for did not unblock promptly" From 382cd8364b1f9d996e9ddefdc4424f84cc629c1d Mon Sep 17 00:00:00 2001 From: David Shoen Date: Wed, 24 Jun 2026 20:21:44 +0300 Subject: [PATCH 6/8] fix(opal-server): clean fetch-timeout logging, trim test thread sleeps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fetch path let TimeoutError propagate to sync_scope's catch-all, which logged a full traceback at ERROR level for the expected unreachable-repo case — inconsistent with the clone path's quiet logger.error. Catch TimeoutError at the fetch site and log without a traceback, then skip (repos_last_fetched stays stale so the next cycle retries). Also shorten the hanging-thread sleeps in the timeout tests so the lingering pool thread doesn't delay process teardown. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../opal-server/opal_server/git_fetcher.py | 26 +++++++++++++------ .../opal_server/tests/fetch_timeout_test.py | 4 ++- .../opal_server/tests/git_executor_test.py | 2 +- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index 3202ced81..173e506b2 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -227,15 +227,25 @@ async def fetch_and_notify_on_changes( logger.debug( f"Fetching remote (force_fetch={force_fetch}): {self._remote} ({self._source.url})" ) - await run_in_git_executor( - repo.remotes[self._remote].fetch, - callbacks=self._auth_callbacks, - timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, - ) + try: + await run_in_git_executor( + repo.remotes[self._remote].fetch, + callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + ) + except TimeoutError as exc: + # Expected when a repo is unreachable: log cleanly + # (no traceback) and skip, matching the clone path. + # repos_last_fetched stays stale so the next cycle + # retries and force_fetch is not wrongly suppressed. + logger.error( + "Timed out fetching {url}, skipping: {err}", + url=self._source.url, + err=repr(exc), + ) + return # Only mark as fetched after the fetch actually - # succeeds. On timeout/error this stays stale so a - # later force_fetch is not wrongly suppressed by - # _was_fetched_after. + # succeeds. GitPolicyFetcher.repos_last_fetched[ self._source_id ] = datetime.datetime.now() diff --git a/packages/opal-server/opal_server/tests/fetch_timeout_test.py b/packages/opal-server/opal_server/tests/fetch_timeout_test.py index f2c4b8f9d..b8bde71e1 100644 --- a/packages/opal-server/opal_server/tests/fetch_timeout_test.py +++ b/packages/opal-server/opal_server/tests/fetch_timeout_test.py @@ -12,7 +12,9 @@ async def test_hanging_git_op_raises_timeout(monkeypatch): monkeypatch.setattr(opal_server_config, "SCOPES_GIT_FETCH_TIMEOUT", 0.2) def _hang(): - time.sleep(5) + # Short enough that the lingering pool thread doesn't delay teardown, + # but well above the 0.2s timeout under test. + time.sleep(1) start = time.monotonic() with pytest.raises(TimeoutError): diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py index 8dfaf93c7..e1c534c80 100644 --- a/packages/opal-server/opal_server/tests/git_executor_test.py +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -20,7 +20,7 @@ async def test_run_in_git_executor_returns_value(): @pytest.mark.asyncio async def test_run_in_git_executor_times_out(): with pytest.raises(TimeoutError): - await run_in_git_executor(lambda: time.sleep(2), timeout=0.1) + await run_in_git_executor(lambda: time.sleep(1), timeout=0.1) @pytest.mark.asyncio From bad21c1e7efb6a05c2682958ad017fba9e3844dd Mon Sep 17 00:00:00 2001 From: David Shoen Date: Wed, 1 Jul 2026 12:23:58 +0300 Subject: [PATCH 7/8] docs(opal-server): address Zeev review on git resilience MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - git_fetcher: document that the dedicated scope-git ThreadPoolExecutor reads SCOPES_GIT_MAX_WORKERS once on first use, caches for the process lifetime (not runtime-reconfigurable), and is never explicitly shut down — matches the PR3 design. - 05-config-reference: fix stale config.py line refs after the master merge shifted the keys — SCOPES_GIT_FETCH_TIMEOUT 150-156 -> 196-202, SCOPES_GIT_MAX_WORKERS 157-163 -> 203-209. Co-Authored-By: Claude Opus 4.8 (1M context) --- .claude/plans/docs/05-config-reference.md | 4 ++-- packages/opal-server/opal_server/git_fetcher.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.claude/plans/docs/05-config-reference.md b/.claude/plans/docs/05-config-reference.md index 19caea4c4..e2c204d59 100644 --- a/.claude/plans/docs/05-config-reference.md +++ b/.claude/plans/docs/05-config-reference.md @@ -10,8 +10,8 @@ instantiation — the bare name is what appears in the table). | Env var | Type | Default | Purpose | Declared at | |---|---|---|---|---| -| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is abandoned and the scope is marked failed, so one unreachable repo can never block boot or other scopes. `0` = no timeout. | `packages/opal-server/opal_server/config.py:150-156` | -| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:157-163` | +| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is abandoned and the scope is marked failed, so one unreachable repo can never block boot or other scopes. `0` = no timeout. | `packages/opal-server/opal_server/config.py:196-202` | +| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:203-209` | > **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via > `asyncio.wait_for`, which cancels the *await* — unblocking the event loop and the awaiting diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index 173e506b2..a77a5a4e3 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -42,6 +42,10 @@ def _get_git_executor() -> ThreadPoolExecutor: Isolated from the default executor so a hung clone/fetch can never starve bundle serving or other server work. + + ``SCOPES_GIT_MAX_WORKERS`` is read once on first use; the executor is then + cached for the process lifetime — it is not runtime-reconfigurable and is + never explicitly shut down (matches the PR3 design). """ global _git_executor if _git_executor is None: From d31a0b63fcbfb418ea16f3d45a68575d0ee2b8e6 Mon Sep 17 00:00:00 2001 From: David Shoen Date: Wed, 1 Jul 2026 15:21:42 +0300 Subject: [PATCH 8/8] =?UTF-8?q?fix(opal-server):=20harden=20git=20resilien?= =?UTF-8?q?ce=20=E2=80=94=20fork-safety,=20single-flight,=20concurrent=20s?= =?UTF-8?q?ync?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review findings on PR3 (never stuck on an offline repo): - CRITICAL: reset the dedicated git ThreadPoolExecutor after fork (os.register_at_fork) and shut it down at the end of preload_scopes. A pool built in the pre-fork gunicorn master was inherited with dead worker threads by every worker, so the leader's scope sync stalled forever (silent policy staleness). Verified with a fork repro on 3.12. - HIGH: never use the non-thread-safe pygit2 Repository from two threads. A timed-out clone/fetch keeps running on its pool thread while the per-source_id lock is released; a per-source_id in-flight guard now skips a cycle while a prior op is still lingering. run_in_git_executor switches asyncio.wait_for -> asyncio.wait so a timeout never cancels the future (the thread runs to completion and clears the in-flight marker). - HIGH: sync scopes concurrently, bounded by SCOPES_GIT_MAX_WORKERS, so one unreachable repo no longer serially blocks boot and other scopes. - MEDIUM: daemon-thread pool so a lingering git op can't block interpreter shutdown. - MEDIUM: stamp repos_last_fetched with the fetch start time on success (was completion time, which could wrongly suppress a force_fetch whose req_time falls within an in-flight fetch). - rmtree(ignore_errors) for the abandoned-clone race; harden the env-sensitive config-defaults test; correct config/doc wording ("logged and skipped" instead of "marked failed"). Co-Authored-By: Claude Opus 4.8 (1M context) --- .claude/plans/docs/05-config-reference.md | 21 +- packages/opal-server/opal_server/config.py | 12 +- .../opal-server/opal_server/git_fetcher.py | 219 ++++++++++++++++-- .../opal-server/opal_server/scopes/service.py | 66 ++++-- .../opal-server/opal_server/scopes/task.py | 6 + .../opal_server/tests/git_executor_test.py | 49 +++- 6 files changed, 315 insertions(+), 58 deletions(-) diff --git a/.claude/plans/docs/05-config-reference.md b/.claude/plans/docs/05-config-reference.md index e2c204d59..2ec297e88 100644 --- a/.claude/plans/docs/05-config-reference.md +++ b/.claude/plans/docs/05-config-reference.md @@ -10,12 +10,19 @@ instantiation — the bare name is what appears in the table). | Env var | Type | Default | Purpose | Declared at | |---|---|---|---|---| -| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is abandoned and the scope is marked failed, so one unreachable repo can never block boot or other scopes. `0` = no timeout. | `packages/opal-server/opal_server/config.py:196-202` | -| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:203-209` | +| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is logged and skipped (retried next cycle), so one unreachable repo can never block boot or other scopes *indefinitely*. `0` = no timeout. | `packages/opal-server/opal_server/config.py:196-202` | +| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations, which also bounds how many scopes are synced concurrently. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:203-209` | > **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via -> `asyncio.wait_for`, which cancels the *await* — unblocking the event loop and the awaiting -> coroutine — but the underlying pygit2 call keeps running on its pool thread until the OS network -> timeout. The dedicated pool (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering -> threads so they cannot affect bundle serving or other scopes. Hard-kill via subprocess is out of -> scope. See spec §6. +> `asyncio.wait`, which unblocks the event loop and the awaiting coroutine — but the underlying +> pygit2 call keeps running on its pool thread until the OS network timeout. The dedicated pool +> (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering threads so they cannot affect +> bundle serving or other scopes, and a per-repo in-flight guard prevents a second git op from +> touching the same (non-thread-safe) pygit2 repo while the first is still lingering. Hard-kill via +> subprocess is out of scope. See spec §6. +> +> **Boot / concurrency.** Scope syncs run concurrently, bounded by `OPAL_SCOPES_GIT_MAX_WORKERS`, so +> a slow/offline repo only holds its own slot for up to the timeout rather than serially delaying the +> whole pass. With more offline repos than workers, boot/poll can still take +> `ceil(offline / workers) × timeout`; the pool workers are daemon threads so a lingering op never +> blocks process shutdown. diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 9c25032a1..1e1d47f11 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -197,15 +197,17 @@ class OpalServerConfig(Confi): "SCOPES_GIT_FETCH_TIMEOUT", 120.0, description="Hard timeout in seconds for a single scope git clone/fetch. " - "On timeout the operation is abandoned and the scope is marked failed so " - "one unreachable repo can never block boot or other scopes (0 = no timeout).", + "On timeout the operation is logged and skipped (retried next cycle), so " + "one unreachable repo can never block boot or other scopes indefinitely " + "(0 = no timeout).", ) SCOPES_GIT_MAX_WORKERS = confi.int( "SCOPES_GIT_MAX_WORKERS", 10, - description="Size of the dedicated thread pool for scope git operations. " - "Isolating git work keeps a hung fetch from starving bundle serving and " - "other server work that uses the default executor.", + description="Size of the dedicated thread pool for scope git operations, " + "which also bounds how many scopes are synced concurrently. Isolating git " + "work keeps a hung fetch from starving bundle serving and other server " + "work that uses the default executor.", ) LEADER_LOCK_FILE_PATH = confi.str( "LEADER_LOCK_FILE_PATH", diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index a77a5a4e3..5795fd0ff 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -2,9 +2,13 @@ import codecs import datetime import hashlib +import os import shutil +import threading import time +import weakref from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import thread as cf_thread from functools import partial from pathlib import Path from typing import Optional, cast @@ -36,46 +40,194 @@ _git_executor: Optional[ThreadPoolExecutor] = None +# Source ids whose scope git op (clone/fetch) is still running on a pool thread +# — including one that already exceeded its timeout but whose blocking pygit2 +# call has not yet returned. Guarded by a lock because it is cleared from the +# pool thread (see ``run_in_git_executor``) and read/written from the event +# loop. Used to guarantee at most one live git op per repository, since pygit2 +# ``Repository`` objects are not thread-safe. +_git_busy: set = set() +_git_busy_lock = threading.Lock() + + +class _DaemonThreadPoolExecutor(ThreadPoolExecutor): + """A ``ThreadPoolExecutor`` whose worker threads are daemon threads. + + A scope git op can stay blocked in a libgit2 network call well past our + soft timeout. With the stdlib's non-daemon workers, ``concurrent.futures``' + atexit handler would ``join()`` such a thread and hang interpreter shutdown + until the OS network timeout fires. Daemon workers let the process exit + promptly; abandoning an in-flight fetch at exit is safe (libgit2 stages + objects in a temp pack and swaps refs atomically under lockfiles, and a + half-written clone dir is detected as invalid and re-cloned on next boot). + + Only thread creation is customised, mirroring CPython's + ``_adjust_thread_count``. If a future CPython changes the internals we rely + on, we fall back to the stdlib (non-daemon) behaviour. + """ + + def _adjust_thread_count(self) -> None: # pragma: no cover - thread mgmt + if not hasattr(cf_thread, "_worker") or not hasattr( + cf_thread, "_threads_queues" + ): + return super()._adjust_thread_count() + # If idle threads are available, don't spin up new ones. + if self._idle_semaphore.acquire(timeout=0): + return + + def weakref_cb(_, q=self._work_queue): + q.put(None) + + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads) + t = threading.Thread( + name=thread_name, + target=cf_thread._worker, + args=( + weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs, + ), + daemon=True, + ) + t.start() + self._threads.add(t) + cf_thread._threads_queues[t] = self._work_queue + def _get_git_executor() -> ThreadPoolExecutor: """Lazily build the dedicated pool for scope git operations. - Isolated from the default executor so a hung clone/fetch can never - starve bundle serving or other server work. + Isolated from the default executor so a hung clone/fetch can never starve + bundle serving or other server work. Workers are daemon threads so a + lingering (timed-out) git op cannot block interpreter shutdown. ``SCOPES_GIT_MAX_WORKERS`` is read once on first use; the executor is then - cached for the process lifetime — it is not runtime-reconfigurable and is - never explicitly shut down (matches the PR3 design). + cached for the process lifetime. It is reset after ``fork`` (see + ``_reset_git_executor_after_fork``) because a ``ThreadPoolExecutor`` + inherited across a fork has no live worker threads in the child. """ global _git_executor if _git_executor is None: - _git_executor = ThreadPoolExecutor( + _git_executor = _DaemonThreadPoolExecutor( max_workers=opal_server_config.SCOPES_GIT_MAX_WORKERS, thread_name_prefix="opal-git", ) return _git_executor -async def run_in_git_executor(func, *args, timeout: float, **kwargs): +def shutdown_git_executor() -> None: + """Drop the dedicated git pool and clear in-flight state. + + Called at the end of the pre-fork ``preload_scopes`` so the gunicorn master + does not carry idle git-pool threads (or stale in-flight markers) into the + forked workers. Workers lazily rebuild their own pool on first use. + """ + global _git_executor + executor, _git_executor = _git_executor, None + if executor is not None: + executor.shutdown(wait=False, cancel_futures=True) + with _git_busy_lock: + _git_busy.clear() + + +def _reset_git_executor_after_fork() -> None: + """Reset the git pool + in-flight state in a freshly forked child. + + A ``ThreadPoolExecutor`` created in the parent is inherited *broken* by the + child: its worker threads do not survive ``fork``, yet its bookkeeping makes + ``_adjust_thread_count`` skip spawning live workers, so every submitted task + would sit queued forever. Dropping the reference forces the child to build + its own working pool. The in-flight markers are stale in the child too (no + thread will ever clear them), so clear them. + """ + global _git_executor + _git_executor = None + with _git_busy_lock: + _git_busy.clear() + + +if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=_reset_git_executor_after_fork) + + +def _mark_git_op_started(key: str) -> None: + with _git_busy_lock: + _git_busy.add(key) + + +def _mark_git_op_done(key: str) -> None: + with _git_busy_lock: + _git_busy.discard(key) + + +def git_op_in_flight(key: str) -> bool: + """True while a git op for ``key`` is still running on a pool thread. + + Stays True during the "lingering" window after a timeout, until the + blocking pygit2 call actually returns. + """ + with _git_busy_lock: + return key in _git_busy + + +def _consume_future_result(fut) -> None: + # A future left running after its awaiter timed out is never awaited again; + # retrieve its outcome so asyncio doesn't log "exception never retrieved". + if not fut.cancelled(): + try: + fut.exception() + except Exception: + pass + + +async def run_in_git_executor(func, *args, timeout: float, busy_key=None, **kwargs): """Run a blocking git call on the dedicated pool with a hard timeout. - Raises ``TimeoutError`` (via ``asyncio.wait_for``) when the call exceeds - ``timeout`` seconds. ``timeout <= 0`` means no limit. NOTE: the timeout - unblocks the event loop and the awaiting coroutine, but the underlying - pygit2 call keeps running on its pool thread until the OS network timeout; - the dedicated pool keeps that lingering thread isolated. + Raises the builtin ``TimeoutError`` when the call exceeds ``timeout`` + seconds (``timeout <= 0`` means no limit). NOTE: the timeout unblocks the + event loop and the awaiting coroutine, but the underlying pygit2 call keeps + running on its pool thread until the OS network timeout; the dedicated pool + keeps that lingering thread isolated. + + When ``busy_key`` is given it is marked in-flight for the *entire real + duration* of the call — including any lingering time after a timeout — and + cleared only when the blocking call actually returns (on the pool thread). + Callers use ``git_op_in_flight`` to avoid starting a second git op against + the same repository while a timed-out one is still running. """ loop = asyncio.get_running_loop() - fut = loop.run_in_executor(_get_git_executor(), partial(func, *args, **kwargs)) - if timeout and timeout > 0: + + def _runner(): try: - return await asyncio.wait_for(fut, timeout=timeout) - except asyncio.TimeoutError as exc: - # On Python < 3.11 ``asyncio.TimeoutError`` is a distinct class from - # the builtin ``TimeoutError``; normalize so callers (and the - # documented contract) can rely on the builtin everywhere. - raise TimeoutError() from exc - return await fut + return func(*args, **kwargs) + finally: + if busy_key is not None: + _mark_git_op_done(busy_key) + + if busy_key is not None: + _mark_git_op_started(busy_key) + try: + fut = loop.run_in_executor(_get_git_executor(), _runner) + except BaseException: + if busy_key is not None: + _mark_git_op_done(busy_key) + raise + + if not (timeout and timeout > 0): + return await fut + + # Use asyncio.wait (not wait_for) so a timeout does NOT cancel the future: + # the pool thread runs to completion and clears busy_key, and a still-queued + # task isn't silently dropped. The done-callback retrieves the eventual + # result to avoid an "exception never retrieved" warning. + fut.add_done_callback(_consume_future_result) + done, _pending = await asyncio.wait({fut}, timeout=timeout) + if not done: + raise TimeoutError(f"git operation exceeded {timeout}s") + return fut.result() class PolicyFetcherCallbacks: @@ -213,6 +365,17 @@ async def fetch_and_notify_on_changes( """ repo_lock = await self._get_repo_lock() async with repo_lock: + if git_op_in_flight(self._source_id): + # A previous git op for this repo exceeded its timeout and is + # still running on a pool thread. pygit2 Repository objects are + # not thread-safe, so skip this cycle rather than touch the same + # repo concurrently; the next cycle retries once it finishes. + logger.warning( + "Skipping sync for {url}: a previous git operation is still " + "running after its timeout.", + url=self._source.url, + ) + return with tracer.trace( "git_policy_fetcher.fetch_and_notify_on_changes", resource=self._scope_id, @@ -231,11 +394,13 @@ async def fetch_and_notify_on_changes( logger.debug( f"Fetching remote (force_fetch={force_fetch}): {self._remote} ({self._source.url})" ) + started = datetime.datetime.now() try: await run_in_git_executor( repo.remotes[self._remote].fetch, callbacks=self._auth_callbacks, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + busy_key=self._source_id, ) except TimeoutError as exc: # Expected when a repo is unreachable: log cleanly @@ -248,11 +413,13 @@ async def fetch_and_notify_on_changes( err=repr(exc), ) return - # Only mark as fetched after the fetch actually - # succeeds. + # Record the fetch *start* time, but only now that it + # has succeeded: a completion timestamp could wrongly + # suppress a force_fetch whose req_time falls within an + # in-flight fetch (see _was_fetched_after). GitPolicyFetcher.repos_last_fetched[ self._source_id - ] = datetime.datetime.now() + ] = started logger.debug(f"Fetch completed: {self._source.url}") # New commits might be present because of a previous fetch made by another scope @@ -263,7 +430,10 @@ async def fetch_and_notify_on_changes( logger.warning( "Deleting invalid repo: {path}", path=self._repo_path ) - shutil.rmtree(self._repo_path) + # ignore_errors: a partial dir left by an abandoned + # (timed-out) clone may still be written to; a failed + # delete self-heals via the clone below (or next cycle). + shutil.rmtree(self._repo_path, ignore_errors=True) else: logger.info("Repo not found at {path}", path=self._repo_path) @@ -287,6 +457,7 @@ async def _clone(self): str(self._repo_path), callbacks=self._auth_callbacks, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + busy_key=self._source_id, ) except (pygit2.GitError, TimeoutError) as exc: logger.error( diff --git a/packages/opal-server/opal_server/scopes/service.py b/packages/opal-server/opal_server/scopes/service.py index d3df4972f..8aef8d630 100644 --- a/packages/opal-server/opal_server/scopes/service.py +++ b/packages/opal-server/opal_server/scopes/service.py @@ -1,3 +1,4 @@ +import asyncio import datetime import shutil from functools import partial @@ -12,6 +13,7 @@ from opal_common.schemas.policy import PolicyUpdateMessageNotification from opal_common.schemas.policy_source import GitPolicyScopeSource from opal_common.topics.publisher import ScopedServerSideTopicPublisher +from opal_server.config import opal_server_config from opal_server.git_fetcher import GitPolicyFetcher, PolicyFetcherCallbacks from opal_server.policy.watcher.callbacks import ( create_policy_update, @@ -194,34 +196,58 @@ async def sync_scopes(self, only_poll_updates=False, notify_on_changes=True): f"OPAL Scopes: syncing {len(scopes)} scopes in the background (polling updates: {only_poll_updates})" ) - fetched_source_ids = set() - skipped_scopes = [] + # Partition into distinct repos (cloned/fetched once, with priority + # so every repo is pulled asap) and the scopes that merely reuse an + # already-handled repo (checked for changes only). + unique_scopes = [] + duplicate_scopes = [] + seen_source_ids = set() for scope in scopes: src_id = GitPolicyFetcher.source_id(scope.policy) + if src_id in seen_source_ids: + duplicate_scopes.append(scope) + else: + seen_source_ids.add(src_id) + unique_scopes.append(scope) + + # Bound concurrency to the dedicated git pool so one unreachable repo + # only stalls its own slot (for the fetch timeout), not the whole + # boot/poll pass. Phase 1 clones/fetches every distinct repo; phase 2 + # then checks the duplicates against those now-present repos. + semaphore = asyncio.Semaphore( + max(1, opal_server_config.SCOPES_GIT_MAX_WORKERS) + ) + await self._sync_scopes_concurrently( + unique_scopes, + semaphore, + force_fetch=True, + notify_on_changes=notify_on_changes, + ) + await self._sync_scopes_concurrently( + duplicate_scopes, + semaphore, + force_fetch=False, + notify_on_changes=notify_on_changes, + ) - # Give priority to scopes that have a unique url per shard (so we'll clone all repos asap) - if src_id in fetched_source_ids: - skipped_scopes.append(scope) - continue - - try: - await self.sync_scope( - scope=scope, - force_fetch=True, - notify_on_changes=notify_on_changes, - ) - except Exception as e: - logger.exception(f"sync_scope failed for {scope.scope_id}") + async def _sync_scopes_concurrently( + self, scopes, semaphore, *, force_fetch, notify_on_changes + ): + """Sync ``scopes`` concurrently, bounded by ``semaphore``. - fetched_source_ids.add(src_id) + Each scope's failure is logged and isolated so one bad repo + never fails the whole pass. + """ - for scope in skipped_scopes: - # No need to refetch the same repo, just check for changes + async def _sync_one(scope): + async with semaphore: try: await self.sync_scope( scope=scope, - force_fetch=False, + force_fetch=force_fetch, notify_on_changes=notify_on_changes, ) - except Exception as e: + except Exception: logger.exception(f"sync_scope failed for {scope.scope_id}") + + await asyncio.gather(*(_sync_one(scope) for scope in scopes)) diff --git a/packages/opal-server/opal_server/scopes/task.py b/packages/opal-server/opal_server/scopes/task.py index 83b2b10f0..10ac85b74 100644 --- a/packages/opal-server/opal_server/scopes/task.py +++ b/packages/opal-server/opal_server/scopes/task.py @@ -6,6 +6,7 @@ from fastapi_websocket_pubsub import Topic from opal_common.logger import logger from opal_server.config import opal_server_config +from opal_server.git_fetcher import shutdown_git_executor from opal_server.policy.watcher.task import BasePolicyWatcherTask from opal_server.redis_utils import RedisDB from opal_server.scopes.scope_repository import ScopeRepository @@ -82,4 +83,9 @@ def preload_scopes(): ) asyncio.run(service.sync_scopes(notify_on_changes=False)) + # Release the git pool built during preload so the gunicorn master + # does not carry pool threads into forked workers (they would be + # dead in the child). Workers rebuild their own pool on first use. + shutdown_git_executor() + logger.warning("Finished preloading repo clones for scopes.") diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py index e1c534c80..24c83fbaf 100644 --- a/packages/opal-server/opal_server/tests/git_executor_test.py +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -1,11 +1,16 @@ +import asyncio +import threading import time import pytest from opal_server.config import OpalServerConfig -from opal_server.git_fetcher import run_in_git_executor +from opal_server.git_fetcher import git_op_in_flight, run_in_git_executor -def test_git_resilience_config_defaults(): +def test_git_resilience_config_defaults(monkeypatch): + # Don't let an ambient OPAL_* env var in CI/dev shadow the declared defaults. + monkeypatch.delenv("OPAL_SCOPES_GIT_FETCH_TIMEOUT", raising=False) + monkeypatch.delenv("OPAL_SCOPES_GIT_MAX_WORKERS", raising=False) clean = OpalServerConfig(prefix="OPAL_") assert clean.SCOPES_GIT_FETCH_TIMEOUT == 120.0 assert clean.SCOPES_GIT_MAX_WORKERS == 10 @@ -27,3 +32,43 @@ async def test_run_in_git_executor_times_out(): async def test_zero_timeout_means_no_limit(): result = await run_in_git_executor(lambda: "ok", timeout=0) assert result == "ok" + + +def test_git_op_in_flight_false_for_unknown_key(): + assert git_op_in_flight("no-such-source") is False + + +@pytest.mark.asyncio +async def test_busy_key_stays_in_flight_until_call_returns(): + """A timed-out op must remain 'in flight' until its blocking call actually + returns, so a second op for the same repo is not started concurrently.""" + started = threading.Event() + release = threading.Event() + key = "busy-source-id" + + def _block(): + started.set() + release.wait(5) + + # The op exceeds its timeout but keeps running on the pool thread. + with pytest.raises(TimeoutError): + await run_in_git_executor(_block, timeout=0.1, busy_key=key) + + assert started.wait(2) + # Still lingering on the pool thread -> guarded as in-flight. + assert git_op_in_flight(key) is True + + # Once the blocking call returns, the marker clears (from the pool thread). + release.set() + for _ in range(200): + if not git_op_in_flight(key): + break + await asyncio.sleep(0.01) + assert git_op_in_flight(key) is False + + +@pytest.mark.asyncio +async def test_busy_key_cleared_after_success(): + key = "ok-source-id" + assert await run_in_git_executor(lambda: 1, timeout=5, busy_key=key) == 1 + assert git_op_in_flight(key) is False