Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .claude/plans/docs/05-config-reference.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# 05 — OPAL Config Reference (private)

Private, internal supplement to the public operator docs under `documentation/docs/`. Tracks
`OPAL_*` env vars added or clarified by the OPAL Server Git Fixes work, with the declaring
`file:line` so contributors can jump straight to the `Confi` declaration. Each key maps to an
`OPAL_<NAME>` env var: the `OPAL_` prefix is added once by the component's `Confi(prefix="OPAL_")`
instantiation, so the declaration uses the bare name (e.g. `SCOPES_GIT_FETCH_TIMEOUT`) while the
tables below list the full, prefixed env var.

## 4. opal-server keys

| Env var | Type | Default | Purpose | Declared at |
|---|---|---|---|---|
| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Soft timeout (see the caveat below) for a single scope git clone/fetch: the sync stops waiting after this long, although the underlying git call may linger on its pool thread. On timeout the operation is logged and skipped (retried next cycle), so one unreachable repo can never block boot or other scopes *indefinitely*. `0` = no timeout. | `packages/opal-server/opal_server/config.py:196-202` |
| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations, which also bounds how many scopes are synced concurrently. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:203-209` |

> **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via
> `asyncio.wait`, which unblocks the event loop and the awaiting coroutine — but the underlying
> pygit2 call keeps running on its pool thread until the OS network timeout. The dedicated pool
> (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering threads so they cannot affect
> bundle serving or other scopes, and a per-repo in-flight guard prevents a second git op from
> touching the same (non-thread-safe) pygit2 repo while the first is still lingering. Hard-kill via
> subprocess is out of scope. See spec §6.
>
> **Boot / concurrency.** Scope syncs run concurrently, bounded by `OPAL_SCOPES_GIT_MAX_WORKERS`, so
> a slow/offline repo only holds its own slot for up to the timeout rather than serially delaying the
> whole pass. With more offline repos than workers, boot/poll can still take
> `ceil(offline / workers) × timeout`; the pool workers are daemon threads so a lingering op never
> blocks process shutdown.
16 changes: 16 additions & 0 deletions packages/opal-server/opal_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,22 @@ class OpalServerConfig(Confi):
0,
description="The timeout for cloning the policy repository (0 means wait forever)",
)
SCOPES_GIT_FETCH_TIMEOUT = confi.float(
"SCOPES_GIT_FETCH_TIMEOUT",
120.0,
description="Hard timeout in seconds for a single scope git clone/fetch. "
"On timeout the operation is logged and skipped (retried next cycle), so "
"one unreachable repo can never block boot or other scopes indefinitely "
"(0 = no timeout).",
)
SCOPES_GIT_MAX_WORKERS = confi.int(
"SCOPES_GIT_MAX_WORKERS",
10,
description="Size of the dedicated thread pool for scope git operations, "
"which also bounds how many scopes are synced concurrently. Isolating git "
"work keeps a hung fetch from starving bundle serving and other server "
"work that uses the default executor.",
)
LEADER_LOCK_FILE_PATH = confi.str(
"LEADER_LOCK_FILE_PATH",
"/tmp/opal_server_leader.lock",
Expand Down
255 changes: 245 additions & 10 deletions packages/opal-server/opal_server/git_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
import codecs
import datetime
import hashlib
import os
import shutil
import threading
import time
import weakref
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import thread as cf_thread
from functools import partial
from pathlib import Path
from typing import Optional, cast

import aiofiles.os
import pygit2
from ddtrace import tracer
from git import Repo
from opal_common.async_utils import run_sync
from opal_common.git_utils.bundle_maker import BundleMaker
from opal_common.logger import logger
from opal_common.schemas.policy import PolicyBundle
Expand All @@ -33,6 +38,197 @@
reference_is_valid_name,
)

# Process-wide dedicated thread pool for scope git operations. Built lazily by
# ``_get_git_executor`` and reset to ``None`` by ``shutdown_git_executor`` and
# by the after-fork hook, so ``None`` means "not built yet in this process".
_git_executor: Optional[ThreadPoolExecutor] = None

# Source ids whose scope git op (clone/fetch) is still running on a pool thread
# — including one that already exceeded its timeout but whose blocking pygit2
# call has not yet returned. Guarded by a lock because it is cleared from the
# pool thread (see ``run_in_git_executor``) and read/written from the event
# loop. Used to guarantee at most one live git op per repository, since pygit2
# ``Repository`` objects are not thread-safe.
_git_busy: set = set()
_git_busy_lock = threading.Lock()


class _DaemonThreadPoolExecutor(ThreadPoolExecutor):
    """A ``ThreadPoolExecutor`` whose worker threads are daemon threads.

    A scope git op can stay blocked in a libgit2 network call well past our
    soft timeout. The stdlib pool registers every worker in the module-level
    ``concurrent.futures.thread._threads_queues`` map, and its exit hook joins
    every thread found there, so a lingering worker would hang interpreter
    shutdown until the OS network timeout fires. Workers created here are
    daemon threads and are deliberately *not* registered in that map, so the
    process can exit promptly; abandoning an in-flight fetch at exit is safe
    (libgit2 stages objects in a temp pack and swaps refs atomically under
    lockfiles, and a half-written clone dir is detected as invalid and
    re-cloned on next boot).

    Only thread creation is customised, mirroring CPython's
    ``_adjust_thread_count``. If the running CPython does not expose the
    internals this override was written against, we fall back to the stdlib
    (non-daemon) behaviour.
    """

    # Executor attributes read by the customised ``_adjust_thread_count``.
    _REQUIRED_ATTRS = (
        "_idle_semaphore",
        "_work_queue",
        "_threads",
        "_max_workers",
        "_thread_name_prefix",
        "_initializer",
        "_initargs",
    )

    # ``_worker(executor_reference, work_queue, initializer, initargs)``.
    _WORKER_ARGCOUNT = 4

    def _internals_supported(self) -> bool:
        """Return True when the stdlib internals match what we rely on.

        Checking only that ``_worker`` exists is not enough: a CPython release
        that changes its signature (or drops ``_initializer``/``_initargs``
        from the executor) would make every worker thread die on start, or
        make ``submit`` raise ``AttributeError``.
        """
        worker = getattr(cf_thread, "_worker", None)
        code = getattr(worker, "__code__", None)
        if code is None or code.co_argcount != self._WORKER_ARGCOUNT:
            return False
        return all(hasattr(self, attr) for attr in self._REQUIRED_ATTRS)

    def _adjust_thread_count(self) -> None:  # pragma: no cover - thread mgmt
        if not self._internals_supported():
            return super()._adjust_thread_count()
        # If idle threads are available, don't spin up new ones.
        if self._idle_semaphore.acquire(timeout=0):
            return

        # Wake a worker when the executor is garbage collected so it can exit.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)
            t = threading.Thread(
                name=thread_name,
                target=cf_thread._worker,
                args=(
                    weakref.ref(self, weakref_cb),
                    self._work_queue,
                    self._initializer,
                    self._initargs,
                ),
                daemon=True,
            )
            t.start()
            self._threads.add(t)
            # Intentionally NOT added to ``cf_thread._threads_queues``: the
            # stdlib exit hook joins every thread registered there, which
            # would block shutdown on a lingering git op and defeat the
            # purpose of using daemon workers.


def _get_git_executor() -> ThreadPoolExecutor:
    """Return the process-wide git thread pool, creating it on first call.

    Scope git operations get their own pool rather than the default executor,
    so a stuck clone/fetch cannot starve bundle serving or other server work.
    The pool is a ``_DaemonThreadPoolExecutor``, so its workers are daemon
    threads.

    The pool size comes from ``SCOPES_GIT_MAX_WORKERS`` and is read only when
    the pool is built; afterwards the same instance is returned until the
    module-level reference is reset (by ``shutdown_git_executor`` or by
    ``_reset_git_executor_after_fork`` in a forked child, where an inherited
    pool would have no live worker threads).
    """
    global _git_executor
    if _git_executor is not None:
        return _git_executor
    _git_executor = _DaemonThreadPoolExecutor(
        thread_name_prefix="opal-git",
        max_workers=opal_server_config.SCOPES_GIT_MAX_WORKERS,
    )
    return _git_executor


def shutdown_git_executor() -> None:
    """Discard the dedicated git pool and forget all in-flight markers.

    Intended to run at the end of the pre-fork ``preload_scopes`` so the
    gunicorn master does not hand idle git-pool threads (or stale in-flight
    markers) to the workers it forks. Each worker lazily builds a fresh pool
    the first time it needs one.
    """
    global _git_executor
    pool = _git_executor
    _git_executor = None
    if pool is not None:
        # Do not wait for lingering ops; drop anything still queued.
        pool.shutdown(wait=False, cancel_futures=True)
    with _git_busy_lock:
        _git_busy.clear()


def _reset_git_executor_after_fork() -> None:
    """Forget the inherited git pool and in-flight markers in a forked child.

    A ``ThreadPoolExecutor`` built in the parent arrives *broken* in the
    child: none of its worker threads survive ``fork``, yet its bookkeeping
    still counts them, so ``_adjust_thread_count`` would not spawn live workers
    and every submitted task would stay queued forever. Dropping the reference
    makes the child build its own pool on first use. The in-flight markers are
    equally stale (no thread in the child will ever clear them), so they are
    wiped as well.
    """
    global _git_executor
    with _git_busy_lock:
        _git_busy.clear()
    _git_executor = None


# Install the after-fork reset in every child process. ``os.register_at_fork``
# is not available on every platform/Python build, hence the ``hasattr`` guard.
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reset_git_executor_after_fork)


def _mark_git_op_started(key: str) -> None:
    """Record ``key`` as having a git op in flight (adds it to ``_git_busy``)."""
    # The set is also mutated from pool threads, so always take the lock.
    with _git_busy_lock:
        _git_busy.add(key)


def _mark_git_op_done(key: str) -> None:
    """Clear the in-flight marker for ``key``; a no-op if it is not set."""
    # ``discard`` (not ``remove``) so clearing an already-cleared key is safe.
    with _git_busy_lock:
        _git_busy.discard(key)


def git_op_in_flight(key: str) -> bool:
    """Report whether a git op for ``key`` is still occupying a pool thread.

    The answer remains True throughout the "lingering" window that follows a
    timeout, and only flips to False once the blocking pygit2 call has
    actually returned.
    """
    with _git_busy_lock:
        is_busy = key in _git_busy
    return is_busy


def _consume_future_result(fut) -> None:
    """Done-callback that marks a future's outcome as retrieved.

    A future that keeps running after its awaiter timed out is never awaited
    again; fetching its exception here stops asyncio from logging
    "exception never retrieved". Cancelled futures are left untouched.
    """
    if fut.cancelled():
        return
    try:
        fut.exception()
    except Exception:
        # Best effort only: this callback must never raise.
        pass


async def run_in_git_executor(func, *args, timeout: float, busy_key=None, **kwargs):
    """Run a blocking git call on the dedicated pool with a soft timeout.

    Args:
        func: The blocking callable to run on the git pool.
        *args: Positional arguments forwarded to ``func``.
        timeout: Seconds to wait for the call; a falsy or non-positive value
            means wait without limit.
        busy_key: Optional repository key to mark in-flight while the call
            really runs (see below).
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        TimeoutError: (the builtin) when the call exceeds ``timeout`` seconds.
        Exception: anything ``func`` itself raises is propagated.

    NOTE: the timeout is *soft*. It unblocks the event loop and the awaiting
    coroutine, but the underlying pygit2 call keeps running on its pool thread
    until the OS network timeout; the dedicated pool keeps that lingering
    thread isolated.

    When ``busy_key`` is given it is marked in-flight for the *entire real
    duration* of the call — including any lingering time after a timeout — and
    cleared only when the blocking call actually returns (on the pool thread).
    Callers use ``git_op_in_flight`` to avoid starting a second git op against
    the same repository while a timed-out one is still running.

    Neither a timeout nor cancellation of the awaiting coroutine cancels the
    submitted work. Cancelling a still-queued executor future would mean the
    wrapper below never runs, leaving ``busy_key`` marked in-flight forever.
    """
    loop = asyncio.get_running_loop()

    def _runner():
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on the pool thread, when the blocking call really returns.
            if busy_key is not None:
                _mark_git_op_done(busy_key)

    if busy_key is not None:
        _mark_git_op_started(busy_key)
    try:
        fut = loop.run_in_executor(_get_git_executor(), _runner)
    except BaseException:
        # Submission failed, so ``_runner`` will never clear the marker.
        if busy_key is not None:
            _mark_git_op_done(busy_key)
        raise

    # The future may outlive its awaiter (timeout or cancellation); retrieve
    # its eventual outcome to avoid an "exception never retrieved" warning.
    fut.add_done_callback(_consume_future_result)

    if not (timeout and timeout > 0):
        # Shield so that cancelling the caller does not cancel ``fut``: a
        # cancelled, still-queued task would never run ``_runner`` and would
        # leak ``busy_key``.
        return await asyncio.shield(fut)

    # Use asyncio.wait (not wait_for) so a timeout does NOT cancel the future:
    # the pool thread runs to completion and clears busy_key, and a still-queued
    # task isn't silently dropped.
    done, _pending = await asyncio.wait({fut}, timeout=timeout)
    if not done:
        raise TimeoutError(f"git operation exceeded {timeout}s")
    return fut.result()


class PolicyFetcherCallbacks:
async def on_update(self, old_head: Optional[str], head: str):
Expand Down Expand Up @@ -169,6 +365,17 @@ async def fetch_and_notify_on_changes(
"""
repo_lock = await self._get_repo_lock()
async with repo_lock:
if git_op_in_flight(self._source_id):
# A previous git op for this repo exceeded its timeout and is
# still running on a pool thread. pygit2 Repository objects are
# not thread-safe, so skip this cycle rather than touch the same
# repo concurrently; the next cycle retries once it finishes.
logger.warning(
"Skipping sync for {url}: a previous git operation is still "
"running after its timeout.",
url=self._source.url,
)
return
with tracer.trace(
"git_policy_fetcher.fetch_and_notify_on_changes",
resource=self._scope_id,
Expand All @@ -187,13 +394,32 @@ async def fetch_and_notify_on_changes(
logger.debug(
f"Fetching remote (force_fetch={force_fetch}): {self._remote} ({self._source.url})"
)
started = datetime.datetime.now()
try:
await run_in_git_executor(
repo.remotes[self._remote].fetch,
callbacks=self._auth_callbacks,
timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT,
busy_key=self._source_id,
)
except TimeoutError as exc:
# Expected when a repo is unreachable: log cleanly
# (no traceback) and skip, matching the clone path.
# repos_last_fetched stays stale so the next cycle
# retries and force_fetch is not wrongly suppressed.
logger.error(
"Timed out fetching {url}, skipping: {err}",
url=self._source.url,
err=repr(exc),
)
return
# Record the fetch *start* time, but only now that it
# has succeeded: a completion timestamp could wrongly
# suppress a force_fetch whose req_time falls within an
# in-flight fetch (see _was_fetched_after).
GitPolicyFetcher.repos_last_fetched[
self._source_id
] = datetime.datetime.now()
await run_sync(
repo.remotes[self._remote].fetch,
callbacks=self._auth_callbacks,
)
] = started
logger.debug(f"Fetch completed: {self._source.url}")

# New commits might be present because of a previous fetch made by another scope
Expand All @@ -204,7 +430,10 @@ async def fetch_and_notify_on_changes(
logger.warning(
"Deleting invalid repo: {path}", path=self._repo_path
)
shutil.rmtree(self._repo_path)
# ignore_errors: a partial dir left by an abandoned
# (timed-out) clone may still be written to; a failed
# delete self-heals via the clone below (or next cycle).
shutil.rmtree(self._repo_path, ignore_errors=True)
else:
logger.info("Repo not found at {path}", path=self._repo_path)

Expand All @@ -222,14 +451,20 @@ async def _clone(self):
path=self._repo_path,
)
try:
repo: Repository = await run_sync(
repo: Repository = await run_in_git_executor(
clone_repository,
self._source.url,
str(self._repo_path),
callbacks=self._auth_callbacks,
timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT,
busy_key=self._source_id,
)
except (pygit2.GitError, TimeoutError) as exc:
logger.error(
"Could not clone repo at {url}: {err}",
url=self._source.url,
err=repr(exc),
)
except pygit2.GitError:
logger.exception(f"Could not clone repo at {self._source.url}")
else:
logger.info(f"Clone completed: {self._source.url}")
await self._notify_on_changes(repo)
Expand Down
Loading
Loading