diff --git a/.claude/plans/docs/05-config-reference.md b/.claude/plans/docs/05-config-reference.md new file mode 100644 index 000000000..3b331f56b --- /dev/null +++ b/.claude/plans/docs/05-config-reference.md @@ -0,0 +1,22 @@ +# 05 — OPAL Config Reference (private) + +Private, internal supplement to the public operator docs under `documentation/docs/`. Tracks +`OPAL_*` env vars added or clarified by the OPAL Server Git Fixes work, with the declaring +`file:line` so contributors can jump straight to the `Confi` declaration. Each key maps to an +`OPAL_` env var (the `OPAL_` prefix is added once by the component's `Confi(prefix="OPAL_")` +instantiation — the bare name is what appears in the table). + +## 4. opal-server keys + +| Env var | Type | Default | Purpose | Declared at | +|---|---|---|---|---| +| `OPAL_SCOPES_GIT_FETCH_TIMEOUT` | float (seconds) | `120.0` | Hard timeout for a single scope git clone/fetch. On timeout the operation is abandoned and the scope is marked failed, so one unreachable repo can never block boot or other scopes. `0` = no timeout. | `packages/opal-server/opal_server/config.py:150-156` | +| `OPAL_SCOPES_GIT_MAX_WORKERS` | int | `10` | Size of the dedicated `ThreadPoolExecutor` for scope git operations. Isolating git work keeps a hung fetch from starving bundle serving and other server work that uses the default executor. | `packages/opal-server/opal_server/config.py:157-163` | +| `OPAL_SCOPES_SYNC_CONCURRENCY` | int | `10` | Maximum number of scope git repositories fetched concurrently during `sync_scopes` (boot and periodic). Bounds parallel loading so a fleet of repos loads ~N at a time instead of serially. Higher is faster but uses more memory, file descriptors, and network connections at once; tune down for many pods × many sources. | `packages/opal-server/opal_server/config.py:438-444` | + +> **Caveat (timeout is soft, not a hard kill).** `OPAL_SCOPES_GIT_FETCH_TIMEOUT` is enforced via +> `asyncio.wait_for`, which cancels the *await* — unblocking the event loop and the awaiting +> coroutine — but the underlying pygit2 call keeps running on its pool thread until the OS network +> timeout. The dedicated pool (`OPAL_SCOPES_GIT_MAX_WORKERS`) bounds and isolates those lingering +> threads so they cannot affect bundle serving or other scopes. Hard-kill via subprocess is out of +> scope. See spec §6. diff --git a/.gitignore b/.gitignore index 77faef34a..c3224ec1c 100644 --- a/.gitignore +++ b/.gitignore @@ -137,3 +137,6 @@ dmypy.json *.iml .DS_Store + +# Private Claude Code working artifacts (plans/specs) — never commit +.claude/ diff --git a/app-tests/git-leak/README.md b/app-tests/git-leak/README.md new file mode 100644 index 000000000..aabbb4657 --- /dev/null +++ b/app-tests/git-leak/README.md @@ -0,0 +1,71 @@ +# OPAL git-leak / resilience test bed + +Reproduces (as failing tests) the four issues fixed by PR2–PR5: memory leak, +offline-repo hang, slow serial boot, broadcaster no-reconnect. + +Every assertion is driven through `GET /internal/git-fetcher-cache-stats`, which +**this PR (PR1) adds** — it does not exist on `master`. So the suite runs against +*this branch*: the leak/offline tests fail here *until PR2/PR3 land*, then go +green. Run against true `master` they would all error at setup on the missing +endpoint, not "fail for the targeted bug." + +## Stack +- `opal_server` (single worker, scopes on, Postgres broadcaster, built from `docker/Dockerfile`) +- `redis`, `postgres`, `gitea` (+ one-shot `gitea-admin` and `seed` sidecars) +- `blackhole` (alpine/socat: accepts TCP then never answers — the offline repo) + +Only `opal_server` (`:7002`) and `gitea` (`:13000` on the host) are published; +Postgres and `blackhole` are internal to the compose network. + +## Helpers (`helpers.py`) +- `OpalServerClient` — drive opal over HTTP (`stats`, `put_scope`, `delete_scope`, + `refresh_all`, `get_scope_policy`, `list_scope_ids`, `delete_all_scopes`). +- `GiteaAdmin` — host-side Gitea admin client (`list_repos`, `repo_exists`, + `create_repo`, `delete_repo`); also exposed as the `gitea_admin` pytest fixture. +- `make_repo_unreachable(name)` — git URL on the `blackhole` sidecar (completes + the TCP handshake, never answers) so the clone hangs for the offline-repo test. +- `bounce_postgres(down_seconds)` — stop Postgres, then `up -d --wait` it back to + simulate a broadcaster outage and await readiness before the recovery poll. + +## Run +```bash +cd app-tests/git-leak +python -m pytest -v --boot-scopes=50 # full set +python -m pytest test_leak.py -v --boot-scopes=20 # just the leak gates +``` +Useful flags: `--boot-scopes=N` (any N), `--keep-stack` (skip teardown), +env `BOOT_TARGET_SECONDS=120` (tighten the boot gate). + +## Expected behavior +The churn leak test (`test_churn_releases_caches`) and the offline-repo test +FAIL on this branch *without the PR2/PR3 fix* — they target unfixed bugs and +become the regression gates for PR2/PR3, flipping green when those land. The +boot test passes but fails when `BOOT_TARGET_SECONDS` is set low (PR4's gate). + +Two tests are guards that PASS rather than reproducing a current failure: +- `test_repeat_sync_does_not_grow` — clone paths are keyed by the repo URL, so + re-syncing identical scopes reuses cache entries and the cache *counts* can't + grow for any implementation; the load-bearing assertion is therefore on RSS, + guarding against a regression that leaks per-sync allocations. +- `test_server_recovers_after_postgres_bounce` — when the broadcaster drops, the + worker is respawned by gunicorn and the broadcaster reconnects once Postgres + is back; the test PUTs a fresh scope post-bounce and asserts it syncs, proving + the broadcast path (not just HTTP) recovered. + +## Requires +Docker + docker compose v2, plus host Python with `pytest pytest-timeout requests GitPython`. + +## Notes +- Auth is disabled in the stack: `OPAL_AUTH_PUBLIC_KEY` is left unset so the JWT + verifier is disabled and the harness can call scope routes without minting JWTs. + Local test bed only; never a production setting. (The `/internal` endpoint is + registered with the same `JWTAuthenticator` dependency as the other routes, so + it is protected when JWT verification is enabled and open only here.) +- The server runs a **single** uvicorn worker. The `GitPolicyFetcher` caches read + by `/internal/git-fetcher-cache-stats` are per-process, so a multi-worker stack + would make a round-robin read miss the worker that fetched and let a `== 0` + drain assertion pass falsely. One worker makes every cache read deterministic; + the leak/boot/offline bugs all reproduce single-worker. +- First-sync of a fresh scope takes the clone path, which fills only `repo_locks`; + `repos` / `repos_last_fetched` are filled by the discover/fetch path on a second + sync, so the load helpers issue a `refresh_all()` before asserting on `repos`. diff --git a/app-tests/git-leak/conftest.py b/app-tests/git-leak/conftest.py new file mode 100644 index 000000000..446376e3f --- /dev/null +++ b/app-tests/git-leak/conftest.py @@ -0,0 +1,86 @@ +import os +import shutil + +import pytest +from helpers import ( + HEALTHY_PROBE_REPO, + GiteaAdmin, + OpalServerClient, + compose, + list_seeded_repos, +) + + +def pytest_addoption(parser): + parser.addoption( + "--boot-scopes", + action="store", + default="50", + help="number of repos to seed/boot (default 50)", + ) + parser.addoption( + "--keep-stack", + action="store_true", + default=False, + help="do not tear the compose stack down after the run", + ) + + +@pytest.fixture(scope="session") +def repo_count(request) -> int: + return int(request.config.getoption("--boot-scopes")) + + +@pytest.fixture(scope="session") +def stack(request, repo_count): + # Defense-in-depth: this docker-compose suite is already excluded from the + # repo's default `pytest` run via `testpaths = packages` in pytest.ini, so + # the unit-test CI matrix never collects it. If it is ever collected in an + # environment without docker, skip cleanly instead of erroring. + if shutil.which("docker") is None: + pytest.skip("docker (compose) is required for the git-leak test bed") + os.environ["REPO_COUNT"] = str(repo_count) + # build + start infra; seed runs to completion then exits + compose("up", "-d", "--build") + # block until seeding sidecar has finished creating repos. compose() raises + # (with output) if the seed container exited non-zero, so a hard seed + # failure surfaces here rather than as a confusing later test failure. + compose("wait", "seed") + # Verify the seed actually produced all N repos before any test runs: a + # partial seed would otherwise look like a server bug when the load gate + # can't reach N. Fail loudly with the gap. + # include the reserved probe repo the resilience test relies on, so a + # partial seed of it is caught here too rather than as a later test failure + expected = set(list_seeded_repos(repo_count)) | {HEALTHY_PROBE_REPO} + present = set(GiteaAdmin().list_repos()) + missing = expected - present + assert not missing, ( + f"seed incomplete: {len(missing)}/{repo_count} repos missing " + f"(e.g. {sorted(missing)[:5]})" + ) + client = OpalServerClient() + client.wait_healthy() + yield client + if not request.config.getoption("--keep-stack"): + compose("down", "-v") + + +@pytest.fixture() +def opal(stack) -> OpalServerClient: + # The compose stack is session-scoped (one server for the whole run), but + # scopes must not leak between tests: clone paths are keyed by repo URL, so + # a scope left behind by one test shares a cache entry with any later test + # using the same seeded repo and would pollute its drain assertions. + # + # Delete every scope the *server* currently knows (not just this client's + # tracked set) at setup, so a scope orphaned by a prior failed test can't + # contaminate this one; then again on teardown. + stack.delete_all_scopes() + yield stack + stack.delete_all_scopes() + + +@pytest.fixture(scope="session") +def gitea_admin(stack) -> GiteaAdmin: + """Host-side Gitea admin client (depends on `stack` so Gitea is up).""" + return GiteaAdmin() diff --git a/app-tests/git-leak/docker-compose.yml b/app-tests/git-leak/docker-compose.yml new file mode 100644 index 000000000..a863faed4 --- /dev/null +++ b/app-tests/git-leak/docker-compose.yml @@ -0,0 +1,126 @@ +name: opal-git-leak-test + +services: + redis: + image: redis:7-alpine + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 2s + timeout: 3s + retries: 30 + + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: opal + POSTGRES_PASSWORD: opal + POSTGRES_DB: opal + # not published to the host: only opal_server reaches it over the compose + # network, and bounce_postgres() uses `docker compose stop/start`. Publishing + # 5432 would collide with any Postgres already running on the host. + healthcheck: + test: ["CMD-SHELL", "pg_isready -U opal"] + interval: 2s + timeout: 3s + retries: 30 + + gitea: + image: gitea/gitea:1.21 + environment: + GITEA__security__INSTALL_LOCK: "true" + GITEA__server__ROOT_URL: "http://gitea:3000/" + GITEA__database__DB_TYPE: "sqlite3" + # published on 13000 (not 3000) for the host-side GiteaAdmin helper; the + # uncommon port avoids the usual :3000 clash. opal_server and the seed + # sidecar still reach it over the compose network via http://gitea:3000. + ports: + - "13000:3000" + volumes: + - gitea-data:/data + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:3000/api/v1/version || exit 1"] + interval: 3s + timeout: 5s + retries: 40 + + gitea-admin: + # creates the admin user once gitea is healthy + image: gitea/gitea:1.21 + depends_on: + gitea: + condition: service_healthy + user: git + entrypoint: ["/bin/sh", "-c"] + # Tolerate only the idempotent "already exists" case; any other failure + # must abort so `seed` (which depends on this completing) doesn't run + # against a Gitea with no admin user and fail with a confusing 401. + command: + - > + out=$$(gitea admin user create --username opaladmin --password opaladmin + --email admin@example.com --admin --must-change-password=false + --config /data/gitea/conf/app.ini 2>&1); rc=$$?; + echo "$$out"; + if [ $$rc -ne 0 ] && ! echo "$$out" | grep -qi "already exist"; then + exit $$rc; + fi + volumes: + - gitea-data:/data + restart: "no" + + blackhole: + # Accepts the TCP handshake then never answers — a clone connects and + # blocks reading the git smart-HTTP response, holding the fetch executor. + # Deterministic, unlike a TEST-NET-1 address which many networks reject + # fast with ICMP-unreachable (so the clone would fail fast, not hang). + image: alpine/socat:1.8.0.3 + command: ["TCP-LISTEN:80,fork,reuseaddr", "SYSTEM:sleep 3600"] + + seed: + build: ./seed + depends_on: + gitea: + condition: service_healthy + gitea-admin: + condition: service_completed_successfully + environment: + GITEA_URL: "http://gitea:3000" + GITEA_ADMIN_USER: "opaladmin" + GITEA_ADMIN_PASSWORD: "opaladmin" + REPO_COUNT: "${REPO_COUNT:-50}" + volumes: + - seed-output:/seed-output + restart: "no" + + opal_server: + build: + context: ../.. + dockerfile: docker/Dockerfile + target: server + environment: + # Single worker on purpose: the GitPolicyFetcher caches read by + # /internal/git-fetcher-cache-stats are per-process, so with >1 worker a + # round-robin read can miss the worker that fetched and make a `== 0` + # drain assertion pass falsely. One worker makes every cache read + # deterministic. The leak/boot/offline bugs all reproduce single-worker. + UVICORN_NUM_WORKERS: "1" + OPAL_SCOPES: "1" + OPAL_REDIS_URL: "redis://redis:6379" + OPAL_BROADCAST_URI: "postgres://opal:opal@postgres:5432/opal" + OPAL_BASE_DIR: "/opal" + OPAL_POLICY_REFRESH_INTERVAL: "0" + OPAL_DEBUG_INTERNAL_STATS: "1" + # OPAL_AUTH_PUBLIC_KEY is intentionally left unset: with no public key the + # JWT verifier is disabled, so the harness can call scope routes without + # minting JWTs. Local test bed only; never a production setting. + OPAL_LOG_FORMAT_INCLUDE_PID: "true" + ports: + - "7002:7002" + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + +volumes: + gitea-data: + seed-output: diff --git a/app-tests/git-leak/helpers.py b/app-tests/git-leak/helpers.py new file mode 100644 index 000000000..452d16b99 --- /dev/null +++ b/app-tests/git-leak/helpers.py @@ -0,0 +1,312 @@ +"""HTTP + infra helpers for the git-leak test bed.""" +import subprocess +import time +from pathlib import Path +from typing import Dict, List + +import requests + +OPAL_URL = "http://localhost:7002" +# reachable from inside the opal_server container (compose network) +GITEA_INTERNAL_URL = "http://gitea:3000" +# reachable from the host-side test harness (published port, see docker-compose.yml) +GITEA_HOST_URL = "http://localhost:13000" +GITEA_USER = "opaladmin" +GITEA_PASSWORD = "opaladmin" + +# the `blackhole` compose service (alpine/socat) accepts the TCP handshake then +# never answers, so a clone connects and blocks reading the response — a +# deterministic hang. Reachable from the opal_server container on the compose +# network. (A TEST-NET-1 address was rejected too fast on many networks, so the +# clone failed fast instead of hanging and the offline scenario wasn't exercised.) +UNREACHABLE_HOST = "blackhole" + +# the compose project lives next to this file; compose() runs from here +_COMPOSE_DIR = str(Path(__file__).resolve().parent) + + +class OpalServerClient: + def __init__(self, base_url: str = OPAL_URL): + self.base_url = base_url.rstrip("/") + # scope_ids created via put_scope, so a per-test fixture can delete them + # on teardown. Clone paths are keyed by repo URL (not scope_id), so a + # scope left behind by one test shares a GitPolicyFetcher cache entry + # with any other test pointing at the same seeded repo — without cleanup + # that leftover keeps the entry alive and pollutes a drain assertion. + self._created_scopes: set = set() + + def wait_healthy(self, timeout: int = 180) -> None: + deadline = time.time() + timeout + last = None + while time.time() < deadline: + try: + if ( + requests.get(f"{self.base_url}/healthcheck", timeout=5).status_code + == 200 + ): + return + except requests.RequestException as exc: + last = exc + time.sleep(2) + raise RuntimeError(f"opal-server not healthy in {timeout}s (last: {last})") + + def stats(self, samples: int = 3, interval: float = 0.1) -> Dict[str, int]: + """Read the git-fetcher cache stats, merged across a few reads. + + The stack runs a single uvicorn worker (see docker-compose.yml), so the + per-process ``GitPolicyFetcher`` caches are read deterministically — a + read can't miss the worker that fetched. Sampling a few times and taking + the ``max`` per key only smooths over a read that races an in-flight + mutation; it is not relied on to paper over multi-worker nondeterminism + (which the single-worker setup removes outright). + """ + merged: Dict[str, int] = {} + for i in range(max(1, samples)): + resp = requests.get( + f"{self.base_url}/internal/git-fetcher-cache-stats", timeout=10 + ) + resp.raise_for_status() + for key, value in resp.json().items(): + merged[key] = max(merged.get(key, 0), value) + if i < samples - 1: + time.sleep(interval) + return merged + + def put_scope(self, scope_id: str, repo_url: str, branch: str = "main") -> None: + body = { + "scope_id": scope_id, + "policy": { + "source_type": "git", + "url": repo_url, + "auth": {"auth_type": "none"}, + "branch": branch, + "directories": ["."], + "extensions": [".rego", ".json"], + "manifest": ".manifest", + "poll_updates": False, + }, + "data": {"entries": []}, + } + # the scope router mounts at prefix="/scopes" with @router.put("") + resp = requests.put(f"{self.base_url}/scopes", json=body, timeout=30) + resp.raise_for_status() + self._created_scopes.add(scope_id) + + def delete_scope(self, scope_id: str) -> None: + resp = requests.delete(f"{self.base_url}/scopes/{scope_id}", timeout=30) + if resp.status_code not in (200, 204, 404): + resp.raise_for_status() + self._created_scopes.discard(scope_id) + + def list_scope_ids(self) -> List[str]: + """All scope ids the server currently knows (GET /scopes).""" + resp = requests.get(f"{self.base_url}/scopes", timeout=30) + resp.raise_for_status() + return [s["scope_id"] for s in resp.json()] + + def hard_reset(self, timeout: int = 600) -> None: + """Recover the server from a saturated fetch executor by wiping state. + + When a test leaves many clones hung (the offline-repo test saturates the + executor on purpose), per-scope DELETEs would queue *behind* those hung + threads, and a plain restart would have ``preload_scopes`` re-clone the + offline scopes and saturate again. Instead: stop the server (killing the + hung threads), flush the Redis scope store so nothing is re-cloned, then + start clean. Used in that test's teardown so the session-scoped stack is + usable by every later test. + """ + compose("stop", "opal_server") + try: + compose("exec", "-T", "redis", "redis-cli", "FLUSHALL") + finally: + # Always bring the server back up, even if the flush failed: leaving + # it stopped would fail every later session-scoped test, and since + # this runs in a test's `finally` it would also mask the real result. + compose("start", "opal_server") + self._created_scopes.clear() + self.wait_healthy(timeout=timeout) + + def delete_all_scopes(self, drain_timeout: int = 20) -> None: + """Delete every scope the *server* knows (not just this client's), then + best-effort wait for the caches to drain — a clean slate independent of + what any prior, possibly-failed, test left behind. + + Best-effort drain by design: on master, delete never purges the caches + (the leak this suite gates), so the wait simply times out. This runs in + fixture setup/teardown, so a failure here must not mask the test, hence + the broad excepts and bounded wait. + """ + try: + for scope_id in self.list_scope_ids(): + try: + self.delete_scope(scope_id) + except Exception: + self._created_scopes.discard(scope_id) + except Exception: + pass + self._created_scopes.clear() + deadline = time.time() + drain_timeout + while time.time() < deadline: + try: + # Single snapshot: we're waiting for zero, so the peak-merge + # (max over samples) would only delay observing the drain. + if self.stats(samples=1)["repo_locks"] == 0: + return + except Exception: + # A transient stats-read failure is not proof of a drain — keep + # polling until the deadline rather than returning early, which + # would let a not-yet-drained cache leak into the next test. + pass + time.sleep(1) + + def get_scope_policy(self, scope_id: str) -> requests.Response: + """Fetch a scope's policy bundle (GET /scopes/{id}/policy). + + A 200 here proves the scope's repo was cloned and is being + served — the signal that a healthy scope still works while + another scope's clone is hanging. + """ + return requests.get(f"{self.base_url}/scopes/{scope_id}/policy", timeout=30) + + def refresh_all(self) -> None: + # POST /scopes/refresh publishes on the webhook topic so the leader + # re-syncs all scopes. The second sync takes the discover/fetch path + # (not the first-sync clone path), which is what populates the `repos` + # and `repos_last_fetched` caches. A missing route is a real error and + # is surfaced via raise_for_status. + resp = requests.post(f"{self.base_url}/scopes/refresh", timeout=30) + resp.raise_for_status() + + +class GiteaAdmin: + """Host-side admin client for the test bed's Gitea. + + The ``seed`` sidecar does the bulk repo creation from inside the compose + network; this class lets a test inspect or mutate Gitea repos directly + from the host (e.g. assert seeding happened, or add/remove a single repo + for a specific scenario). It authenticates with the admin user that the + ``gitea-admin`` sidecar created, over the published host port. + """ + + def __init__( + self, + base_url: str = GITEA_HOST_URL, + user: str = GITEA_USER, + password: str = GITEA_PASSWORD, + ): + self.base_url = base_url.rstrip("/") + self._user = user + self._auth = (user, password) + + def repo_exists(self, name: str) -> bool: + resp = requests.get( + f"{self.base_url}/api/v1/repos/{self._user}/{name}", + auth=self._auth, + timeout=10, + ) + return resp.status_code == 200 + + def list_repos(self) -> List[str]: + names: List[str] = [] + page = 1 + while True: + resp = requests.get( + f"{self.base_url}/api/v1/users/{self._user}/repos", + params={"page": page, "limit": 50}, + auth=self._auth, + timeout=10, + ) + resp.raise_for_status() + batch = resp.json() + if not batch: + break + names.extend(r["name"] for r in batch) + page += 1 + return names + + def create_repo(self, name: str) -> None: + if self.repo_exists(name): + return + resp = requests.post( + f"{self.base_url}/api/v1/user/repos", + json={"name": name, "private": False, "auto_init": True}, + auth=self._auth, + timeout=10, + ) + resp.raise_for_status() + + def delete_repo(self, name: str) -> None: + resp = requests.delete( + f"{self.base_url}/api/v1/repos/{self._user}/{name}", + auth=self._auth, + timeout=10, + ) + if resp.status_code not in (204, 404): + resp.raise_for_status() + + +def gitea_repo_url(name: str) -> str: + # url reachable from inside the opal_server container + return f"{GITEA_INTERNAL_URL}/{GITEA_USER}/{name}.git" + + +def make_repo_unreachable(name: str) -> str: + """Return a git URL for ``name`` pointing at the ``blackhole`` sidecar. + + Simulates an offline/unreachable policy repo: ``blackhole`` (alpine/socat) + accepts the TCP handshake then never answers, so the clone connects and + blocks reading the git smart-HTTP response — a deterministic hang that + exercises the missing fetch timeout on the scopes path (the bug PR3 fixes). + The URL keeps the same ``/{user}/{name}.git`` shape as a real Gitea repo so + the scope looks ordinary apart from the unreachable host. + """ + return f"http://{UNREACHABLE_HOST}/{GITEA_USER}/{name}.git" + + +def compose(*args: str) -> subprocess.CompletedProcess: + """Run `docker compose `; on failure, surface the captured output. + + `capture_output=True` keeps compose noise out of passing tests, but + a raw CalledProcessError shows only the exit code — so on failure we + re-raise with the captured stdout/stderr embedded, otherwise a + broken build/seed/ restart is opaque to debug. + """ + proc = subprocess.run( + ["docker", "compose", *args], + cwd=_COMPOSE_DIR, + capture_output=True, + text=True, + ) + if proc.returncode != 0: + raise RuntimeError( + f"`docker compose {' '.join(args)}` failed (exit {proc.returncode})\n" + f"--- stdout ---\n{proc.stdout}\n--- stderr ---\n{proc.stderr}" + ) + return proc + + +def bounce_postgres(down_seconds: int = 5) -> None: + compose("stop", "postgres") + time.sleep(down_seconds) + # `up -d --wait` blocks until Postgres passes its healthcheck again (plain + # `compose start` has no --wait), so a recovery poll that follows isn't + # racing an unready broadcaster. --no-recreate keeps the same container. + compose("up", "-d", "--wait", "--no-recreate", "postgres") + + +def list_seeded_repos(count: int) -> List[str]: + return [f"policy-repo-{i:04d}" for i in range(count)] + + +# A reserved repo seeded *outside* the numeric ``policy-repo-NNNN`` range that +# ``list_seeded_repos`` enumerates, so no boot/leak test ever clones it. The +# resilience offline-hang test uses it as its "healthy" probe: clones live at +# ``base_dir/`` keyed by URL-hash and survive ``compose +# restart/stop/start`` (opal_server mounts no volume at ``/opal``; only +# ``down -v`` wipes them), so pointing the probe at any shared seeded repo would +# let the healthy scope reuse an on-disk clone and serve 200 *without* touching +# the saturated fetch executor — false-passing a gate that must FAIL on this +# branch. A dedicated never-cloned repo forces a genuine fresh clone through the +# starved executor. Keep this name in sync with ``RESERVED_REPOS`` in +# ``seed/seed_gitea.py``. +HEALTHY_PROBE_REPO = "policy-repo-healthy-probe" diff --git a/app-tests/git-leak/seed/Dockerfile b/app-tests/git-leak/seed/Dockerfile new file mode 100644 index 000000000..540118cd2 --- /dev/null +++ b/app-tests/git-leak/seed/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.11-slim +RUN apt-get update && apt-get install -y --no-install-recommends git \ + && rm -rf /var/lib/apt/lists/* +RUN pip install --no-cache-dir requests==2.32.3 GitPython==3.1.50 +COPY seed_gitea.py /seed_gitea.py +ENTRYPOINT ["python", "/seed_gitea.py"] diff --git a/app-tests/git-leak/seed/seed_gitea.py b/app-tests/git-leak/seed/seed_gitea.py new file mode 100644 index 000000000..7e1de1a4c --- /dev/null +++ b/app-tests/git-leak/seed/seed_gitea.py @@ -0,0 +1,165 @@ +"""Seed a Gitea instance with N policy repos for the OPAL git-leak test bed. + +Idempotent: re-running creates only the missing repos. Each repo gets a +single commit containing a minimal OPA policy tree. + +Env: + GITEA_URL e.g. http://gitea:3000 + GITEA_ADMIN_USER admin username (created out-of-band by compose) + GITEA_ADMIN_PASSWORD admin password + REPO_COUNT how many repos to ensure exist (default 50) +""" +import os +import sys +import time +from pathlib import Path + +import requests +from git import Actor, Repo + +POLICY_REGO = """package example + +default allow = false + +allow { + input.user == "admin" +} +""" + +DATA_JSON = '{"roles": {"admin": ["read", "write"]}}\n' + +# Reserved repos seeded in addition to the numeric ``policy-repo-NNNN`` set. +# These sit outside the range the boot/leak tests enumerate (so no test ever +# clones them) and back the resilience offline-hang test's "healthy" probe, +# which must force a fresh clone through the saturated executor rather than +# reuse a surviving on-disk clone. Keep in sync with ``HEALTHY_PROBE_REPO`` in +# ``helpers.py``. +RESERVED_REPOS = ("policy-repo-healthy-probe",) + + +def _wait_for_gitea(base_url: str, timeout: int = 120) -> None: + deadline = time.time() + timeout + while time.time() < deadline: + try: + if requests.get(f"{base_url}/api/v1/version", timeout=5).status_code == 200: + return + except requests.RequestException: + pass + time.sleep(2) + raise RuntimeError(f"Gitea not reachable at {base_url} within {timeout}s") + + +def _ensure_token(base_url: str, user: str, password: str) -> str: + name = "seed-token" + resp = requests.post( + f"{base_url}/api/v1/users/{user}/tokens", + auth=(user, password), + json={"name": name, "scopes": ["write:repository", "write:user"]}, + timeout=10, + ) + if resp.status_code == 201: + return resp.json()["sha1"] + # token already exists -> delete then recreate (Gitea won't reveal an existing secret) + requests.delete( + f"{base_url}/api/v1/users/{user}/tokens/{name}", + auth=(user, password), + timeout=10, + ) + resp = requests.post( + f"{base_url}/api/v1/users/{user}/tokens", + auth=(user, password), + json={"name": name, "scopes": ["write:repository", "write:user"]}, + timeout=10, + ) + resp.raise_for_status() + return resp.json()["sha1"] + + +def _ensure_repo(base_url: str, token: str, user: str, name: str) -> None: + headers = {"Authorization": f"token {token}"} + exists = requests.get( + f"{base_url}/api/v1/repos/{user}/{name}", headers=headers, timeout=10 + ) + if exists.status_code == 200: + return + created = requests.post( + f"{base_url}/api/v1/user/repos", + headers=headers, + json={"name": name, "private": False, "auto_init": False}, + timeout=10, + ) + created.raise_for_status() + + +def _push_policy( + base_url: str, token: str, user: str, name: str, workdir: Path +) -> None: + repo_dir = workdir / name + repo_dir.mkdir(parents=True, exist_ok=True) + (repo_dir / "example.rego").write_text(POLICY_REGO) + (repo_dir / "data.json").write_text(DATA_JSON) + + repo = Repo.init(repo_dir, initial_branch="main") + repo.index.add(["example.rego", "data.json"]) + author = Actor("seed", "seed@example.com") + repo.index.commit("seed policy", author=author, committer=author) + + push_url = ( + base_url.replace("http://", f"http://{user}:{token}@") + f"/{user}/{name}.git" + ) + origin = repo.create_remote("origin", push_url) + origin.push(refspec="main:main") + + +def main() -> int: + base_url = os.environ["GITEA_URL"].rstrip("/") + user = os.environ["GITEA_ADMIN_USER"] + password = os.environ["GITEA_ADMIN_PASSWORD"] + count = int(os.environ.get("REPO_COUNT", "50")) + + _wait_for_gitea(base_url) + token = _ensure_token(base_url, user, password) + + workdir = Path("/tmp/seed-work") + failures = [] + # the numeric set the boot/leak tests enumerate, plus the reserved repos + # (e.g. the resilience offline-hang test's never-cloned healthy probe) + names = [f"policy-repo-{i:04d}" for i in range(count)] + list(RESERVED_REPOS) + for name in names: + # Isolate per-repo failures: one bad push must not abort the loop and + # leave an indeterminate subset seeded. Collect failures and exit + # non-zero with a count so the harness sees a real seed error (and + # `docker compose wait seed` surfaces it) instead of a later, confusing + # load-gate timeout. + try: + _ensure_repo(base_url, token, user, name) + # only push if the repo is empty (freshly created) + head = requests.get( + f"{base_url}/api/v1/repos/{user}/{name}/branches/main", + headers={"Authorization": f"token {token}"}, + timeout=10, + ) + if head.status_code != 200: + _push_policy(base_url, token, user, name, workdir) + print(f"seeded {name}", flush=True) + except Exception as exc: # noqa: BLE001 - report, don't abort the loop + failures.append((name, repr(exc))) + print(f"FAILED {name}: {exc!r}", flush=True) + + total = len(names) + if failures: + print( + f"ERROR: seeded {total - len(failures)}/{total} repos; " + f"{len(failures)} failed (e.g. {failures[:3]})", + flush=True, + ) + return 1 + + # write the token where the test harness can read it + Path("/seed-output/token").write_text(token) + print(f"DONE: ensured {total} repos", flush=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/app-tests/git-leak/test_boot.py b/app-tests/git-leak/test_boot.py new file mode 100644 index 000000000..b28637ec1 --- /dev/null +++ b/app-tests/git-leak/test_boot.py @@ -0,0 +1,42 @@ +import os +import time + +import pytest +from helpers import compose, gitea_repo_url, list_seeded_repos + + +@pytest.mark.timeout(2400) +def test_boot_loads_all_scopes(opal, repo_count): + """Measure how long a fresh boot takes to load all scope repos. + + On master this is serial and slow (the ~20-min problem at scale). + PR4 tightens BOOT_TARGET_SECONDS to assert the parallel speedup. + """ + n = repo_count + repos = list_seeded_repos(n) + for i, name in enumerate(repos): + opal.put_scope(f"boot-{i}", gitea_repo_url(name)) + + # Start the clock at the restart, not after wait_healthy: preload_scopes() + # runs in gunicorn's `when_ready` (before workers accept traffic), so by the + # time /healthcheck answers, boot-sync may already be partly done — starting + # the clock later would undercount it. Measuring from the restart captures + # the whole boot-sync window. + start = time.time() + compose("restart", "opal_server") + opal.wait_healthy(timeout=600) + + deadline = start + 2000 + loaded = 0 + while time.time() < deadline: + loaded = opal.stats()["repo_locks"] + if loaded >= n: + break + time.sleep(2) + elapsed = time.time() - start + + # PR1 records the baseline (loose). PR4 will set BOOT_TARGET_SECONDS low. + BOOT_TARGET_SECONDS = int(os.environ.get("BOOT_TARGET_SECONDS", "2000")) + print(f"boot loaded {n} scopes in {elapsed:.1f}s (target {BOOT_TARGET_SECONDS}s)") + assert loaded >= n, "not all scopes loaded after boot" + assert elapsed < BOOT_TARGET_SECONDS, f"boot too slow: {elapsed:.1f}s" diff --git a/app-tests/git-leak/test_leak.py b/app-tests/git-leak/test_leak.py new file mode 100644 index 000000000..09fe9034d --- /dev/null +++ b/app-tests/git-leak/test_leak.py @@ -0,0 +1,114 @@ +import time + +import pytest +from helpers import gitea_repo_url, list_seeded_repos + + +def _wait_until(predicate, timeout=30, interval=0.5): + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(interval) + return False + + +def _load_scopes(opal, prefix, names): + """PUT a scope per repo, then force a second sync so all three caches fill. + + The first sync of a fresh scope takes the clone path, which populates only + ``repo_locks`` — ``repos`` and ``repos_last_fetched`` are filled solely by + the discover/fetch path on a *subsequent* sync. ``refresh_all()`` triggers + that second sync, so after this returns all three caches reflect the N + scopes and the drain assertions test a cache the sync path actually fills. + + Returns the per-key load count reached (max over a wait). + """ + n = len(names) + for i, name in enumerate(names): + opal.put_scope(f"{prefix}-{i}", gitea_repo_url(name)) + # repo_locks is populated on the first sync (the clone path), so it is the + # deterministic signal that every scope was at least picked up. + locked = _wait_until(lambda: opal.stats()["repo_locks"] >= n, timeout=600) + assert locked, f"initial load never locked {n} repos: {opal.stats()}" + # force the discover/fetch path so `repos` / `repos_last_fetched` fill too + opal.refresh_all() + fetched = _wait_until(lambda: opal.stats()["repos"] >= n, timeout=600) + assert fetched, f"refresh never populated {n} repos: {opal.stats()}" + return opal.stats() + + +@pytest.mark.timeout(900) +def test_churn_releases_caches(opal, repo_count): + """Create then delete many scopes; the three caches must return to empty. + + FAILS on this branch (without PR2): delete_scope never purges the + GitPolicyFetcher caches, so they stay populated after every scope is + gone. Becomes green once PR2 lands. + """ + n = min(repo_count, 100) + repos = list_seeded_repos(n) + loaded = _load_scopes(opal, "churn", repos) + assert loaded["repo_locks"] >= n and loaded["repos"] >= n, loaded + rss_loaded = loaded["rss_kb"] + + for i in range(n): + opal.delete_scope(f"churn-{i}") + + # all three caches must drain to empty once every scope is deleted. Read a + # single stats snapshot per poll so the three keys reflect the same + # observation (and to avoid 3x the HTTP round-trips per iteration). + def _all_caches_empty() -> bool: + s = opal.stats(samples=1) + return s["repo_locks"] == 0 and s["repos"] == 0 and s["repos_last_fetched"] == 0 + + released = _wait_until(_all_caches_empty, timeout=60) + stats = opal.stats() + assert released, f"caches did not drain after deleting all scopes: {stats}" + + # The cache drain above is the gate. RSS is only a loose backstop here: + # freeing the caches need not return memory to the OS (glibc/Python keep + # arenas), so this guards against a *gross* leak — RSS ballooning well past + # the loaded peak — without false-failing on allocator slack once PR2 lands. + rss_budget = rss_loaded + max(100_000, rss_loaded // 2) + assert stats["rss_kb"] <= rss_budget, ( + f"RSS ballooned across churn: {rss_loaded} -> {stats['rss_kb']} kb " + f"(budget {rss_budget})" + ) + + +@pytest.mark.timeout(900) +def test_repeat_sync_does_not_grow(opal, repo_count): + """Re-syncing the *same* scopes must not grow memory unboundedly. + + Idempotency guard, not a known-broken case — PASSES on this branch. Clone + paths are keyed by the repo URL (``source_id`` = sha256(url)+branch-shard), + so re-syncing identical scopes reuses the existing ``repos`` / + ``repos_last_fetched`` entries rather than allocating new ones; the cache + *counts* therefore can't grow for any implementation, which is why the + load-bearing assertion here is on **RSS**, not on ``len(repos)``. It guards + against a regression where each repeat sync leaks per-sync allocations. The + unbounded growth-then-no-purge-on-delete leak is covered by + ``test_churn_releases_caches`` above, which uses distinct scopes. + """ + n = min(repo_count, 50) + repos = list_seeded_repos(n) + loaded = _load_scopes(opal, "stable", repos) + baseline_repos = loaded["repos"] + baseline_rss = loaded["rss_kb"] + + for _ in range(10): + opal.refresh_all() + time.sleep(2) + + grown = opal.stats() + assert ( + grown["repos"] <= baseline_repos + ), f"repos cache count grew on repeat sync: {baseline_repos} -> {grown['repos']}" + # allow generous headroom for allocator slack; fail only on a real per-sync + # leak (10 refreshes of N scopes ballooning RSS). + rss_budget = baseline_rss + max(50_000, baseline_rss // 5) + assert grown["rss_kb"] <= rss_budget, ( + f"RSS grew unboundedly on repeat sync: " + f"{baseline_rss} -> {grown['rss_kb']} kb (budget {rss_budget})" + ) diff --git a/app-tests/git-leak/test_resilience.py b/app-tests/git-leak/test_resilience.py new file mode 100644 index 000000000..04702448b --- /dev/null +++ b/app-tests/git-leak/test_resilience.py @@ -0,0 +1,129 @@ +import time + +import pytest +import requests +from helpers import ( + HEALTHY_PROBE_REPO, + bounce_postgres, + gitea_repo_url, + list_seeded_repos, + make_repo_unreachable, +) + +# Enough hanging clones to exhaust opal's default fetch executor +# (run_sync -> run_in_executor(None, ...), a ThreadPoolExecutor of +# min(32, cpu+4) workers). One hang wouldn't starve a multi-thread pool, so we +# saturate it with many; after PR3's fetch timeout these give up and free their +# threads, letting the healthy scope through. +OFFLINE_REPOS = 40 + + +@pytest.mark.timeout(420) +def test_offline_repo_does_not_block_healthy_scopes(opal, repo_count): + """Unreachable repos must not stop a healthy scope from serving. + + FAILS on this branch (without PR3): the scopes path has no fetch + timeout, so the hung clones of the offline repos occupy the shared + fetch executor and the healthy scope's bundle never becomes + available. + """ + # the `blackhole` sidecar accepts the TCP handshake then never answers, so + # each of these clones hangs (holding a fetch-executor thread) rather than + # failing fast; enough of them saturate the pool. + for i in range(OFFLINE_REPOS): + opal.put_scope( + f"offline-{i}", make_repo_unreachable(f"dead-{i}"), branch="main" + ) + + # Point the healthy scope at a repo *no other test clones* (HEALTHY_PROBE_REPO + # is seeded outside the numeric range the boot/leak tests enumerate). Clones + # survive compose restart/stop/start, so reusing a shared seeded repo here + # would let this scope hit the existing on-disk clone via _discover_repository, + # skip _clone(), and serve 200 without ever touching the saturated executor — + # false-passing this gate. A never-cloned repo forces a real clone through the + # starved pool, so the gate fails correctly on this branch (no PR3 timeout). + opal.put_scope("healthy", gitea_repo_url(HEALTHY_PROBE_REPO)) + + try: + # The healthy scope must become *servable* within a bounded time even + # while the offline scopes hang. A 200 from its policy bundle proves the + # clone completed and the scope is served — a stronger signal than a + # cache count, and exactly what the offline hang starves on master. + # + # A 200 here can't be a *masked* default bundle: GET /{scope}/policy + # falls back to the "default" scope on a bad/missing repo, but this bed + # never creates a "default" scope, so that fallback raises instead of + # returning 200. The only way to get 200 is the healthy clone completing. + deadline = time.time() + 90 + served = False + last = None + while time.time() < deadline: + try: + resp = opal.get_scope_policy("healthy") + last = resp.status_code + if resp.status_code == 200: + served = True + break + except requests.RequestException as exc: # may stall when starved + last = repr(exc) + time.sleep(2) + assert served, ( + f"healthy scope never served while {OFFLINE_REPOS} offline repos " + f"were hanging (last policy response: {last})" + ) + finally: + # The offline clones hang for the blackhole's full duration, occupying + # executor threads. On the session-scoped stack that would starve every + # later test, and per-scope DELETEs would queue behind the hung threads. + # hard_reset stops the server (killing the hung threads), flushes the + # Redis scope store so preload doesn't re-clone them, and restarts clean. + opal.hard_reset() + + +@pytest.mark.timeout(300) +def test_server_recovers_after_postgres_bounce(opal, repo_count): + """A transient Postgres (broadcaster) outage must not break propagation. + + Recovery guard, not a known-broken case — PASSES on this branch: when the + broadcast channel drops, the affected worker triggers a graceful shutdown + and gunicorn respawns it, and once Postgres is back the broadcaster + reconnects. We prove recovery of the *broadcast path*, not just HTTP + liveness: after the bounce we PUT a fresh scope and assert it actually + syncs (its repo lands in the cache), which only happens if the leader + received the sync notification over the recovered broadcaster. + (PER-15065's in-process reconnect would make recovery cleaner by avoiding + the worker churn, but recovery already holds.) + """ + baseline = opal.stats() # healthy before + assert baseline + baseline_locks = baseline["repo_locks"] + + bounce_postgres(down_seconds=5) + + # wait for the HTTP surface to come back first + deadline = time.time() + 90 + recovered = False + while time.time() < deadline: + try: + opal.wait_healthy(timeout=5) + opal.stats() + recovered = True + break + except (requests.RequestException, RuntimeError): + # RequestException: HTTP not back yet; RuntimeError: wait_healthy timed out + time.sleep(2) + assert recovered, "server did not recover HTTP within 90s of a postgres bounce" + + # prove the broadcast path itself recovered: a freshly PUT scope must sync + healthy = list_seeded_repos(1)[0] + opal.put_scope("post-bounce", gitea_repo_url(healthy)) + synced = False + deadline = time.time() + 120 + while time.time() < deadline: + if opal.stats()["repo_locks"] > baseline_locks: + synced = True + break + time.sleep(2) + assert ( + synced + ), "scope PUT after the bounce never synced; broadcaster did not recover" diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 58341ff37..766f674b4 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -193,6 +193,20 @@ class OpalServerConfig(Confi): 0, description="The timeout for cloning the policy repository (0 means wait forever)", ) + SCOPES_GIT_FETCH_TIMEOUT = confi.float( + "SCOPES_GIT_FETCH_TIMEOUT", + 120.0, + description="Hard timeout in seconds for a single scope git clone/fetch. " + "On timeout the operation is abandoned and the scope is marked failed so " + "one unreachable repo can never block boot or other scopes (0 = no timeout).", + ) + SCOPES_GIT_MAX_WORKERS = confi.int( + "SCOPES_GIT_MAX_WORKERS", + 10, + description="Size of the dedicated thread pool for scope git operations. " + "Isolating git work keeps a hung fetch from starving bundle serving and " + "other server work that uses the default executor.", + ) LEADER_LOCK_FILE_PATH = confi.str( "LEADER_LOCK_FILE_PATH", "/tmp/opal_server_leader.lock", @@ -404,6 +418,15 @@ class OpalServerConfig(Confi): description="Set if OPAL server should enable tracing with datadog APM", ) + DEBUG_INTERNAL_STATS = confi.bool( + "DEBUG_INTERNAL_STATS", + False, + description=( + "Expose GET /internal/git-fetcher-cache-stats with in-memory cache " + "sizes and process RSS. For diagnostics/tests only; keep off in production." + ), + ) + SCOPES = confi.bool("SCOPES", default=False, description="Enable scopes") SCOPES_REPO_CLONES_SHARDS = confi.int( @@ -412,6 +435,14 @@ class OpalServerConfig(Confi): description="The max number of local clones to use for the same repo (reused across scopes)", ) + SCOPES_SYNC_CONCURRENCY = confi.int( + "SCOPES_SYNC_CONCURRENCY", + 10, + description="Maximum number of scope git repositories fetched concurrently " + "during sync_scopes (boot and periodic). Higher is faster but uses more " + "memory, file descriptors, and network connections at once.", + ) + REDIS_URL = confi.str( "REDIS_URL", default="redis://localhost", diff --git a/packages/opal-server/opal_server/debug_stats.py b/packages/opal-server/opal_server/debug_stats.py new file mode 100644 index 000000000..5439ce950 --- /dev/null +++ b/packages/opal-server/opal_server/debug_stats.py @@ -0,0 +1,61 @@ +"""Read-only introspection of the git-fetcher in-memory caches. + +Used only by the off-by-default /internal stats endpoint so tests can +observe the cache growth that the memory-leak fix (PR2) eliminates. +""" +from pathlib import Path +from typing import Dict, List, Optional + +from fastapi import FastAPI, params +from opal_server.git_fetcher import GitPolicyFetcher + + +def _read_rss_kb() -> int: + """Resident set size of this process in kilobytes (Linux), else 0.""" + try: + for line in Path("/proc/self/status").read_text().splitlines(): + if line.startswith("VmRSS:"): + return int(line.split()[1]) + except (OSError, ValueError, IndexError): + return 0 + return 0 + + +def git_fetcher_cache_stats() -> Dict[str, int]: + """Sizes of the three process-global GitPolicyFetcher caches + RSS.""" + return { + "repo_locks": len(GitPolicyFetcher.repo_locks), + "repos": len(GitPolicyFetcher.repos), + "repos_last_fetched": len(GitPolicyFetcher.repos_last_fetched), + "rss_kb": _read_rss_kb(), + } + + +def register_internal_stats_route( + app: FastAPI, + enabled: bool, + dependencies: Optional[List[params.Depends]] = None, +) -> None: + """Mount GET /internal/git-fetcher-cache-stats only when enabled. + + ``dependencies`` are applied to the route (e.g. the server's + ``JWTAuthenticator``) so the endpoint is protected when JWT verification + is enabled. When verification is disabled — as in the test bed, which + leaves ``OPAL_AUTH_PUBLIC_KEY`` unset — the authenticator is a no-op and + the route stays reachable without a token. + """ + if not enabled: + return + + # Deliberately a sync def: Starlette runs it in its own threadpool, which is + # independent of the default loop executor opal uses for git fetches + # (run_sync -> run_in_executor(None, ...)). So this endpoint keeps answering + # even when hung clones saturate the fetch executor — which is exactly the + # condition the offline-repo test observes through it. + @app.get( + "/internal/git-fetcher-cache-stats", + include_in_schema=False, + dependencies=dependencies or [], + ) + def _git_fetcher_cache_stats() -> Dict[str, int]: + return git_fetcher_cache_stats() diff --git a/packages/opal-server/opal_server/git_fetcher.py b/packages/opal-server/opal_server/git_fetcher.py index e52083c98..a6c971c4c 100644 --- a/packages/opal-server/opal_server/git_fetcher.py +++ b/packages/opal-server/opal_server/git_fetcher.py @@ -4,6 +4,8 @@ import hashlib import shutil import time +from concurrent.futures import ThreadPoolExecutor +from functools import partial from pathlib import Path from typing import Optional, cast @@ -11,7 +13,6 @@ import pygit2 from ddtrace import tracer from git import Repo -from opal_common.async_utils import run_sync from opal_common.git_utils.bundle_maker import BundleMaker from opal_common.logger import logger from opal_common.schemas.policy import PolicyBundle @@ -33,6 +34,45 @@ reference_is_valid_name, ) +_git_executor: Optional[ThreadPoolExecutor] = None + + +def _get_git_executor() -> ThreadPoolExecutor: + """Lazily build the dedicated pool for scope git operations. + + Isolated from the default executor so a hung clone/fetch can never + starve bundle serving or other server work. + """ + global _git_executor + if _git_executor is None: + _git_executor = ThreadPoolExecutor( + max_workers=opal_server_config.SCOPES_GIT_MAX_WORKERS, + thread_name_prefix="opal-git", + ) + return _git_executor + + +async def run_in_git_executor(func, *args, timeout: float, **kwargs): + """Run a blocking git call on the dedicated pool with a hard timeout. + + Raises ``TimeoutError`` (via ``asyncio.wait_for``) when the call exceeds + ``timeout`` seconds. ``timeout <= 0`` means no limit. NOTE: the timeout + unblocks the event loop and the awaiting coroutine, but the underlying + pygit2 call keeps running on its pool thread until the OS network timeout; + the dedicated pool keeps that lingering thread isolated. + """ + loop = asyncio.get_running_loop() + fut = loop.run_in_executor(_get_git_executor(), partial(func, *args, **kwargs)) + if timeout and timeout > 0: + try: + return await asyncio.wait_for(fut, timeout=timeout) + except asyncio.TimeoutError as exc: + # On Python < 3.11 ``asyncio.TimeoutError`` is a distinct class from + # the builtin ``TimeoutError``; normalize so callers (and the + # documented contract) can rely on the builtin everywhere. + raise TimeoutError() from exc + return await fut + class PolicyFetcherCallbacks: async def on_update(self, old_head: Optional[str], head: str): @@ -139,14 +179,24 @@ def __init__( f"Initializing git fetcher: scope_id={scope_id}, url={source.url}, branch={self._source.branch}, source_id={self._source_id}" ) - async def _get_repo_lock(self): - # Previous file based implementation worked across multiple processes/threads, but wasn't fair (next acquiree is random) - # This implementation works only within the same process/thread, but is fair (next acquiree is the earliest to enter the lock) - lock = GitPolicyFetcher.repo_locks[ - self._source_id - ] = GitPolicyFetcher.repo_locks.get(self._source_id, asyncio.Lock()) + @staticmethod + def source_lock(source_id: str) -> asyncio.Lock: + """Return the process-shared per-source lock, creating it on first use. + + The same lock object is reused for a given source_id so that a + fetch and a concurrent delete of the same source serialize + against each other. + """ + lock = GitPolicyFetcher.repo_locks.get(source_id) + if lock is None: + lock = GitPolicyFetcher.repo_locks[source_id] = asyncio.Lock() return lock + async def _get_repo_lock(self): + # Same-process, fair (FIFO) lock per source. Shared with delete_scope + # via GitPolicyFetcher.source_lock so delete cannot race an in-flight fetch. + return GitPolicyFetcher.source_lock(self._source_id) + async def _was_fetched_after(self, t: datetime.datetime): last_fetched = GitPolicyFetcher.repos_last_fetched.get(self._source_id, None) if last_fetched is None: @@ -187,13 +237,28 @@ async def fetch_and_notify_on_changes( logger.debug( f"Fetching remote (force_fetch={force_fetch}): {self._remote} ({self._source.url})" ) + try: + await run_in_git_executor( + repo.remotes[self._remote].fetch, + callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + ) + except TimeoutError as exc: + # Expected when a repo is unreachable: log cleanly + # (no traceback) and skip, matching the clone path. + # repos_last_fetched stays stale so the next cycle + # retries and force_fetch is not wrongly suppressed. + logger.error( + "Timed out fetching {url}, skipping: {err}", + url=self._source.url, + err=repr(exc), + ) + return + # Only mark as fetched after the fetch actually + # succeeds. GitPolicyFetcher.repos_last_fetched[ self._source_id ] = datetime.datetime.now() - await run_sync( - repo.remotes[self._remote].fetch, - callbacks=self._auth_callbacks, - ) logger.debug(f"Fetch completed: {self._source.url}") # New commits might be present because of a previous fetch made by another scope @@ -222,14 +287,19 @@ async def _clone(self): path=self._repo_path, ) try: - repo: Repository = await run_sync( + repo: Repository = await run_in_git_executor( clone_repository, self._source.url, str(self._repo_path), callbacks=self._auth_callbacks, + timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT, + ) + except (pygit2.GitError, TimeoutError) as exc: + logger.error( + "Could not clone repo at {url}: {err}", + url=self._source.url, + err=repr(exc), ) - except pygit2.GitError: - logger.exception(f"Could not clone repo at {self._source.url}") else: logger.info(f"Clone completed: {self._source.url}") await self._notify_on_changes(repo) @@ -366,6 +436,26 @@ def base_dir(base_dir: Path) -> Path: def repo_clone_path(base_dir: Path, source: GitPolicyScopeSource) -> Path: return GitPolicyFetcher.base_dir(base_dir) / GitPolicyFetcher.source_id(source) + @staticmethod + def forget_repo(path: str) -> None: + """Drop the cached repository for a clone path and release its handles. + + The cached ``pygit2.Repository`` keeps OS file descriptors and mmapped + pack indexes open; without this, a deleted scope's repo pins memory and + inodes for the lifetime of the process even after the clone is removed. + ``Repository.free()`` is called only when available (it is not part of + every pygit2 release); otherwise the dropped reference is reclaimed by GC. + """ + repo = GitPolicyFetcher.repos.pop(path, None) + if repo is None: + return + free = getattr(repo, "free", None) + if callable(free): + try: + free() + except Exception: + logger.debug("pygit2 Repository.free() failed; relying on GC") + class GitCallback(RemoteCallbacks): def __init__(self, source: GitPolicyScopeSource): diff --git a/packages/opal-server/opal_server/policy/watcher/task.py b/packages/opal-server/opal_server/policy/watcher/task.py index d420d183c..1afba7ef2 100644 --- a/packages/opal-server/opal_server/policy/watcher/task.py +++ b/packages/opal-server/opal_server/policy/watcher/task.py @@ -28,11 +28,10 @@ async def __aexit__(self, exc_type, exc, tb): async def _on_webhook(self, topic: Topic, data: Any): logger.info(f"Webhook listener triggered ({len(self._webhook_tasks)})") - for task in self._webhook_tasks: - if task.done(): - # Clean references to finished tasks - self._webhook_tasks.remove(task) - + # Rebuild rather than remove-while-iterating: list.remove() inside a + # `for t in self._webhook_tasks` loop skips the element after each removal, + # so finished tasks accumulate. + self._webhook_tasks = [t for t in self._webhook_tasks if not t.done()] self._webhook_tasks.append(asyncio.create_task(self.trigger(topic, data))) async def _listen_to_webhook_notifications(self): diff --git a/packages/opal-server/opal_server/scopes/service.py b/packages/opal-server/opal_server/scopes/service.py index d3df4972f..c4a37e18e 100644 --- a/packages/opal-server/opal_server/scopes/service.py +++ b/packages/opal-server/opal_server/scopes/service.py @@ -1,3 +1,4 @@ +import asyncio import datetime import shutil from functools import partial @@ -12,6 +13,7 @@ from opal_common.schemas.policy import PolicyUpdateMessageNotification from opal_common.schemas.policy_source import GitPolicyScopeSource from opal_common.topics.publisher import ScopedServerSideTopicPublisher +from opal_server.config import opal_server_config from opal_server.git_fetcher import GitPolicyFetcher, PolicyFetcherCallbacks from opal_server.policy.watcher.callbacks import ( create_policy_update, @@ -162,24 +164,38 @@ async def delete_scope(self, scope_id: str): with tracer.trace("scopes_service.delete_scope", resource=scope_id): logger.info(f"Delete scope: {scope_id}") scope = await self._scopes.get(scope_id) - url = scope.policy.url - - scopes = await self._scopes.all() - remove_repo_clone = True + deleted_source = cast(GitPolicyScopeSource, scope.policy) + deleted_source_id = GitPolicyFetcher.source_id(deleted_source) + scope_dir = GitPolicyFetcher.repo_clone_path(self._base_dir, deleted_source) + + # Clone dir, the `repos` handle cache, and `repos_last_fetched` are + # all keyed by source_id (= the clone path). A sibling only shares + # storage when it resolves to the same source_id; same url with a + # different branch can shard to a different source_id (and a + # different clone dir) when SCOPES_REPO_CLONES_SHARDS > 1, so gate on + # source_id, not url — otherwise the deleted scope's clone + pygit2 + # handle leak. + other_scopes = [ + s for s in await self._scopes.all() if s.scope_id != scope_id + ] + source_id_shared = any( + isinstance(s.policy, GitPolicyScopeSource) + and GitPolicyFetcher.source_id(s.policy) == deleted_source_id + for s in other_scopes + ) - for scope in scopes: - if scope.scope_id != scope_id and scope.policy.url == url: + # Serialize the filesystem + cache mutation against an in-flight + # fetch of the same source so a delete cannot race a concurrent + # sync_scope (see PR4 bounded-concurrency loading). + async with GitPolicyFetcher.source_lock(deleted_source_id): + if source_id_shared: logger.info( - f"found another scope with same remote url ({scope.scope_id}), skipping clone deletion" + "Another scope shares the same clone (source id), skipping clone deletion" ) - remove_repo_clone = False - break - - if remove_repo_clone: - scope_dir = GitPolicyFetcher.repo_clone_path( - self._base_dir, cast(GitPolicyScopeSource, scope.policy) - ) - shutil.rmtree(scope_dir, ignore_errors=True) + else: + shutil.rmtree(scope_dir, ignore_errors=True) + GitPolicyFetcher.forget_repo(str(scope_dir)) + GitPolicyFetcher.repos_last_fetched.pop(deleted_source_id, None) await self._scopes.delete(scope_id) @@ -190,38 +206,56 @@ async def sync_scopes(self, only_poll_updates=False, notify_on_changes=True): # Only sync scopes that have polling enabled (in a periodic check) scopes = [scope for scope in scopes if scope.policy.poll_updates] + concurrency = max(1, opal_server_config.SCOPES_SYNC_CONCURRENCY) logger.info( - f"OPAL Scopes: syncing {len(scopes)} scopes in the background (polling updates: {only_poll_updates})" + f"OPAL Scopes: syncing {len(scopes)} scopes " + f"(concurrency={concurrency}, polling updates: {only_poll_updates})" ) + # Pass 1: fetch each distinct source once (force_fetch). Pass 2: scopes that + # share an already-fetched source only re-check locally (no network). fetched_source_ids = set() - skipped_scopes = [] + first_pass = [] + second_pass = [] for scope in scopes: src_id = GitPolicyFetcher.source_id(scope.policy) # Give priority to scopes that have a unique url per shard (so we'll clone all repos asap) if src_id in fetched_source_ids: - skipped_scopes.append(scope) - continue - - try: - await self.sync_scope( - scope=scope, - force_fetch=True, - notify_on_changes=notify_on_changes, - ) - except Exception as e: - logger.exception(f"sync_scope failed for {scope.scope_id}") + second_pass.append(scope) + else: + fetched_source_ids.add(src_id) + first_pass.append(scope) + + await self._sync_scope_batch( + first_pass, + force_fetch=True, + notify_on_changes=notify_on_changes, + concurrency=concurrency, + ) + await self._sync_scope_batch( + second_pass, + force_fetch=False, + notify_on_changes=notify_on_changes, + concurrency=concurrency, + ) - fetched_source_ids.add(src_id) + async def _sync_scope_batch( + self, scopes, *, force_fetch, notify_on_changes, concurrency + ): + if not scopes: + return + semaphore = asyncio.Semaphore(concurrency) - for scope in skipped_scopes: - # No need to refetch the same repo, just check for changes + async def _one(scope): + async with semaphore: try: await self.sync_scope( scope=scope, - force_fetch=False, + force_fetch=force_fetch, notify_on_changes=notify_on_changes, ) - except Exception as e: + except Exception: logger.exception(f"sync_scope failed for {scope.scope_id}") + + await asyncio.gather(*(_one(scope) for scope in scopes)) diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 00de60747..0946d3d2c 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -27,6 +27,7 @@ from opal_server.config import opal_server_config from opal_server.data.api import init_data_updates_router from opal_server.data.data_update_publisher import DataUpdatePublisher +from opal_server.debug_stats import register_internal_stats_route from opal_server.loadlimiting import init_loadlimit_router from opal_server.policy.bundles.api import router as bundles_router from opal_server.policy.watcher.factory import setup_watcher_task @@ -313,6 +314,12 @@ def healthcheck(): ) return {"status": "ok"} + register_internal_stats_route( + app, + enabled=opal_server_config.DEBUG_INTERNAL_STATS, + dependencies=[Depends(authenticator)], + ) + return app def _configure_lifecycle_callbacks(self, app: FastAPI): diff --git a/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py b/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py new file mode 100644 index 000000000..4abd1abb1 --- /dev/null +++ b/packages/opal-server/opal_server/tests/debug_stats_endpoint_test.py @@ -0,0 +1,40 @@ +from fastapi import Depends, FastAPI, HTTPException +from fastapi.testclient import TestClient +from opal_server.debug_stats import register_internal_stats_route + + +def _app_with_flag(enabled: bool) -> FastAPI: + app = FastAPI() + register_internal_stats_route(app, enabled=enabled) + return app + + +def test_endpoint_absent_when_disabled(): + client = TestClient(_app_with_flag(False)) + assert client.get("/internal/git-fetcher-cache-stats").status_code == 404 + + +def test_endpoint_present_when_enabled(): + client = TestClient(_app_with_flag(True)) + resp = client.get("/internal/git-fetcher-cache-stats") + assert resp.status_code == 200 + body = resp.json() + assert set(body) == {"repo_locks", "repos", "repos_last_fetched", "rss_kb"} + + +def test_endpoint_applies_passed_dependencies(): + """A route dependency (e.g. the server's authenticator) is enforced. + + Mirrors how server.py wires the real JWTAuthenticator: when + verification is enabled the dependency rejects unauthenticated + reads; when disabled it is a no-op (covered by the test above, which + passes no dependency). + """ + + def _deny(): + raise HTTPException(status_code=401, detail="unauthorized") + + app = FastAPI() + register_internal_stats_route(app, enabled=True, dependencies=[Depends(_deny)]) + resp = TestClient(app).get("/internal/git-fetcher-cache-stats") + assert resp.status_code == 401 diff --git a/packages/opal-server/opal_server/tests/debug_stats_test.py b/packages/opal-server/opal_server/tests/debug_stats_test.py new file mode 100644 index 000000000..c67fb3530 --- /dev/null +++ b/packages/opal-server/opal_server/tests/debug_stats_test.py @@ -0,0 +1,28 @@ +import sys + +from opal_server.config import opal_server_config +from opal_server.debug_stats import git_fetcher_cache_stats +from opal_server.git_fetcher import GitPolicyFetcher + + +def test_stats_report_dict_sizes(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repo_locks", {"a": object()}) + monkeypatch.setattr(GitPolicyFetcher, "repos", {"p1": object(), "p2": object()}) + monkeypatch.setattr(GitPolicyFetcher, "repos_last_fetched", {}) + + stats = git_fetcher_cache_stats() + + assert stats["repo_locks"] == 1 + assert stats["repos"] == 2 + assert stats["repos_last_fetched"] == 0 + assert isinstance(stats["rss_kb"], int) + # On Linux /proc/self/status exists, so RSS reading must actually work; on + # other platforms _read_rss_kb falls back to 0 and the wiring is untestable. + if sys.platform.startswith("linux"): + assert stats["rss_kb"] > 0 + else: + assert stats["rss_kb"] >= 0 + + +def test_internal_stats_flag_defaults_off(): + assert opal_server_config.DEBUG_INTERNAL_STATS is False diff --git a/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py b/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py new file mode 100644 index 000000000..4a4e0a5e7 --- /dev/null +++ b/packages/opal-server/opal_server/tests/delete_scope_cache_purge_test.py @@ -0,0 +1,132 @@ +import pytest +from opal_common.schemas.policy_source import GitPolicyScopeSource, NoAuthData +from opal_common.schemas.scopes import Scope +from opal_server.git_fetcher import GitPolicyFetcher +from opal_server.scopes.scope_repository import ScopeNotFoundError +from opal_server.scopes.service import ScopesService + + +class FakeScopeRepository: + def __init__(self, scopes): + self._scopes = {s.scope_id: s for s in scopes} + + async def get(self, scope_id): + if scope_id not in self._scopes: + raise ScopeNotFoundError(scope_id) + return self._scopes[scope_id] + + async def all(self): + return list(self._scopes.values()) + + async def delete(self, scope_id): + self._scopes.pop(scope_id, None) + + +def _scope(scope_id, url, branch="main"): + return Scope( + scope_id=scope_id, + policy=GitPolicyScopeSource( + source_type="git", + url=url, + branch=branch, + auth=NoAuthData(auth_type="none"), + ), + data={"entries": []}, + ) + + +@pytest.fixture(autouse=True) +def clear_caches(): + GitPolicyFetcher.repos.clear() + GitPolicyFetcher.repos_last_fetched.clear() + GitPolicyFetcher.repo_locks.clear() + yield + GitPolicyFetcher.repos.clear() + GitPolicyFetcher.repos_last_fetched.clear() + GitPolicyFetcher.repo_locks.clear() + + +@pytest.mark.asyncio +async def test_delete_unique_scope_purges_caches(tmp_path, monkeypatch): + scope = _scope("only", "https://git/repo-a.git") + repo = FakeScopeRepository([scope]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + src = scope.policy + sid = GitPolicyFetcher.source_id(src) + clone_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, src)) + GitPolicyFetcher.repos[clone_path] = object() + GitPolicyFetcher.repos_last_fetched[sid] = "ts" + + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", lambda *a, **k: None + ) + + await svc.delete_scope("only") + + assert clone_path not in GitPolicyFetcher.repos + assert sid not in GitPolicyFetcher.repos_last_fetched + + +@pytest.mark.asyncio +async def test_delete_keeps_caches_when_sibling_shares_source(tmp_path, monkeypatch): + a = _scope("a", "https://git/shared.git") + b = _scope("b", "https://git/shared.git") # same url+branch -> same source_id + repo = FakeScopeRepository([a, b]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + sid = GitPolicyFetcher.source_id(a.policy) + clone_path = str(GitPolicyFetcher.repo_clone_path(tmp_path, a.policy)) + GitPolicyFetcher.repos[clone_path] = object() + GitPolicyFetcher.repos_last_fetched[sid] = "ts" + + rmtree_calls = [] + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", + lambda p, **k: rmtree_calls.append(p), + ) + + await svc.delete_scope("a") + + assert rmtree_calls == [] # sibling shares the source id; clone must survive + assert clone_path in GitPolicyFetcher.repos + assert sid in GitPolicyFetcher.repos_last_fetched + + +@pytest.mark.asyncio +async def test_delete_purges_when_sibling_shares_url_but_not_source( + tmp_path, monkeypatch +): + """Same url, different branch, sharded clones (SCOPES_REPO_CLONES_SHARDS>1) + resolve to different source_ids -> different clone dirs. + + Deleting one must still purge its own clone + caches; the url- + sharing sibling lives elsewhere. + """ + # shards=4: branch "main" -> index 1, "dev" -> index 3 (distinct source_ids). + monkeypatch.setattr( + "opal_server.git_fetcher.opal_server_config.SCOPES_REPO_CLONES_SHARDS", 4 + ) + a = _scope("a", "https://git/shared.git", branch="main") + b = _scope("b", "https://git/shared.git", branch="dev") # same url, diff source_id + assert GitPolicyFetcher.source_id(a.policy) != GitPolicyFetcher.source_id(b.policy) + + repo = FakeScopeRepository([a, b]) + svc = ScopesService(base_dir=tmp_path, scopes=repo, pubsub_endpoint=None) + + sid_a = GitPolicyFetcher.source_id(a.policy) + clone_path_a = str(GitPolicyFetcher.repo_clone_path(tmp_path, a.policy)) + GitPolicyFetcher.repos[clone_path_a] = object() + GitPolicyFetcher.repos_last_fetched[sid_a] = "ts" + + rmtree_calls = [] + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", + lambda p, **k: rmtree_calls.append(str(p)), + ) + + await svc.delete_scope("a") + + assert rmtree_calls == [clone_path_a] # its own clone dir removed + assert clone_path_a not in GitPolicyFetcher.repos + assert sid_a not in GitPolicyFetcher.repos_last_fetched diff --git a/packages/opal-server/opal_server/tests/fetch_timeout_test.py b/packages/opal-server/opal_server/tests/fetch_timeout_test.py new file mode 100644 index 000000000..b8bde71e1 --- /dev/null +++ b/packages/opal-server/opal_server/tests/fetch_timeout_test.py @@ -0,0 +1,24 @@ +import time + +import pytest +from opal_server.config import opal_server_config +from opal_server.git_fetcher import run_in_git_executor + + +@pytest.mark.asyncio +async def test_hanging_git_op_raises_timeout(monkeypatch): + """A clone/fetch that hangs must surface TimeoutError, not block + forever.""" + monkeypatch.setattr(opal_server_config, "SCOPES_GIT_FETCH_TIMEOUT", 0.2) + + def _hang(): + # Short enough that the lingering pool thread doesn't delay teardown, + # but well above the 0.2s timeout under test. + time.sleep(1) + + start = time.monotonic() + with pytest.raises(TimeoutError): + await run_in_git_executor( + _hang, timeout=opal_server_config.SCOPES_GIT_FETCH_TIMEOUT + ) + assert time.monotonic() - start < 2, "wait_for did not unblock promptly" diff --git a/packages/opal-server/opal_server/tests/forget_repo_test.py b/packages/opal-server/opal_server/tests/forget_repo_test.py new file mode 100644 index 000000000..b6e223cf1 --- /dev/null +++ b/packages/opal-server/opal_server/tests/forget_repo_test.py @@ -0,0 +1,31 @@ +from opal_server.git_fetcher import GitPolicyFetcher + + +class _FakeRepo: + def __init__(self): + self.freed = False + + def free(self): + self.freed = True + + +def test_forget_repo_pops_and_frees(monkeypatch): + fake = _FakeRepo() + monkeypatch.setattr(GitPolicyFetcher, "repos", {"/clones/x": fake}) + + GitPolicyFetcher.forget_repo("/clones/x") + + assert "/clones/x" not in GitPolicyFetcher.repos + assert fake.freed is True + + +def test_forget_repo_unknown_path_is_noop(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repos", {}) + GitPolicyFetcher.forget_repo("/clones/missing") # must not raise + assert GitPolicyFetcher.repos == {} + + +def test_forget_repo_without_free_method(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repos", {"/clones/y": object()}) + GitPolicyFetcher.forget_repo("/clones/y") # object() has no .free(); must not raise + assert "/clones/y" not in GitPolicyFetcher.repos diff --git a/packages/opal-server/opal_server/tests/git_executor_test.py b/packages/opal-server/opal_server/tests/git_executor_test.py new file mode 100644 index 000000000..e1c534c80 --- /dev/null +++ b/packages/opal-server/opal_server/tests/git_executor_test.py @@ -0,0 +1,29 @@ +import time + +import pytest +from opal_server.config import OpalServerConfig +from opal_server.git_fetcher import run_in_git_executor + + +def test_git_resilience_config_defaults(): + clean = OpalServerConfig(prefix="OPAL_") + assert clean.SCOPES_GIT_FETCH_TIMEOUT == 120.0 + assert clean.SCOPES_GIT_MAX_WORKERS == 10 + + +@pytest.mark.asyncio +async def test_run_in_git_executor_returns_value(): + result = await run_in_git_executor(lambda: 21 * 2, timeout=5) + assert result == 42 + + +@pytest.mark.asyncio +async def test_run_in_git_executor_times_out(): + with pytest.raises(TimeoutError): + await run_in_git_executor(lambda: time.sleep(1), timeout=0.1) + + +@pytest.mark.asyncio +async def test_zero_timeout_means_no_limit(): + result = await run_in_git_executor(lambda: "ok", timeout=0) + assert result == "ok" diff --git a/packages/opal-server/opal_server/tests/sync_concurrency_test.py b/packages/opal-server/opal_server/tests/sync_concurrency_test.py new file mode 100644 index 000000000..f41ff7b21 --- /dev/null +++ b/packages/opal-server/opal_server/tests/sync_concurrency_test.py @@ -0,0 +1,157 @@ +import asyncio + +import pytest +from opal_common.schemas.policy_source import GitPolicyScopeSource +from opal_common.schemas.scopes import Scope +from opal_server.config import OpalServerConfig, opal_server_config +from opal_server.git_fetcher import GitPolicyFetcher +from opal_server.scopes.scope_repository import ScopeNotFoundError +from opal_server.scopes.service import ScopesService + + +class _FakeRepoStore: + def __init__(self, scopes): + self._s = {x.scope_id: x for x in scopes} + + async def get(self, sid): + if sid not in self._s: + raise ScopeNotFoundError(sid) + return self._s[sid] + + async def all(self): + return list(self._s.values()) + + async def delete(self, sid): + self._s.pop(sid, None) + + +def _git_scope(sid, url): + return Scope( + scope_id=sid, + policy=GitPolicyScopeSource( + source_type="git", + url=url, + branch="main", + auth={"auth_type": "none"}, + ), + data={"entries": []}, + ) + + +def test_sync_concurrency_default(): + clean = OpalServerConfig(prefix="OPAL_") + assert clean.SCOPES_SYNC_CONCURRENCY == 10 + + +@pytest.mark.asyncio +async def test_source_lock_is_stable_per_source(monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repo_locks", {}) + lock_a1 = GitPolicyFetcher.source_lock("src-a") + lock_a2 = GitPolicyFetcher.source_lock("src-a") + lock_b = GitPolicyFetcher.source_lock("src-b") + assert lock_a1 is lock_a2 # same source -> same lock object + assert lock_a1 is not lock_b + assert isinstance(lock_a1, asyncio.Lock) + + +@pytest.mark.asyncio +async def test_delete_waits_for_in_flight_fetch(tmp_path, monkeypatch): + monkeypatch.setattr(GitPolicyFetcher, "repo_locks", {}) + monkeypatch.setattr(GitPolicyFetcher, "repos", {}) + monkeypatch.setattr(GitPolicyFetcher, "repos_last_fetched", {}) + monkeypatch.setattr( + "opal_server.scopes.service.shutil.rmtree", lambda *a, **k: None + ) + + scope = _git_scope("s1", "https://git/repo.git") + svc = ScopesService( + base_dir=tmp_path, scopes=_FakeRepoStore([scope]), pubsub_endpoint=None + ) + sid = GitPolicyFetcher.source_id(scope.policy) + + # simulate an in-flight fetch holding the per-source lock + lock = GitPolicyFetcher.source_lock(sid) + await lock.acquire() + + delete_task = asyncio.create_task(svc.delete_scope("s1")) + await asyncio.sleep(0.05) + assert not delete_task.done(), "delete proceeded while the source lock was held" + + lock.release() + await asyncio.wait_for(delete_task, timeout=2) + assert delete_task.done() + + +@pytest.mark.asyncio +async def test_sync_scopes_runs_in_parallel_bounded(tmp_path, monkeypatch): + monkeypatch.setattr(opal_server_config, "SCOPES_SYNC_CONCURRENCY", 3) + + scopes = [_git_scope(f"s{i}", f"https://git/r{i}.git") for i in range(9)] + svc = ScopesService( + base_dir=tmp_path, scopes=_FakeRepoStore(scopes), pubsub_endpoint=None + ) + + in_flight = 0 + peak = 0 + + async def fake_sync_scope(*args, scope=None, **kwargs): + nonlocal in_flight, peak + in_flight += 1 + peak = max(peak, in_flight) + await asyncio.sleep(0.05) + in_flight -= 1 + + monkeypatch.setattr(svc, "sync_scope", fake_sync_scope) + + await svc.sync_scopes() + + assert peak <= 3, f"concurrency exceeded the cap: peak={peak}" + assert peak > 1, "did not actually run in parallel" + + +@pytest.mark.asyncio +async def test_sync_scopes_isolates_one_failure(tmp_path, monkeypatch): + monkeypatch.setattr(opal_server_config, "SCOPES_SYNC_CONCURRENCY", 5) + + scopes = [_git_scope(f"s{i}", f"https://git/r{i}.git") for i in range(5)] + svc = ScopesService( + base_dir=tmp_path, scopes=_FakeRepoStore(scopes), pubsub_endpoint=None + ) + + started = [] + completed = [] + in_flight = 0 + peak = 0 + + async def fake_sync_scope(*args, scope=None, **kwargs): + nonlocal in_flight, peak + started.append(scope.scope_id) + in_flight += 1 + peak = max(peak, in_flight) + try: + # A real suspension point so sibling coroutines actually interleave; + # the failing scope then raises mid-flight while its peers are still + # in-flight — the exact condition the concurrent path must isolate. + await asyncio.sleep(0.05) + if scope.scope_id == "s2": + raise RuntimeError("boom") + completed.append(scope.scope_id) + finally: + in_flight -= 1 + + monkeypatch.setattr(svc, "sync_scope", fake_sync_scope) + + await svc.sync_scopes() # must not raise + + # Every scope was attempted, and s2's mid-flight failure neither stopped the + # batch nor cancelled its concurrently-running siblings (all four completed). + assert set(started) == {f"s{i}" for i in range(5)}, "a failure stopped the batch" + assert set(completed) == { + "s0", + "s1", + "s3", + "s4", + }, "a sibling failure cancelled in-flight peers" + # peak > 1 proves this exercised the concurrent path (a serial loop would + # give peak == 1), so the isolation guarantee is verified against parallelism. + assert peak > 1, "did not actually run in parallel" diff --git a/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py b/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py new file mode 100644 index 000000000..a576d924c --- /dev/null +++ b/packages/opal-server/opal_server/tests/webhook_task_cleanup_test.py @@ -0,0 +1,35 @@ +import asyncio + +import pytest +from opal_server.policy.watcher.task import BasePolicyWatcherTask + + +class _Watcher(BasePolicyWatcherTask): + async def trigger(self, topic, data): + return None # fast no-op so the created task finishes quickly + + +@pytest.mark.asyncio +async def test_done_tasks_are_all_removed(): + w = _Watcher(pubsub_endpoint=None) + + async def _done(): + return None + + # three already-finished tasks pre-loaded into the list + finished = [asyncio.create_task(_done()) for _ in range(3)] + await asyncio.gather(*finished) + w._webhook_tasks = list(finished) + + await w._on_webhook("webhook", None) + await asyncio.sleep(0) # let the newly created trigger task finish + + # all 3 done ones removed... + remaining_done = [t for t in w._webhook_tasks if t in finished] + assert remaining_done == [], f"stale done tasks leaked: {remaining_done}" + # ...and exactly the one freshly scheduled trigger task survives. + survivors = [t for t in w._webhook_tasks if t not in finished] + assert len(w._webhook_tasks) == 1 + assert len(survivors) == 1, f"new trigger task not scheduled: {w._webhook_tasks}" + + await asyncio.gather(*survivors) # drain the dangling task diff --git a/pytest.ini b/pytest.ini index 16c88ba91..c98ec88b9 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,10 @@ # Handling DeprecationWarning 'asyncio_mode' default value [pytest] asyncio_mode = strict +# Scope the default (rootdir) collection to the unit tests under packages/. +# The heavyweight, docker-compose-driven test bed under app-tests/git-leak/ is +# designed to FAIL on master (it is the regression gate for later PRs) and must +# not be collected by the repo's CI `pytest` run. testpaths only applies when +# pytest is invoked from the rootdir with no args, so running it explicitly +# (e.g. `cd app-tests/git-leak && pytest`) still works. +testpaths = packages