From 958973ae81e59b64224434181beb0e4eb4ac8d40 Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Fri, 5 Jun 2026 21:14:31 +0300 Subject: [PATCH 1/8] fix(opal-server): reconnect broadcaster on backbone disconnect A brief broadcaster-backbone outage escalated into a fleet-wide client connection-drop storm that only a worker restart cleared: the shared broadcaster reader task completed on disconnect and was never restarted while clients stayed connected, so every reconnecting client was cancelled. Add ReconnectingBroadcaster (reader reconnects with bounded exponential backoff, stays pending across a transient backbone loss) and an idempotent SafeConnectionManager (eliminates the ValueError('list.remove(x): x not in list') churn). Gate via OPAL_BROADCAST_RECONNECT_* config keys. Includes unit + integration tests with negative controls that reproduce the bug, and an extended app-tests e2e (graceful + ungraceful backbone drop). Co-Authored-By: Claude Opus 4.8 (1M context) --- app-tests/docker-compose-app-tests.yml | 5 + app-tests/run.sh | 47 +++- .../run-opal-server/broadcast-interface.mdx | 13 ++ packages/opal-server/opal_server/config.py | 23 ++ packages/opal-server/opal_server/pubsub.py | 37 ++- .../opal_server/pubsub_resilience.py | 181 +++++++++++++++ .../broadcaster_reconnect_integration_test.py | 218 ++++++++++++++++++ .../tests/reconnecting_broadcaster_test.py | 192 +++++++++++++++ .../tests/safe_connection_manager_test.py | 44 ++++ 9 files changed, 752 insertions(+), 8 deletions(-) create mode 100644 packages/opal-server/opal_server/pubsub_resilience.py create mode 100644 packages/opal-server/opal_server/tests/broadcaster_reconnect_integration_test.py create mode 100644 packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py create mode 100644 packages/opal-server/opal_server/tests/safe_connection_manager_test.py diff --git a/app-tests/docker-compose-app-tests.yml b/app-tests/docker-compose-app-tests.yml index 4b72dac15..7d126bfb8 100644 --- a/app-tests/docker-compose-app-tests.yml +++ b/app-tests/docker-compose-app-tests.yml @@ -47,6 +47,11 @@ services: - POSTGRES_DB=postgres - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 5s + timeout: 3s + retries: 10 opal_server: image: permitio/opal-server:${OPAL_IMAGE_TAG:-latest} diff --git a/app-tests/run.sh b/app-tests/run.sh index 8acaaf21d..a58d9cb42 100755 --- a/app-tests/run.sh +++ b/app-tests/run.sh @@ -238,6 +238,33 @@ function check_no_error { fi } +function check_servers_logged { + echo "- Looking for msg '$1' in server's logs" + compose logs opal_server | grep -q "$1" +} + +function check_servers_not_logged { + echo "- Ensuring msg '$1' is absent from server's logs" + if compose logs opal_server | grep -q "$1"; then + echo "- Unexpectedly found '$1' in server logs:" + compose logs opal_server | grep "$1" + exit 1 + fi +} + +function wait_for_broadcaster { + echo "- Waiting for broadcast_channel to accept connections" + for _ in $(seq 1 30); do + if compose exec -T broadcast_channel pg_isready -U postgres -q; then + echo " broadcast_channel is ready" + return 0 + fi + sleep 1 + done + echo " broadcast_channel did not become ready in time" + exit 1 +} + function clean_up { ARG=$? # Ensure we're in the script directory for cleanup @@ -344,15 +371,31 @@ function main { test_push_policy "something" test_statistics - echo "- Testing broadcast channel disconnection" + echo "- Testing broadcast channel disconnection (graceful restart)" compose restart broadcast_channel - sleep 10 + wait_for_broadcaster + # Give the servers' reconnecting broadcaster a moment to re-establish the backbone + sleep 5 test_data_publish "alice" test_push_policy "another" + + echo "- Testing broadcast channel disconnection (ungraceful kill)" + compose kill broadcast_channel + sleep 3 + compose up -d broadcast_channel + wait_for_broadcaster + sleep 5 + test_data_publish "sunil" test_data_publish "eve" test_push_policy "best_one_yet" + + # Regression guards for the broadcaster-disconnect storm (see pubsub_resilience.py): + # the servers must have exercised the reconnect path, and must NOT have spewed the + # non-idempotent-disconnect ValueError that drove the fleet-wide drop storm. + check_servers_logged "backbone connection closed" + check_servers_not_logged "list.remove(x): x not in list" # TODO: Test statistics feature again after broadcaster restart (should first fix statistics bug) } diff --git a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx index 4b9253947..7c7dbeda6 100644 --- a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx +++ b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx @@ -59,3 +59,16 @@ This is how you define the number of workers (pay attention: this env var is not | Env Var Name | Function | | :------------------ | :--------------------------------------------------------------- | | UVICORN_NUM_WORKERS | the number of workers in a single container (example value: `4`) | + +#### 4) Broadcaster reconnection (resilience) + +If the broadcast backbone (Postgres/Redis/Kafka) briefly drops — for example during a managed-database failover or restart — OPAL servers reconnect to it automatically with bounded exponential backoff, instead of dropping their connected clients. This is enabled by default; the following **server-side** env vars (all prefixed with `OPAL_`) tune it: + +| Env Var Name | Default | Function | +| :------------------------------------------- | :------ | :----------------------------------------------------------------------------------------------------------------------------------------------- | +| OPAL_BROADCAST_RECONNECT_ENABLED | `true` | Reconnect the broadcaster reader on a backbone disconnect instead of dropping all client connections. Set to `false` for the legacy behavior. | +| OPAL_BROADCAST_RECONNECT_MAX_RETRIES | `0` | Maximum consecutive reconnect attempts before giving up and letting the worker restart. `0` means retry forever. | +| OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS | `0.5` | Minimum backoff (seconds) between reconnect attempts. | +| OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS | `30` | Maximum backoff (seconds) between reconnect attempts. | + +While the backbone is unreachable, cross-server fan-out is paused but client websocket connections are kept alive; updates published during the outage are delivered once the backbone reconnects (OPAL keeps no replay buffer, so a client that itself reconnects re-fetches the full policy/data state). diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 9faac9be4..c0f80d1ef 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -57,6 +57,29 @@ class OpalServerConfig(Confi): True, description="Enable experimental bugfix for broadcast connection loss", ) + BROADCAST_RECONNECT_ENABLED = confi.bool( + "BROADCAST_RECONNECT_ENABLED", + True, + description="Reconnect the broadcaster reader on a backbone disconnect instead " + "of dropping all client connections. Set to False to revert to the legacy " + "(non-reconnecting) broadcaster.", + ) + BROADCAST_RECONNECT_MAX_RETRIES = confi.int( + "BROADCAST_RECONNECT_MAX_RETRIES", + 0, + description="Maximum consecutive broadcaster reconnect attempts before giving " + "up and letting the worker restart (0 = retry forever).", + ) + BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS = confi.float( + "BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS", + 0.5, + description="Minimum backoff in seconds between broadcaster reconnect attempts.", + ) + BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS = confi.float( + "BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS", + 30.0, + description="Maximum backoff in seconds between broadcaster reconnect attempts.", + ) # server security AUTH_PRIVATE_KEY_FORMAT = confi.enum( diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index c7a3b875e..807315320 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -29,6 +29,7 @@ from opal_common.config import opal_common_config from opal_common.logger import logger from opal_server.config import opal_server_config +from opal_server.pubsub_resilience import ReconnectingBroadcaster, SafeConnectionManager from pydantic import BaseModel from starlette.datastructures import QueryParams @@ -141,12 +142,27 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): self.broadcaster = None if broadcaster_uri is not None: - logger.info(f"Initializing broadcaster for server<->server communication") - self.broadcaster = EventBroadcaster( - broadcaster_uri, - notifier=self.notifier, - channel=opal_server_config.BROADCAST_CHANNEL_NAME, - ) + if opal_server_config.BROADCAST_RECONNECT_ENABLED: + logger.info( + "Initializing reconnecting broadcaster for server<->server communication" + ) + self.broadcaster = ReconnectingBroadcaster( + broadcaster_uri, + notifier=self.notifier, + channel=opal_server_config.BROADCAST_CHANNEL_NAME, + reconnect_max_retries=opal_server_config.BROADCAST_RECONNECT_MAX_RETRIES, + reconnect_backoff_min=opal_server_config.BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS, + reconnect_backoff_max=opal_server_config.BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS, + ) + else: + logger.info( + "Initializing broadcaster for server<->server communication" + ) + self.broadcaster = EventBroadcaster( + broadcaster_uri, + notifier=self.notifier, + channel=opal_server_config.BROADCAST_CHANNEL_NAME, + ) else: logger.info("Pub/Sub broadcaster is off") @@ -159,6 +175,15 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): not opal_server_config.BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED ), ) + # fastapi_websocket_rpc's ConnectionManager.disconnect is not idempotent: the RPC + # endpoint can call it twice for one socket (handle_disconnect plus the outer + # except in WebsocketRPCEndpoint.main_loop), raising + # ValueError('list.remove(x): x not in list'). Swap in an idempotent manager + # before any connection is served. + assert hasattr(self.endpoint, "endpoint") and hasattr( + self.endpoint.endpoint, "manager" + ), "Unexpected fastapi_websocket_pubsub internals: cannot install SafeConnectionManager" + self.endpoint.endpoint.manager = SafeConnectionManager() authenticator = WebsocketJWTAuthenticator(signer) @self.api_router.get( diff --git a/packages/opal-server/opal_server/pubsub_resilience.py b/packages/opal-server/opal_server/pubsub_resilience.py new file mode 100644 index 000000000..85b99aef4 --- /dev/null +++ b/packages/opal-server/opal_server/pubsub_resilience.py @@ -0,0 +1,181 @@ +"""Resilience wrappers around the ``fastapi_websocket_pubsub`` / +``fastapi_websocket_rpc`` pub/sub layer used by the OPAL server. + +Two upstream issues let a *transient* broadcaster-backbone disconnect +(Postgres ``LISTEN/NOTIFY``, Redis, Kafka) escalate into a self-sustaining, +fleet-wide client connection-drop storm that only a worker restart clears: + +1. ``EventBroadcaster``'s reader task runs to completion when the backbone + connection drops and is never restarted while clients remain connected + (``_subscription_task`` is reset only when the listener count reaches 0). + Because OPAL runs with ``ignore_broadcaster_disconnected=False``, every + client websocket waits on that shared reader task; once it is *done* every + client is cancelled and dropped, indefinitely. +2. ``ConnectionManager.disconnect`` is not idempotent; the RPC endpoint can + call it twice for the same socket, raising + ``ValueError('list.remove(x): x not in list')``. + +``ReconnectingBroadcaster`` keeps the reader task *pending* across transient +backbone losses by reconnecting with bounded exponential backoff. +``SafeConnectionManager`` makes ``disconnect`` idempotent. Both are stop-gaps +until the fixes land in the upstream libraries (see Phase 2 of the plan). +""" +import asyncio +import random + +from fastapi import WebSocket +from fastapi_websocket_pubsub import EventBroadcaster +from fastapi_websocket_pubsub.event_broadcaster import BroadcastNotification +from fastapi_websocket_rpc.connection_manager import ConnectionManager +from opal_common.logger import logger + + +class SafeConnectionManager(ConnectionManager): + """A ``ConnectionManager`` whose ``disconnect`` is idempotent. + + The upstream implementation calls ``self.active_connections.remove(websocket)`` + unconditionally, so a second disconnect for the same socket raises + ``ValueError('list.remove(x): x not in list')``. That error escapes + ``WebsocketRPCEndpoint.main_loop`` as an unretrieved task exception and, under a + reconnect storm, is logged thousands of times. Guarding the removal turns a + double disconnect into a no-op. + """ + + def disconnect(self, websocket: WebSocket): + try: + self.active_connections.remove(websocket) + except ValueError: + logger.debug("Ignoring duplicate websocket disconnect") + + +class ReconnectingBroadcaster(EventBroadcaster): + """An ``EventBroadcaster`` whose listener reconnects instead of dying. + + The base reader coroutine (``__read_notifications__``) returns when the backbone + connection closes and is not restarted while clients stay connected, leaving + ``get_reader_task()`` permanently *done* — which cancels every client websocket + loop. This subclass wraps the connect/subscribe/read cycle in a reconnect loop + with bounded exponential backoff, so the reader task stays *pending* across + transient outages. The task only completes on clean shutdown (cancellation) or + after ``reconnect_max_retries`` consecutive failures, in which case the existing + ``ignore_broadcaster_disconnected=False`` path triggers a graceful worker restart. + """ + + def __init__( + self, + *args, + reconnect_max_retries: int = 0, + reconnect_backoff_min: float = 0.5, + reconnect_backoff_max: float = 30.0, + **kwargs, + ): + super().__init__(*args, **kwargs) + self._reconnect_max_retries = reconnect_max_retries + self._reconnect_backoff_min = reconnect_backoff_min + self._reconnect_backoff_max = reconnect_backoff_max + + async def start_reader_task(self): + """Spawn the reconnecting reader task once. + + Unlike the base implementation we do not connect the channel + here — the reader loop owns (re)connection, so a backbone that + is already down at startup is retried rather than raised to the + first connecting client. + """ + if self._subscription_task is not None: + logger.debug("No need for listen task, already started") + return self._subscription_task + logger.debug("Spawning reconnecting broadcast listen task") + self._subscription_task = asyncio.create_task(self.__read_notifications__()) + return self._subscription_task + + async def __read_notifications__(self): + """Read incoming broadcasts, reconnecting on backbone disconnect. + + ``__read_notifications__`` ends in a double underscore, so it is not a + name-mangled private name and this override is what the inherited + ``start_reader_task`` (and ours) dispatches to. + """ + attempt = 0 + while True: + try: + channel = await self._ensure_connected() + attempt = 0 + logger.info( + f"Broadcaster listener connected to channel '{self._channel}'" + ) + async with channel.subscribe(channel=self._channel) as subscriber: + async for event in subscriber: + await self._handle_broadcast_event(event) + logger.warning( + "Broadcast subscriber ended (backbone connection closed); reconnecting" + ) + except asyncio.CancelledError: + logger.info("Broadcaster listener cancelled; stopping") + raise + except Exception as e: + attempt += 1 + logger.error(f"Broadcaster listener error (attempt {attempt}): {e!r}") + if ( + self._reconnect_max_retries + and attempt >= self._reconnect_max_retries + ): + logger.error( + f"Broadcaster reconnect exhausted after {attempt} attempts; " + "giving up so the worker can restart" + ) + break + finally: + await self._safe_disconnect_channel() + await asyncio.sleep(self._backoff_seconds(attempt)) + + async def _ensure_connected(self): + if self.listening_broadcast_channel is None: + self.listening_broadcast_channel = self._broadcast_type(self._broadcast_url) + await self.listening_broadcast_channel.connect() + return self.listening_broadcast_channel + + async def _handle_broadcast_event(self, event): + """Forward one incoming broadcast to the internal notifier. + + Mirrors the base class' per-event handling; kept here so the + reconnect loop above stays readable. + """ + try: + notification = BroadcastNotification.parse_raw(event.message) + # Avoid re-publishing our own broadcasts + if notification.notifier_id != self._id: + logger.debug( + "Handling incoming broadcast event: {}".format( + {"topics": notification.topics, "src": notification.notifier_id} + ) + ) + task = asyncio.create_task( + self._notifier.notify( + notification.topics, + notification.data, + notifier_id=self._id, + ) + ) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + except Exception: + logger.exception("Failed handling incoming broadcast") + + async def _safe_disconnect_channel(self): + channel = self.listening_broadcast_channel + self.listening_broadcast_channel = None + if channel is not None: + try: + await channel.disconnect() + except Exception: + logger.debug("Error while disconnecting broadcast channel; ignoring") + + def _backoff_seconds(self, attempt: int) -> float: + if attempt <= 0: + base = self._reconnect_backoff_min + else: + base = self._reconnect_backoff_min * (2 ** (attempt - 1)) + base = min(base, self._reconnect_backoff_max) + # Equal jitter, so a fleet of pods does not reconnect to the backbone in lockstep. + return base / 2 + random.uniform(0, base / 2) diff --git a/packages/opal-server/opal_server/tests/broadcaster_reconnect_integration_test.py b/packages/opal-server/opal_server/tests/broadcaster_reconnect_integration_test.py new file mode 100644 index 000000000..ddad8e84e --- /dev/null +++ b/packages/opal-server/opal_server/tests/broadcaster_reconnect_integration_test.py @@ -0,0 +1,218 @@ +"""Integration test for the reconnecting broadcaster against the real +``PubSubEndpoint.main_loop``. + +In production every client websocket waits (``ignore_broadcaster_disconnected=False``) +on a single shared broadcaster reader task. When the backbone drops, the stock reader +completes and ``PubSubEndpoint.main_loop`` cancels the client's websocket loop — the +mechanism behind the fleet-wide drop storm. This test wires the real ``PubSubEndpoint`` +to a fault-injectable in-memory backbone and asserts: + +* with ``ReconnectingBroadcaster`` the client loop is NOT cancelled across a backbone + outage (the reader stays pending and reconnects), and +* (negative control) with the stock ``EventBroadcaster`` the client loop IS cancelled, + reproducing the bug. + +It runs fully in-process, driving the actual ``PubSubEndpoint`` / ``WebsocketRPCEndpoint`` +code paths with a fake websocket, so it is deterministic and needs no real backbone. +""" +import asyncio +from types import SimpleNamespace + +import pytest +from fastapi import WebSocketDisconnect +from fastapi_websocket_pubsub import EventBroadcaster, PubSubEndpoint +from fastapi_websocket_pubsub.websocket_rpc_event_notifier import ( + WebSocketRpcEventNotifier, +) +from opal_server.pubsub_resilience import ReconnectingBroadcaster + +_END = object() + + +class FaultyBus: + """A fault-injectable in-memory backbone shared by all channel + instances.""" + + def __init__(self): + self.faulted = False + self.connects = 0 + self.subscribes = 0 + self._subscriber_queues = [] + + def channel_factory(self, _url): + return _FaultyChannel(self) + + def fault(self): + """Drop the backbone: end active reads and refuse new connections.""" + self.faulted = True + for queue in list(self._subscriber_queues): + queue.put_nowait(_END) + + def recover(self): + self.faulted = False + + +class _FaultyChannel: + def __init__(self, bus): + self._bus = bus + + async def connect(self): + self._bus.connects += 1 + if self._bus.faulted: + raise ConnectionError("backbone is faulted") + + async def disconnect(self): + pass + + def subscribe(self, channel): + return _FaultySubscription(self._bus) + + +class _FaultySubscription: + def __init__(self, bus): + self._bus = bus + self._queue = asyncio.Queue() + + async def __aenter__(self): + self._bus.subscribes += 1 + self._bus._subscriber_queues.append(self._queue) + return _FaultySubscriber(self._queue) + + async def __aexit__(self, *exc): + if self._queue in self._bus._subscriber_queues: + self._bus._subscriber_queues.remove(self._queue) + return False + + +class _FaultySubscriber: + def __init__(self, queue): + self._queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + item = await self._queue.get() + if item is _END: + raise StopAsyncIteration + return item + + +class FakeWebSocket: + """A websocket whose receive blocks until close — an idle, connected + client.""" + + def __init__(self): + self.client = SimpleNamespace(host="test-client", port=12345) + self._closed = asyncio.Event() + + async def accept(self): + pass + + async def send_text(self, data): + pass + + async def send_bytes(self, data): + pass + + async def receive_text(self): + await self._closed.wait() + raise WebSocketDisconnect() + + async def receive_bytes(self): + await self._closed.wait() + raise WebSocketDisconnect() + + async def close(self, code: int = 1000): + self._closed.set() + + +async def _wait_for(predicate, timeout=3.0): + loop = asyncio.get_event_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + if predicate(): + return + await asyncio.sleep(0.01) + raise AssertionError("condition not met within timeout") + + +async def _serve_one_client(endpoint, websocket): + return asyncio.create_task(endpoint.main_loop(websocket)) + + +async def _shutdown(websocket, client_task): + await websocket.close() + client_task.cancel() + try: + await client_task + except (asyncio.CancelledError, Exception): + pass + + +@pytest.mark.asyncio +async def test_client_not_cancelled_when_backbone_drops_with_reconnect(): + notifier = WebSocketRpcEventNotifier() + bus = FaultyBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0.01, + reconnect_backoff_max=0.05, + ) + endpoint = PubSubEndpoint( + notifier=notifier, + broadcaster=broadcaster, + ignore_broadcaster_disconnected=False, + ) + websocket = FakeWebSocket() + client_task = await _serve_one_client(endpoint, websocket) + try: + await _wait_for(lambda: bus.subscribes >= 1) + connects_before = bus.connects + + bus.fault() # backbone outage + await asyncio.sleep(0.3) + # The client websocket loop must survive the outage... + assert not client_task.done() + # ...because the reader kept trying to reconnect instead of dying. + assert bus.connects > connects_before + + bus.recover() + await _wait_for(lambda: bus.subscribes >= 2) # reader reconnected + assert not client_task.done() + finally: + await _shutdown(websocket, client_task) + + +@pytest.mark.asyncio +async def test_client_cancelled_when_backbone_drops_without_reconnect(): + # Negative control: the stock EventBroadcaster reproduces the production bug — the + # shared reader task completes on a backbone drop and the client loop is cancelled. + notifier = WebSocketRpcEventNotifier() + bus = FaultyBus() + broadcaster = EventBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + ) + endpoint = PubSubEndpoint( + notifier=notifier, + broadcaster=broadcaster, + ignore_broadcaster_disconnected=False, + ) + websocket = FakeWebSocket() + client_task = await _serve_one_client(endpoint, websocket) + try: + await _wait_for(lambda: bus.subscribes >= 1) + + bus.fault() + # The client loop ends (gets cancelled) because the shared reader died. + await _wait_for(lambda: client_task.done(), timeout=3) + assert client_task.done() + assert bus.connects == 1 # never reconnected + finally: + await _shutdown(websocket, client_task) diff --git a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py new file mode 100644 index 000000000..fc3091fbf --- /dev/null +++ b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py @@ -0,0 +1,192 @@ +"""Unit tests for ReconnectingBroadcaster. + +The tests drive the broadcaster against an in-memory, fault-injectable backbone so +they are deterministic and require no real Postgres/Redis. The key invariant under +test is that a transient backbone disconnect must NOT complete the reader task (which +is what cancels every client websocket in production). The final test is a negative +control: the stock EventBroadcaster reader DOES complete on the same disconnect, +proving these tests actually catch the regression. +""" +import asyncio + +import pytest +from fastapi_websocket_pubsub import EventBroadcaster +from fastapi_websocket_pubsub.event_broadcaster import BroadcastNotification +from opal_server.pubsub_resilience import ReconnectingBroadcaster + +_END = object() + + +class _Event: + def __init__(self, message): + self.message = message + + +class FakeNotifier: + def __init__(self): + self.notified = [] + + async def notify(self, topics, data, notifier_id=None): + self.notified.append((list(topics), data, notifier_id)) + + +class FakeBus: + """A controllable in-memory broadcast backbone for a single channel.""" + + def __init__(self, fail_connect=False): + self.fail_connect = fail_connect + self.connects = 0 + self.subscribes = 0 + self.disconnects = 0 + self.queue = asyncio.Queue() + + def channel_factory(self, _url): + return _FakeChannel(self) + + async def drop(self): + """Simulate the backbone closing the read connection.""" + await self.queue.put(_END) + + async def push(self, topics, data, notifier_id): + note = BroadcastNotification(notifier_id=notifier_id, topics=topics, data=data) + await self.queue.put(_Event(note.json())) + + +class _FakeChannel: + def __init__(self, bus): + self._bus = bus + + async def connect(self): + self._bus.connects += 1 + if self._bus.fail_connect: + raise ConnectionError("backbone refused connection") + + async def disconnect(self): + self._bus.disconnects += 1 + + def subscribe(self, channel): + return _FakeSubscription(self._bus) + + +class _FakeSubscription: + def __init__(self, bus): + self._bus = bus + + async def __aenter__(self): + self._bus.subscribes += 1 + return _FakeSubscriber(self._bus) + + async def __aexit__(self, *exc): + return False + + +class _FakeSubscriber: + def __init__(self, bus): + self._bus = bus + + def __aiter__(self): + return self + + async def __anext__(self): + item = await self._bus.queue.get() + if item is _END: + raise StopAsyncIteration + return item + + +async def _wait_for(predicate, timeout=2.0): + loop = asyncio.get_event_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + if predicate(): + return + await asyncio.sleep(0.01) + raise AssertionError("condition not met within timeout") + + +@pytest.mark.asyncio +async def test_reader_reconnects_and_stays_pending(): + bus = FakeBus() + notifier = FakeNotifier() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.subscribes >= 1) + assert bus.connects == 1 + + await bus.drop() # the backbone connection closes + + await _wait_for(lambda: bus.subscribes >= 2) # reader reconnected + assert bus.connects == 2 + # The whole point: the reader task survives a transient disconnect. + assert not task.done() + + # After reconnect, a broadcast from another server is still delivered. + await bus.push(["policy_data"], {"x": 1}, notifier_id="other-server") + await _wait_for(lambda: notifier.notified) + assert notifier.notified[0][0] == ["policy_data"] + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_reader_gives_up_after_max_retries(): + bus = FakeBus(fail_connect=True) + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_max_retries=3, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + # Exhausting retries completes the task cleanly (no escaping exception), which lets + # the existing ignore_broadcaster_disconnected=False path restart the worker. + await asyncio.wait_for(task, timeout=2) + assert task.done() + assert task.exception() is None + assert bus.connects == 3 + + +@pytest.mark.asyncio +async def test_stock_broadcaster_reader_dies_on_drop(): + # Negative control: the unpatched EventBroadcaster reader completes on a single + # backbone drop and never reconnects — the bug this change fixes. + bus = FakeBus() + broadcaster = EventBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + ) + task = await broadcaster.start_reader_task() + await _wait_for(lambda: bus.subscribes >= 1) + + await bus.drop() + + await asyncio.wait_for(task, timeout=2) + assert task.done() + assert bus.connects == 1 # never reconnected + + +def test_backoff_is_bounded(): + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + reconnect_backoff_min=0.5, + reconnect_backoff_max=10.0, + ) + delays = [broadcaster._backoff_seconds(attempt) for attempt in range(0, 25)] + assert all(0.0 <= delay <= 10.0 for delay in delays) + assert max(delays) > 0 diff --git a/packages/opal-server/opal_server/tests/safe_connection_manager_test.py b/packages/opal-server/opal_server/tests/safe_connection_manager_test.py new file mode 100644 index 000000000..a58bf0a36 --- /dev/null +++ b/packages/opal-server/opal_server/tests/safe_connection_manager_test.py @@ -0,0 +1,44 @@ +import pytest +from opal_server.pubsub_resilience import SafeConnectionManager + + +class _FakeWebSocket: + def __init__(self): + self.accepted = False + + async def accept(self): + self.accepted = True + + +def test_disconnect_unknown_socket_does_not_raise(): + manager = SafeConnectionManager() + # A socket that was never connected must not raise on disconnect. + manager.disconnect(object()) + + +def test_double_disconnect_is_idempotent(): + manager = SafeConnectionManager() + websocket = object() + manager.active_connections.append(websocket) + + manager.disconnect(websocket) + assert websocket not in manager.active_connections + + # The upstream ConnectionManager would raise ValueError('list.remove(x): x not in + # list') here; SafeConnectionManager must swallow it. + manager.disconnect(websocket) + assert websocket not in manager.active_connections + + +@pytest.mark.asyncio +async def test_connect_then_double_disconnect(): + manager = SafeConnectionManager() + websocket = _FakeWebSocket() + + await manager.connect(websocket) + assert websocket.accepted + assert websocket in manager.active_connections + + manager.disconnect(websocket) + manager.disconnect(websocket) + assert websocket not in manager.active_connections From dc7e11cab128a918f25ddc511159a32586288af4 Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Tue, 9 Jun 2026 16:24:22 +0300 Subject: [PATCH 2/8] feat(opal-server): replay + resync keep instances consistent across a broadcaster gap Builds on the reconnecting broadcaster so a transient backbone outage no longer silently desyncs OPAL server instances: - (B) bounded outbound replay buffer: broadcasts that fail while the backbone is down are queued and replayed on reconnect so peers that re-subscribe catch up. - (A) resync on recovery (the guarantee): after any gap each worker forces its own clients to reconnect and re-fetch full policy + data state, so the fleet converges (a worker may have missed incoming peer updates during its gap). Pins the broadcaster's listening context during a resync so recycling clients does not drop the listener count to 0 and cancel the reader; single-flight recovery plus a lock around the buffer/overflow flag; drops un-serializable buffered items so a poison payload cannot wedge the buffer; cancels pending recovery tasks on shutdown. Adds OPAL_BROADCAST_REPLAY_BUFFER_SIZE / _RESYNC_ON_RECONNECT / _RESYNC_SETTLE_SECONDS. Includes multi-instance convergence tests and extends the app-tests e2e with a publish-during-outage consistency scenario. Co-Authored-By: Claude Opus 4.8 (1M context) --- app-tests/run.sh | 41 ++- .../run-opal-server/broadcast-interface.mdx | 8 +- packages/opal-server/opal_server/config.py | 20 ++ packages/opal-server/opal_server/pubsub.py | 62 +++- .../opal_server/pubsub_resilience.py | 221 ++++++++++++- ...roadcaster_consistency_integration_test.py | 292 ++++++++++++++++++ .../tests/safe_connection_manager_test.py | 25 ++ 7 files changed, 643 insertions(+), 26 deletions(-) create mode 100644 packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py diff --git a/app-tests/run.sh b/app-tests/run.sh index a58d9cb42..0a0a29550 100755 --- a/app-tests/run.sh +++ b/app-tests/run.sh @@ -304,11 +304,9 @@ function test_push_policy { check_clients_logged "PUT /v1/policies/$regofile -> 200" } -function test_data_publish { - echo "- Testing data publish for user $1" +function publish_data { + # POST a data update to a single OPAL server (no assertion). user=$1 - - # Use curl to publish data update via OPAL server API curl -s -X POST http://localhost:7002/data/config \ -H "Authorization: Bearer $OPAL_DATA_SOURCE_TOKEN" \ -H "Content-Type: application/json" \ @@ -321,9 +319,13 @@ function test_data_publish { "save_method": "PUT" }] }' +} +function test_data_publish { + echo "- Testing data publish for user $1" + publish_data "$1" sleep 5 - check_clients_logged "PUT /v1/data/users/$user/location -> 204" + check_clients_logged "PUT /v1/data/users/$1/location -> 204" } function test_statistics { @@ -392,10 +394,33 @@ function main { test_push_policy "best_one_yet" # Regression guards for the broadcaster-disconnect storm (see pubsub_resilience.py): - # the servers must have exercised the reconnect path, and must NOT have spewed the - # non-idempotent-disconnect ValueError that drove the fleet-wide drop storm. - check_servers_logged "backbone connection closed" + # the servers must have reconnected to the backbone (this line is logged on every + # (re)connect, so it fires on both the graceful-restart and ungraceful-kill paths), + # and must NOT have spewed the non-idempotent-disconnect ValueError that drove the + # fleet-wide drop storm. + check_servers_logged "Broadcaster listener connected to channel" check_servers_not_logged "list.remove(x): x not in list" + + # Cross-instance consistency: publish an update WHILE the backbone is down, then + # recover. The two clients connect to different server replicas via the service VIP, + # so for BOTH to end up with the value the missed cross-server update must converge + # after recovery (via the replay buffer and/or the resync-on-reconnect path). + echo "- Testing cross-instance consistency across a backbone outage" + compose kill broadcast_channel + sleep 3 + publish_data "consistency_user" + sleep 2 + compose up -d broadcast_channel + wait_for_broadcaster + # allow buffered replay + (if needed) client resync + full refetch to settle + sleep 15 + # The server that received the publish while the backbone was down must have + # buffered it and replayed it on recovery (proves the replay path actually ran, + # not just a client refetch). + check_servers_logged "buffered for replay" + check_servers_logged "Replaying" + # BOTH clients (on different replicas via the VIP) must end up with the value. + check_clients_logged "PUT /v1/data/users/consistency_user/location -> 204" # TODO: Test statistics feature again after broadcaster restart (should first fix statistics bug) } diff --git a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx index 7c7dbeda6..3aa679278 100644 --- a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx +++ b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx @@ -70,5 +70,11 @@ If the broadcast backbone (Postgres/Redis/Kafka) briefly drops — for example d | OPAL_BROADCAST_RECONNECT_MAX_RETRIES | `0` | Maximum consecutive reconnect attempts before giving up and letting the worker restart. `0` means retry forever. | | OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS | `0.5` | Minimum backoff (seconds) between reconnect attempts. | | OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS | `30` | Maximum backoff (seconds) between reconnect attempts. | +| OPAL_BROADCAST_REPLAY_BUFFER_SIZE | `10000` | Max number of outbound broadcasts buffered while the backbone is down and replayed on reconnect (`0` disables buffering). On overflow the oldest are dropped. | +| OPAL_BROADCAST_RESYNC_ON_RECONNECT | `true` | After a backbone gap, force this worker's clients to reconnect so they re-fetch full policy + data state. Set to `false` to rely only on best-effort replay. | +| OPAL_BROADCAST_RESYNC_SETTLE_SECONDS | `2` | Grace period after a reconnect before replaying buffered broadcasts and resyncing clients, to let peer servers re-subscribe. | -While the backbone is unreachable, cross-server fan-out is paused but client websocket connections are kept alive; updates published during the outage are delivered once the backbone reconnects (OPAL keeps no replay buffer, so a client that itself reconnects re-fetches the full policy/data state). +Consistency across the outage is handled in two layers. While the backbone is unreachable, client websocket connections are kept alive but cross-server fan-out is paused. On reconnect: + +- **Replay buffer** — broadcasts that failed to reach the backbone during the outage are replayed, so peer servers that have re-subscribed catch up without a client refetch. This is best-effort: the backbone keeps no replay of its own, so a peer that is slow to re-subscribe may miss a replayed message. +- **Resync** (the guarantee) — each server forces its own clients to reconnect and re-fetch the full policy/data state. Because every server experienced the same gap, every server reconciles its own clients and the fleet converges to current truth. Updates missed during the gap are therefore reconciled even if the replay did not reach a peer in time. diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index c0f80d1ef..9dfc18beb 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -80,6 +80,26 @@ class OpalServerConfig(Confi): 30.0, description="Maximum backoff in seconds between broadcaster reconnect attempts.", ) + BROADCAST_REPLAY_BUFFER_SIZE = confi.int( + "BROADCAST_REPLAY_BUFFER_SIZE", + 10000, + description="Max number of outbound broadcasts buffered while the backbone is " + "down and replayed on reconnect (0 disables buffering). On overflow the oldest " + "are dropped and clients are resynced instead.", + ) + BROADCAST_RESYNC_ON_RECONNECT = confi.bool( + "BROADCAST_RESYNC_ON_RECONNECT", + True, + description="After a backbone gap that may have lost updates, force this " + "worker's connected clients to reconnect so they re-fetch full policy + data " + "state (guarantees cross-instance consistency).", + ) + BROADCAST_RESYNC_SETTLE_SECONDS = confi.float( + "BROADCAST_RESYNC_SETTLE_SECONDS", + 2.0, + description="Grace period after a broadcaster reconnect before replaying " + "buffered broadcasts and resyncing clients, to let peer servers re-subscribe.", + ) # server security AUTH_PRIVATE_KEY_FORMAT = confi.enum( diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 807315320..6cc6742e8 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -1,3 +1,5 @@ +import asyncio +import random import time from contextlib import contextmanager from contextvars import ContextVar @@ -153,6 +155,8 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): reconnect_max_retries=opal_server_config.BROADCAST_RECONNECT_MAX_RETRIES, reconnect_backoff_min=opal_server_config.BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS, reconnect_backoff_max=opal_server_config.BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS, + replay_buffer_size=opal_server_config.BROADCAST_REPLAY_BUFFER_SIZE, + resync_settle_seconds=opal_server_config.BROADCAST_RESYNC_SETTLE_SECONDS, ) else: logger.info( @@ -179,11 +183,23 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): # endpoint can call it twice for one socket (handle_disconnect plus the outer # except in WebsocketRPCEndpoint.main_loop), raising # ValueError('list.remove(x): x not in list'). Swap in an idempotent manager - # before any connection is served. - assert hasattr(self.endpoint, "endpoint") and hasattr( - self.endpoint.endpoint, "manager" - ), "Unexpected fastapi_websocket_pubsub internals: cannot install SafeConnectionManager" + # before any connection is served. Reaching into the wrapped endpoint is the + # only injection point the library offers (PubSubEndpoint takes no manager), + # so fail loudly if its internals ever move (a bare assert would be stripped + # under python -O, silently restoring the storm). + if not ( + hasattr(self.endpoint, "endpoint") + and hasattr(self.endpoint.endpoint, "manager") + ): + raise RuntimeError( + "Unexpected fastapi_websocket_pubsub internals: cannot install " + "SafeConnectionManager (endpoint.endpoint.manager not found)" + ) self.endpoint.endpoint.manager = SafeConnectionManager() + + if isinstance(self.broadcaster, ReconnectingBroadcaster): + self._wire_broadcaster_resync() + authenticator = WebsocketJWTAuthenticator(signer) @self.api_router.get( @@ -227,6 +243,44 @@ async def websocket_rpc_endpoint( finally: await websocket.close() + def _wire_broadcaster_resync(self): + """Register the post-gap resync. + + After any backbone gap, force this worker's clients to reconnect + so they re-run their full (scope-aware) policy + data + reconciliation. Every worker hit the same gap, so each + reconciles its own clients and the fleet converges — this is the + consistency guarantee; the broadcaster's replay buffer only + narrows the staleness window. + """ + manager = self.endpoint.endpoint.manager + broadcaster = self.broadcaster + resync_enabled = opal_server_config.BROADCAST_RESYNC_ON_RECONNECT + settle = opal_server_config.BROADCAST_RESYNC_SETTLE_SECONDS + + async def _on_broadcaster_reconnect(): + if not resync_enabled: + logger.info("Broadcaster recovered after a gap; client resync disabled") + return + # Every worker hit the same gap; add a per-worker random delay so the + # fleet does not recycle its clients in lockstep. + if settle > 0: + await asyncio.sleep(random.uniform(0, settle)) + logger.warning( + "Broadcaster recovered after a gap; resyncing this worker's clients " + "so they re-fetch current policy + data state" + ) + # Closing every client would drive the broadcaster's listener count to 0 + # and cancel the reconnecting reader task. Pin a listening context so the + # reader survives the recycle, and hold it briefly so reconnecting clients + # re-establish the count before we release. + async with broadcaster.get_listening_context(): + await manager.close_all_staggered() + if settle > 0: + await asyncio.sleep(settle) + + broadcaster.set_reconnect_callback(_on_broadcaster_reconnect) + @staticmethod async def _verify_permitted_topics( topics: Union[TopicList, ALL_TOPICS], channel: RpcChannel diff --git a/packages/opal-server/opal_server/pubsub_resilience.py b/packages/opal-server/opal_server/pubsub_resilience.py index 85b99aef4..df8b22ff1 100644 --- a/packages/opal-server/opal_server/pubsub_resilience.py +++ b/packages/opal-server/opal_server/pubsub_resilience.py @@ -16,29 +16,54 @@ ``ValueError('list.remove(x): x not in list')``. ``ReconnectingBroadcaster`` keeps the reader task *pending* across transient -backbone losses by reconnecting with bounded exponential backoff. -``SafeConnectionManager`` makes ``disconnect`` idempotent. Both are stop-gaps -until the fixes land in the upstream libraries (see Phase 2 of the plan). +backbone losses by reconnecting with bounded exponential backoff, and adds two +consistency mechanisms so a backbone gap does not silently desync instances: + +* **(B) outbound replay buffer** — broadcasts that fail to reach the backbone + while it is down are kept in a bounded FIFO and replayed once it reconnects, + so peers that re-subscribe in time catch up without a refetch. This narrows + the staleness window; it is *not* a delivery guarantee (the backbone keeps no + replay of its own and a slow peer may not have re-subscribed at flush time). +* **(A) resync on recovery** — the *guarantee*. After **any** gap the broadcaster + fires the registered ``on_reconnect`` callback. OPAL uses it to make this + worker's own clients re-run their full (scope-aware) policy + data + reconciliation. Every worker experiences the same gap, so every worker + reconciles its own clients (a worker's clients may have missed *incoming* peer + updates during the gap — independent of what this worker published), and the + fleet converges to current truth. + +``SafeConnectionManager`` makes ``disconnect`` idempotent and can close a +worker's client connections (staggered) to drive the resync. All of this is a +stop-gap until the fixes land in the upstream libraries (see Phase 2 of the plan). """ import asyncio import random +from collections import deque +from typing import Awaitable, Callable, Optional from fastapi import WebSocket from fastapi_websocket_pubsub import EventBroadcaster from fastapi_websocket_pubsub.event_broadcaster import BroadcastNotification +from fastapi_websocket_pubsub.event_notifier import Subscription +from fastapi_websocket_pubsub.util import pydantic_serialize from fastapi_websocket_rpc.connection_manager import ConnectionManager from opal_common.logger import logger +ReconnectCallback = Callable[[], Awaitable[None]] + class SafeConnectionManager(ConnectionManager): - """A ``ConnectionManager`` whose ``disconnect`` is idempotent. + """A ``ConnectionManager`` whose ``disconnect`` is idempotent, able to drop + its connections on demand. - The upstream implementation calls ``self.active_connections.remove(websocket)`` + The upstream ``disconnect`` calls ``self.active_connections.remove(websocket)`` unconditionally, so a second disconnect for the same socket raises - ``ValueError('list.remove(x): x not in list')``. That error escapes + ``ValueError('list.remove(x): x not in list')`` — which escapes ``WebsocketRPCEndpoint.main_loop`` as an unretrieved task exception and, under a reconnect storm, is logged thousands of times. Guarding the removal turns a - double disconnect into a no-op. + double disconnect into a no-op. ``close_all_staggered`` additionally lets the + server intentionally recycle its client connections (e.g. to force a post-outage + resync) without a thundering herd. """ def disconnect(self, websocket: WebSocket): @@ -47,9 +72,44 @@ def disconnect(self, websocket: WebSocket): except ValueError: logger.debug("Ignoring duplicate websocket disconnect") + async def close_all_staggered( + self, min_interval: float = 0.0, max_interval: float = 0.2 + ) -> int: + """Close every currently-tracked client websocket, spaced out with + jitter. + + Clients reconnect on their own and re-run their on-connect reconciliation, + so this is how a worker forces its clients back to a consistent state after + a broadcaster gap. Close code 1012 ("Service Restart") signals a reconnect. + + Note: the broadcaster's reader task is tied to the per-client listening + context, so the caller MUST pin a listening context around this call (see + ``opal_server.pubsub``) — otherwise closing the last client cancels the + reader we just worked to keep alive. + """ + connections = list(self.active_connections) + if not connections: + return 0 + logger.info( + f"Resync: closing {len(connections)} client connection(s) to trigger " + "client-side reconciliation" + ) + closed = 0 + for index, websocket in enumerate(connections): + try: + await websocket.close(code=1012) + closed += 1 + except Exception as e: + logger.warning(f"Error closing a client websocket during resync: {e!r}") + # Jitter between closes (but not after the last one). + if max_interval > 0 and index < len(connections) - 1: + await asyncio.sleep(random.uniform(min_interval, max_interval)) + return closed + class ReconnectingBroadcaster(EventBroadcaster): - """An ``EventBroadcaster`` whose listener reconnects instead of dying. + """An ``EventBroadcaster`` whose listener reconnects instead of dying, + buffers failed outbound broadcasts, and fires a resync hook after a gap. The base reader coroutine (``__read_notifications__``) returns when the backbone connection closes and is not restarted while clients stay connected, leaving @@ -67,12 +127,33 @@ def __init__( reconnect_max_retries: int = 0, reconnect_backoff_min: float = 0.5, reconnect_backoff_max: float = 30.0, + replay_buffer_size: int = 10000, + resync_settle_seconds: float = 2.0, **kwargs, ): super().__init__(*args, **kwargs) self._reconnect_max_retries = reconnect_max_retries self._reconnect_backoff_min = reconnect_backoff_min self._reconnect_backoff_max = reconnect_backoff_max + self._replay_buffer_size = replay_buffer_size + self._resync_settle_seconds = resync_settle_seconds + # (B) bounded outbound replay buffer; deque(maxlen) drops the oldest on overflow. + self._outbound_buffer: deque = deque( + maxlen=replay_buffer_size if replay_buffer_size > 0 else None + ) + # Single lock serialises buffer mutation, the overflow flag, and the flush, + # so a concurrent broadcast cannot race the flush's drain/flag-reset. + self._buffer_lock = asyncio.Lock() + self._buffer_overflowed = False + # (A) gap detection + single-flight resync hook. + self._had_prior_connection = False + self._recovery_task: Optional[asyncio.Task] = None + self._on_reconnect: Optional[ReconnectCallback] = None + + def set_reconnect_callback(self, callback: Optional[ReconnectCallback]): + """Register an ``async () -> None`` callback fired once after each gap + (a reconnect that follows a previously-established connection).""" + self._on_reconnect = callback async def start_reader_task(self): """Spawn the reconnecting reader task once. @@ -89,13 +170,43 @@ async def start_reader_task(self): self._subscription_task = asyncio.create_task(self.__read_notifications__()) return self._subscription_task - async def __read_notifications__(self): - """Read incoming broadcasts, reconnecting on backbone disconnect. + async def __broadcast_notifications__(self, subscription: Subscription, data): + """Share a local notification with the backbone; buffer it on failure. - ``__read_notifications__`` ends in a double underscore, so it is not a - name-mangled private name and this override is what the inherited - ``start_reader_task`` (and ours) dispatches to. + ``__broadcast_notifications__`` ends in a double underscore (not name-mangled), + so this override is what ``_subscribe_to_all_topics`` dispatches to. The local + delivery to this worker's own clients is independent of this publish (it runs + as a sibling notifier callback), so a failure here means only "peers may miss + this" — we keep it for replay rather than dropping it. """ + try: + await super().__broadcast_notifications__(subscription, data) + except asyncio.CancelledError: + raise + except Exception as e: + await self._buffer_outbound(subscription.topic, data, e) + + async def _buffer_outbound(self, topic, data, error: Exception): + async with self._buffer_lock: + if self._replay_buffer_size <= 0: + # buffering disabled — nothing to replay; the gap resync is the only + # recovery path. Nothing to record here. + logger.warning( + f"Broadcast to backbone failed ({error!r}); replay buffer disabled" + ) + return + if len(self._outbound_buffer) >= self._replay_buffer_size: + # at capacity: this append drops the oldest entry, unrecoverable. + self._buffer_overflowed = True + self._outbound_buffer.append((topic, data)) + logger.warning( + f"Broadcast to backbone failed ({error!r}); buffered for replay " + f"({len(self._outbound_buffer)}/{self._replay_buffer_size}" + f"{', OVERFLOW' if self._buffer_overflowed else ''})" + ) + + async def __read_notifications__(self): + """Read incoming broadcasts, reconnecting on backbone disconnect.""" attempt = 0 while True: try: @@ -105,6 +216,11 @@ async def __read_notifications__(self): f"Broadcaster listener connected to channel '{self._channel}'" ) async with channel.subscribe(channel=self._channel) as subscriber: + # We are subscribed again; recover concurrently so we keep reading + # (and can receive peers' replays) during the settle window. + if self._had_prior_connection: + self._schedule_gap_recovery() + self._had_prior_connection = True async for event in subscriber: await self._handle_broadcast_event(event) logger.warning( @@ -112,6 +228,7 @@ async def __read_notifications__(self): ) except asyncio.CancelledError: logger.info("Broadcaster listener cancelled; stopping") + self._cancel_pending_tasks() raise except Exception as e: attempt += 1 @@ -129,6 +246,84 @@ async def __read_notifications__(self): await self._safe_disconnect_channel() await asyncio.sleep(self._backoff_seconds(attempt)) + def _cancel_pending_tasks(self): + for task in list(self._tasks): + task.cancel() + + def _schedule_gap_recovery(self): + # Single-flight: a flap during the settle window must not spawn a second + # recovery (concurrent flushes corrupt the buffer; double resync re-storms). + if self._recovery_task is not None and not self._recovery_task.done(): + logger.debug("Gap recovery already in progress; not scheduling another") + return + self._recovery_task = asyncio.create_task(self._recover_after_gap()) + self._tasks.add(self._recovery_task) + self._recovery_task.add_done_callback(self._tasks.discard) + + async def _recover_after_gap(self): + """After reconnecting following a gap: let peers re-subscribe, replay the + buffer (B), then fire the resync hook (A).""" + try: + if self._resync_settle_seconds > 0: + await asyncio.sleep(self._resync_settle_seconds) + await self._flush_outbound_buffer() + await self._fire_reconnect() + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Error during post-reconnect broadcast recovery") + + async def _flush_outbound_buffer(self): + """Replay buffered broadcasts to the backbone (best-effort). + + Held under ``_buffer_lock`` so a concurrent failed broadcast cannot mutate + the buffer mid-drain. Items that can no longer be serialized are dropped (so + one poison payload can't wedge the buffer); a transport failure stops the + drain and leaves the rest for the next recovery. + """ + async with self._buffer_lock: + self._buffer_overflowed = False + if not self._outbound_buffer: + return + count = len(self._outbound_buffer) + logger.info(f"Replaying {count} buffered broadcast(s) after recovery") + try: + async with self._broadcast_type(self._broadcast_url) as channel: + while self._outbound_buffer: + topic, data = self._outbound_buffer[0] + try: + payload = pydantic_serialize( + BroadcastNotification( + notifier_id=self._id, topics=[topic], data=data + ) + ) + except Exception as e: + logger.error( + f"Dropping un-serializable buffered broadcast on " + f"topic '{topic}': {e!r}" + ) + self._outbound_buffer.popleft() + continue + await channel.publish(self._channel, payload) + self._outbound_buffer.popleft() + except asyncio.CancelledError: + raise + except Exception as e: + logger.error( + f"Failed to replay buffered broadcasts ({len(self._outbound_buffer)} " + f"left, will retry on next recovery): {e!r}" + ) + + async def _fire_reconnect(self): + if self._on_reconnect is None: + return + try: + await self._on_reconnect() + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Broadcaster on_reconnect callback failed") + async def _ensure_connected(self): if self.listening_broadcast_channel is None: self.listening_broadcast_channel = self._broadcast_type(self._broadcast_url) diff --git a/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py new file mode 100644 index 000000000..670800910 --- /dev/null +++ b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py @@ -0,0 +1,292 @@ +"""Multi-instance consistency tests for the resilient broadcaster. + +These wire two independent ``ReconnectingBroadcaster`` instances (modelling two +OPAL server workers/pods) to a single shared in-memory backbone with NOTIFY-like +semantics (only currently-subscribed peers receive a publish) and fault injection. +They assert the property that matters operationally: after a backbone outage during +which an update is published, **all instances converge to the same state** — via the +replay buffer (B) when possible, and via the resync hook (A) when delivery could not +be guaranteed. +""" +import asyncio + +import pytest +from fastapi_websocket_pubsub.event_notifier import ALL_TOPICS +from fastapi_websocket_pubsub.websocket_rpc_event_notifier import ( + WebSocketRpcEventNotifier, +) +from opal_server.pubsub_resilience import ReconnectingBroadcaster + +CHANNEL = "EventNotifier" +_CLOSED = object() + + +class _Event: + def __init__(self, message): + self.message = message + + +class InMemoryBackbone: + """A tiny stand-in for the broadcaster.Broadcast backbone, shared by all + channel instances. + + Like Postgres LISTEN/NOTIFY: a publish only reaches peers that are + subscribed at that moment; nothing is queued for absent peers. + """ + + def __init__(self): + self.faulted = False + self._subscribers = {} # channel -> set[asyncio.Queue] + + def factory(self, _url): + return _Channel(self) + + def subscriber_count(self, channel=CHANNEL): + return len(self._subscribers.get(channel, ())) + + def fault(self): + self.faulted = True + for queues in self._subscribers.values(): + for queue in list(queues): + queue.put_nowait(_CLOSED) + + def recover(self): + self.faulted = False + + def _subscribe(self, channel, queue): + self._subscribers.setdefault(channel, set()).add(queue) + + def _unsubscribe(self, channel, queue): + self._subscribers.get(channel, set()).discard(queue) + + def _deliver(self, channel, message): + for queue in list(self._subscribers.get(channel, ())): + queue.put_nowait(_Event(message)) + + +class _Channel: + def __init__(self, bus): + self._bus = bus + + async def connect(self): + if self._bus.faulted: + raise ConnectionError("backbone down") + + async def disconnect(self): + pass + + async def __aenter__(self): + if self._bus.faulted: + raise ConnectionError("backbone down") + return self + + async def __aexit__(self, *exc): + return False + + async def publish(self, channel, message): + if self._bus.faulted: + raise ConnectionError("backbone down") + self._bus._deliver(channel, message) + + def subscribe(self, channel): + return _Subscription(self._bus, channel) + + +class _Subscription: + def __init__(self, bus, channel): + self._bus = bus + self._channel = channel + self._queue = asyncio.Queue() + + async def __aenter__(self): + self._bus._subscribe(self._channel, self._queue) + return _Subscriber(self._queue) + + async def __aexit__(self, *exc): + self._bus._unsubscribe(self._channel, self._queue) + return False + + +class _Subscriber: + def __init__(self, queue): + self._queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + item = await self._queue.get() + if item is _CLOSED: + raise StopAsyncIteration + return item + + +class Instance: + """One server worker: its own notifier + reconnecting broadcaster + a recorder + standing in for 'the state this worker's clients would have received'.""" + + def __init__(self, bus, name, **broadcaster_kwargs): + self.name = name + self.notifier = WebSocketRpcEventNotifier() + self.broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=self.notifier, + channel=CHANNEL, + broadcast_type=bus.factory, + reconnect_backoff_min=0.01, + reconnect_backoff_max=0.02, + **broadcaster_kwargs, + ) + self.store = [] + self._reader = None + + async def start(self): + async def record(subscription, data): + self.store.append((subscription.topic, data)) + + await self.notifier.subscribe(f"{self.name}-store", ALL_TOPICS, record) + # Share this worker's local notifications with the backbone, and listen. + await self.broadcaster._subscribe_to_all_topics() + self._reader = await self.broadcaster.start_reader_task() + + async def publish(self, topic, data): + await self.notifier.notify([topic], data, notifier_id=f"{self.name}-publisher") + + async def stop(self): + if self._reader is not None: + self._reader.cancel() + try: + await self._reader + except (asyncio.CancelledError, Exception): + pass + + +async def _wait_for(predicate, timeout=5.0): + loop = asyncio.get_event_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + if predicate(): + return + await asyncio.sleep(0.01) + raise AssertionError("condition not met within timeout") + + +@pytest.mark.asyncio +async def test_instances_converge_after_backbone_outage(): + """An update published while the backbone is DOWN must still reach the + other instance once the backbone recovers (replay buffer path).""" + bus = InMemoryBackbone() + a = Instance(bus, "A", resync_settle_seconds=0.05, replay_buffer_size=100) + b = Instance(bus, "B", resync_settle_seconds=0.05, replay_buffer_size=100) + await a.start() + await b.start() + try: + await _wait_for(lambda: bus.subscriber_count() == 2) + + # Baseline: while connected, an update on A reaches B. + await a.publish("policy_data", {"u": 1}) + await _wait_for(lambda: ("policy_data", {"u": 1}) in b.store) + + # Backbone drops; readers lose their subscriptions. + bus.fault() + await _wait_for(lambda: bus.subscriber_count() == 0) + + # Update during the outage: A's local store sees it, B does not (yet). + await a.publish("policy_data", {"u": 2}) + await _wait_for(lambda: ("policy_data", {"u": 2}) in a.store) + await asyncio.sleep(0.1) + assert ("policy_data", {"u": 2}) not in b.store # inconsistent *during* outage + + # Backbone recovers: A replays the buffered update; B converges. + bus.recover() + await _wait_for(lambda: ("policy_data", {"u": 2}) in b.store) + + # CONSISTENCY: both instances now hold the same updates. + for store in (a.store, b.store): + assert ("policy_data", {"u": 1}) in store + assert ("policy_data", {"u": 2}) in store + finally: + await a.stop() + await b.stop() + + +@pytest.mark.asyncio +async def test_resync_callback_fires_after_a_gap(): + """The resync hook fires once after a backbone gap so the worker can force + its own clients to reconcile (a worker may have missed incoming peer + updates during the gap, regardless of what it published).""" + bus = InMemoryBackbone() + a = Instance(bus, "A", resync_settle_seconds=0, replay_buffer_size=100) + calls = [] + + async def on_reconnect(): + calls.append(1) + + a.broadcaster.set_reconnect_callback(on_reconnect) + await a.start() + try: + await _wait_for(lambda: bus.subscriber_count() == 1) + bus.fault() + await _wait_for(lambda: bus.subscriber_count() == 0) + bus.recover() + await _wait_for(lambda: calls) + assert len(calls) == 1 + finally: + await a.stop() + + +@pytest.mark.asyncio +async def test_resync_is_single_flight_across_a_flap_during_settle(): + """A second disconnect during the settle window must not spawn a second + concurrent recovery (which would corrupt the buffer / double-resync).""" + bus = InMemoryBackbone() + # Long settle so a second gap lands while the first recovery is still pending. + a = Instance(bus, "A", resync_settle_seconds=0.3, replay_buffer_size=100) + calls = [] + + async def on_reconnect(): + calls.append(1) + + a.broadcaster.set_reconnect_callback(on_reconnect) + await a.start() + try: + await _wait_for(lambda: bus.subscriber_count() == 1) + # First gap. + bus.fault() + await _wait_for(lambda: bus.subscriber_count() == 0) + bus.recover() + await _wait_for(lambda: bus.subscriber_count() == 1) + # Second gap while the first recovery is still inside its settle sleep. + bus.fault() + await _wait_for(lambda: bus.subscriber_count() == 0) + bus.recover() + await _wait_for(lambda: bus.subscriber_count() == 1) + # Let recoveries settle. + await asyncio.sleep(1.0) + # Single-flight: the overlapping flap collapses into one recovery, not three. + assert len(calls) <= 2 + assert a.broadcaster._recovery_task is not None + assert a.broadcaster._recovery_task.done() + finally: + await a.stop() + + +@pytest.mark.asyncio +async def test_resync_callback_does_not_fire_on_first_connect(): + """The resync hook is for *recovery*, not boot — it must not fire on the + initial connection.""" + bus = InMemoryBackbone() + a = Instance(bus, "A", resync_settle_seconds=0, replay_buffer_size=100) + calls = [] + + async def on_reconnect(): + calls.append(1) + + a.broadcaster.set_reconnect_callback(on_reconnect) + await a.start() + try: + await _wait_for(lambda: bus.subscriber_count() == 1) + await asyncio.sleep(0.2) + assert calls == [] + finally: + await a.stop() diff --git a/packages/opal-server/opal_server/tests/safe_connection_manager_test.py b/packages/opal-server/opal_server/tests/safe_connection_manager_test.py index a58bf0a36..4db2335c0 100644 --- a/packages/opal-server/opal_server/tests/safe_connection_manager_test.py +++ b/packages/opal-server/opal_server/tests/safe_connection_manager_test.py @@ -5,10 +5,14 @@ class _FakeWebSocket: def __init__(self): self.accepted = False + self.closed_code = None async def accept(self): self.accepted = True + async def close(self, code: int = 1000): + self.closed_code = code + def test_disconnect_unknown_socket_does_not_raise(): manager = SafeConnectionManager() @@ -42,3 +46,24 @@ async def test_connect_then_double_disconnect(): manager.disconnect(websocket) manager.disconnect(websocket) assert websocket not in manager.active_connections + + +@pytest.mark.asyncio +async def test_close_all_staggered_closes_every_connection(): + manager = SafeConnectionManager() + sockets = [_FakeWebSocket() for _ in range(4)] + for websocket in sockets: + await manager.connect(websocket) + + # No jitter sleeps in the test. + closed = await manager.close_all_staggered(min_interval=0, max_interval=0) + + assert closed == 4 + # 1012 = "Service Restart" — tells the client to reconnect (and re-reconcile). + assert all(websocket.closed_code == 1012 for websocket in sockets) + + +@pytest.mark.asyncio +async def test_close_all_staggered_on_empty_is_noop(): + manager = SafeConnectionManager() + assert await manager.close_all_staggered(min_interval=0, max_interval=0) == 0 From 39996e5f8075231eee06622760c42b954e9e1edb Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Wed, 10 Jun 2026 15:45:50 +0300 Subject: [PATCH 3/8] feat(opal-server): make /healthcheck reflect broadcaster-reader health MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A wedged broadcaster reader (reader task absent/dead while clients depend on it) previously left /healthcheck returning 200, so k8s never routed away from or restarted the bad worker — a transient backbone blip could wedge a pod for hours. Add ReconnectingBroadcaster.is_reader_healthy(): unhealthy only when listeners are present and the reader task is missing or done (crashed / gave up) — it stays healthy through a normal transient reconnect, so the probe does not flap during a backbone blip. /healthcheck returns 503 in that wedged state (/ stays a trivial liveness 200), gated by OPAL_BROADCAST_HEALTHCHECK_ENABLED (default true). Unit tests for the health contract plus an end-to-end route test (200 / 503 / kill-switch). Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/opal-server/opal_server/config.py | 8 +++ .../opal_server/pubsub_resilience.py | 35 +++++++++ packages/opal-server/opal_server/server.py | 23 +++++- .../tests/healthcheck_endpoint_test.py | 71 +++++++++++++++++++ .../tests/reconnecting_broadcaster_test.py | 71 +++++++++++++++++++ 5 files changed, 206 insertions(+), 2 deletions(-) create mode 100644 packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 9dfc18beb..069cca0bf 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -100,6 +100,14 @@ class OpalServerConfig(Confi): description="Grace period after a broadcaster reconnect before replaying " "buffered broadcasts and resyncing clients, to let peer servers re-subscribe.", ) + BROADCAST_HEALTHCHECK_ENABLED = confi.bool( + "BROADCAST_HEALTHCHECK_ENABLED", + True, + description="Make /healthcheck reflect the broadcaster reader's health so a " + "k8s readiness/liveness probe can route away from or restart a worker whose " + "reader is wedged while clients depend on it. Set to False to revert " + "/healthcheck to always returning ok.", + ) # server security AUTH_PRIVATE_KEY_FORMAT = confi.enum( diff --git a/packages/opal-server/opal_server/pubsub_resilience.py b/packages/opal-server/opal_server/pubsub_resilience.py index df8b22ff1..adb6a2860 100644 --- a/packages/opal-server/opal_server/pubsub_resilience.py +++ b/packages/opal-server/opal_server/pubsub_resilience.py @@ -170,6 +170,41 @@ async def start_reader_task(self): self._subscription_task = asyncio.create_task(self.__read_notifications__()) return self._subscription_task + def is_reader_healthy(self) -> bool: + """Report whether the reconnecting reader can still serve connected + clients. + + Used by the server ``/healthcheck`` so a k8s readiness/liveness probe can + route away from (or restart) a worker whose reader is wedged while clients + depend on it — defense in depth on top of the reconnect loop itself. + + Health is judged against the listener count, not the backbone connection: + + * No listeners (``_listen_count <= 0``): healthy. The reader is started lazily + when the count goes 0->1 and reset to ``None`` when it returns to 0, so its + absence here is expected idleness, not a fault — nothing depends on it. + * Listeners present: healthy only while the reader task is a live, *pending* + task. A pending task INCLUDES the case where it is mid-reconnect through a + backbone outage: ``ReconnectingBroadcaster`` deliberately keeps the reader + pending across a drop, so a transient reconnect must NOT read as unhealthy + (otherwise the probe would flap the pod during every normal backbone blip). + It reads unhealthy only in the two wedged states — the task is ``None`` + (never started / leaked listen-count) or ``done`` (crashed, or gave up after + ``reconnect_max_retries``) — which is exactly when clients are stuck. + + This is a cheap, non-blocking attribute read (no await, no lock); any rare race + against a reconnect is absorbed by the probe's ``failureThreshold``. + + Returns: + bool: True if idle or the reader is live and pending; False if listeners + depend on a missing or completed reader task. + """ + if self._listen_count <= 0: + return True + return ( + self._subscription_task is not None and not self._subscription_task.done() + ) + async def __broadcast_notifications__(self, subscription: Subscription, data): """Share a local notification with the backbone; buffer it on failure. diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index a846e5a80..2fbbeeb8c 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -8,7 +8,7 @@ from fastapi import Depends, FastAPI, Request from fastapi.openapi.docs import get_redoc_html -from fastapi.responses import HTMLResponse +from fastapi.responses import HTMLResponse, JSONResponse from fastapi_websocket_pubsub.event_broadcaster import EventBroadcasterContextManager from opal_common.authentication.deps import JWTAuthenticator, StaticBearerAuthenticator from opal_common.authentication.signer import JWTSigner @@ -34,6 +34,7 @@ from opal_server.policy.webhook.api import init_git_webhook_router from opal_server.publisher import setup_broadcaster_keepalive_task from opal_server.pubsub import PubSub +from opal_server.pubsub_resilience import ReconnectingBroadcaster from opal_server.redis_utils import RedisDB from opal_server.scopes.api import init_scope_router from opal_server.scopes.loader import load_scopes @@ -288,9 +289,27 @@ async def redoc_html(req: Request) -> HTMLResponse: ) # top level routes (i.e: healthchecks) - @app.get("/healthcheck", include_in_schema=False) @app.get("/", include_in_schema=False) + def root(): + # Liveness / process-up: trivially ok as long as the process serves. + return {"status": "ok"} + + @app.get("/healthcheck", include_in_schema=False) def healthcheck(): + # Readiness: also reflect broadcaster-reader health so a wedged reader + # (reader task absent/dead while clients depend on it) fails the probe + # and k8s can route away from / restart this worker. Stays ok through a + # normal transient reconnect (see ReconnectingBroadcaster.is_reader_healthy). + broadcaster = self.pubsub.broadcaster + if ( + opal_server_config.BROADCAST_HEALTHCHECK_ENABLED + and isinstance(broadcaster, ReconnectingBroadcaster) + and not broadcaster.is_reader_healthy() + ): + return JSONResponse( + status_code=503, + content={"status": "error", "broadcaster": "unhealthy"}, + ) return {"status": "ok"} return app diff --git a/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py b/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py new file mode 100644 index 000000000..937957b0d --- /dev/null +++ b/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py @@ -0,0 +1,71 @@ +"""End-to-end checks of the broadcaster-aware /healthcheck route. + +Verifies the wiring in OpalServer._init_fast_api_app: a wedged +reconnecting broadcaster (listeners present, reader task missing) makes +/healthcheck return 503 so a k8s probe can act, while / stays a trivial +liveness 200. +""" +import contextlib + +from opal_server.config import opal_server_config +from opal_server.pubsub_resilience import ReconnectingBroadcaster +from opal_server.server import OpalServer +from starlette.testclient import TestClient + + +@contextlib.contextmanager +def _override_config(**overrides): + saved = {key: getattr(opal_server_config, key) for key in overrides} + try: + for key, value in overrides.items(): + setattr(opal_server_config, key, value) + yield + finally: + for key, value in saved.items(): + setattr(opal_server_config, key, value) + + +def _build_server(): + return OpalServer( + init_policy_watcher=False, + broadcaster_uri=None, + enable_jwks_endpoint=False, + ) + + +def _wedge(server): + """Install a reconnecting broadcaster whose reader is wedged (listeners + present, reader task missing) — the exact state the staging incident got + stuck in.""" + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=server.pubsub.notifier, channel="test" + ) + broadcaster._listen_count = 1 + broadcaster._subscription_task = None + server.pubsub.broadcaster = broadcaster + + +def test_root_and_healthcheck_ok_without_broadcaster(): + client = TestClient(_build_server().app) + assert client.get("/").status_code == 200 + assert client.get("/healthcheck").status_code == 200 + + +def test_healthcheck_503_when_reader_wedged(): + server = _build_server() + _wedge(server) + client = TestClient(server.app) + + response = client.get("/healthcheck") + assert response.status_code == 503 + assert response.json()["broadcaster"] == "unhealthy" + # liveness stays up even while readiness is failing + assert client.get("/").status_code == 200 + + +def test_healthcheck_kill_switch_keeps_it_ok_when_disabled(): + server = _build_server() + _wedge(server) + client = TestClient(server.app) + with _override_config(BROADCAST_HEALTHCHECK_ENABLED=False): + assert client.get("/healthcheck").status_code == 200 diff --git a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py index fc3091fbf..67bf3e15b 100644 --- a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py +++ b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py @@ -190,3 +190,74 @@ def test_backoff_is_bounded(): delays = [broadcaster._backoff_seconds(attempt) for attempt in range(0, 25)] assert all(0.0 <= delay <= 10.0 for delay in delays) assert max(delays) > 0 + + +def _make_broadcaster(): + return ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + ) + + +def test_is_reader_healthy_idle_when_no_listeners(): + # No clients depend on the reader; its absence is expected idleness, not a fault. + broadcaster = _make_broadcaster() + broadcaster._listen_count = 0 + broadcaster._subscription_task = None + assert broadcaster.is_reader_healthy() is True + + +@pytest.mark.asyncio +async def test_is_reader_healthy_with_live_pending_reader(): + # Listeners present and the reader is a live, pending task (this also models the + # mid-reconnect state ReconnectingBroadcaster keeps pending across a backbone gap). + broadcaster = _make_broadcaster() + broadcaster._listen_count = 1 + task = asyncio.create_task(asyncio.sleep(60)) + broadcaster._subscription_task = task + try: + assert broadcaster.is_reader_healthy() is True + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +def test_is_reader_healthy_false_when_reader_missing(): + # Listeners present but the reader task is None: leaked listen-count / never-started. + broadcaster = _make_broadcaster() + broadcaster._listen_count = 1 + broadcaster._subscription_task = None + assert broadcaster.is_reader_healthy() is False + + +@pytest.mark.asyncio +async def test_is_reader_healthy_false_when_reader_done(): + # Listeners present but the reader task completed: it crashed or gave up retrying. + broadcaster = _make_broadcaster() + broadcaster._listen_count = 1 + + async def _noop(): + return None + + task = asyncio.create_task(_noop()) + await task + broadcaster._subscription_task = task + assert task.done() + assert broadcaster.is_reader_healthy() is False + + +@pytest.mark.asyncio +async def test_is_reader_healthy_false_when_reader_cancelled(): + # A cancelled reader is also "done" — the wedged state after a clean shutdown + # of the reader while listeners somehow remain. + broadcaster = _make_broadcaster() + broadcaster._listen_count = 1 + task = asyncio.create_task(asyncio.sleep(60)) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + broadcaster._subscription_task = task + assert task.done() + assert broadcaster.is_reader_healthy() is False From 2ffa9d0cc7694a4a1973238395e5df5d24c4f0b9 Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Wed, 10 Jun 2026 16:05:25 +0300 Subject: [PATCH 4/8] test(opal-server): validate broadcaster /healthcheck under a simulated DB kill Drives a real ReconnectingBroadcaster reader against a fault-injectable backbone: - transient DB kill -> reader stays pending (reconnecting) -> is_reader_healthy() stays True / /healthcheck stays 200 (the no-flap property: no needless restart during a normal blip), and recovers on bus.recover(); - permanent kill that exhausts reconnect retries -> reader task done -> is_reader_healthy() False / /healthcheck 503. Plus route-level checks (pending -> 200, gave-up -> 503 with body, / stays 200). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../tests/healthcheck_db_kill_test.py | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py diff --git a/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py b/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py new file mode 100644 index 000000000..371759088 --- /dev/null +++ b/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py @@ -0,0 +1,223 @@ +"""Broadcaster-aware /healthcheck behavior under a simulated database kill. + +These tests drive a real ``ReconnectingBroadcaster`` reader against an in-memory, +fault-injectable backbone (``InMemoryBackbone`` from the consistency test, where +``fault()`` == "kill the DB" and ``recover()`` restores it) and assert the +*non-flapping* property the probe exists to provide: + +* A **transient** backbone/DB kill keeps the reader pending (it reconnects), so + ``is_reader_healthy()`` stays True and /healthcheck stays 200 — no needless pod + restart during a normal blip. +* A backbone/DB kill the reader **gives up on** (reconnect attempts exhausted → + reader task done) flips ``is_reader_healthy()`` to False and /healthcheck to 503, + so k8s can act. +* After **recovery** the reader reconnects and health returns to True / 200. + +The route-level assertions tie these predicate states to the real OpalServer +``/healthcheck`` endpoint, proving the wiring and not just the predicate. +""" +import asyncio +import contextlib + +import pytest +from fastapi_websocket_pubsub.websocket_rpc_event_notifier import ( + WebSocketRpcEventNotifier, +) +from opal_server.pubsub_resilience import ReconnectingBroadcaster +from opal_server.server import OpalServer +from opal_server.tests.broadcaster_consistency_integration_test import InMemoryBackbone +from starlette.testclient import TestClient + +CHANNEL = "EventNotifier" + + +async def _wait_for(predicate, timeout=5.0): + """Poll ``predicate`` until it is truthy or ``timeout`` elapses. + + Copied from the consistency test for determinism: prefer polling a condition + over fixed sleeps so the tests are fast and not timing-fragile. + """ + loop = asyncio.get_event_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: + if predicate(): + return + await asyncio.sleep(0.01) + raise AssertionError("condition not met within timeout") + + +def _make_live_broadcaster(bus, **kwargs): + """A ReconnectingBroadcaster wired to ``bus`` with small/zero backoff. + + ``_listen_count`` is set to 1 to represent "clients depend on the reader" — + that is the state in which ``is_reader_healthy()`` reflects the reader task. + """ + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=WebSocketRpcEventNotifier(), + channel=CHANNEL, + broadcast_type=bus.factory, + reconnect_backoff_min=0.01, + reconnect_backoff_max=0.02, + **kwargs, + ) + broadcaster._listen_count = 1 + return broadcaster + + +async def _cancel_reader(task): + if task is None: + return + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + + +@contextlib.contextmanager +def _pending_reader_task(): + """Yield a never-completing task standing in for a live, pending reader. + + The task lives on a dedicated loop that stays open for the body and is + cancelled, drained, and closed on exit — so there is no "Task was destroyed + but it is pending" warning. The route only reads ``is_reader_healthy()`` (a + synchronous attribute check), so the task itself is never awaited by the route. + """ + loop = asyncio.new_event_loop() + task = loop.create_task(asyncio.Event().wait()) + try: + yield task + finally: + task.cancel() + loop.run_until_complete(asyncio.gather(task, return_exceptions=True)) + loop.close() + + +def _done_task(): + """An already-completed task standing in for a reader that gave up retrying.""" + loop = asyncio.new_event_loop() + try: + task = loop.create_task(asyncio.sleep(0)) + loop.run_until_complete(task) + return task + finally: + loop.close() + + +@pytest.mark.asyncio +async def test_healthcheck_stays_ok_through_transient_db_kill(): + """A transient DB kill must NOT flip the reader to unhealthy. + + ``reconnect_max_retries=0`` (retry forever): the reader keeps reconnecting, so + its task stays pending across the kill and ``is_reader_healthy()`` stays True + throughout — the property that stops the probe from flapping the pod during a + normal backbone blip. After recovery the reader re-subscribes and stays healthy. + """ + bus = InMemoryBackbone() + broadcaster = _make_live_broadcaster(bus, reconnect_max_retries=0) + reader = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.subscriber_count() >= 1) + assert broadcaster.is_reader_healthy() is True + + # Kill the DB: connect()/publish() now raise and active subscribers end. + bus.fault() + await _wait_for(lambda: bus.subscriber_count() == 0) + # Give the reconnect loop time to spin (it cannot reconnect while faulted). + await asyncio.sleep(0.1) + + # The crucial non-flap assertion: the reader is still pending and healthy + # mid-reconnect, so /healthcheck would stay 200, not restart the pod. + assert not reader.done() + assert broadcaster.is_reader_healthy() is True + + # Recover the DB: the reader re-subscribes and remains healthy. + bus.recover() + await _wait_for(lambda: bus.subscriber_count() >= 1) + assert not reader.done() + assert broadcaster.is_reader_healthy() is True + finally: + await _cancel_reader(reader) + + +@pytest.mark.asyncio +async def test_healthcheck_goes_503_when_reconnect_gives_up(): + """A permanent DB kill that exhausts retries must flip the reader to unhealthy. + + ``reconnect_max_retries=3`` with tiny backoff: a permanent fault drives the + reconnect loop to give up; the reader task completes (done). With listeners + present that is exactly the wedged state ``is_reader_healthy()`` reports False + for, which is what makes /healthcheck return 503. + """ + bus = InMemoryBackbone() + broadcaster = _make_live_broadcaster(bus, reconnect_max_retries=3) + reader = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.subscriber_count() >= 1) + assert broadcaster.is_reader_healthy() is True + + # Kill the DB permanently (never recover): every reconnect attempt fails. + bus.fault() + + # The reader exhausts its retries and completes cleanly (no escaping error). + await asyncio.wait_for(reader, timeout=5) + assert reader.done() + assert reader.exception() is None + # Listeners still depend on a now-dead reader -> unhealthy -> 503. + assert broadcaster.is_reader_healthy() is False + finally: + await _cancel_reader(reader) + + +def _build_server(): + return OpalServer( + init_policy_watcher=False, + broadcaster_uri=None, + enable_jwks_endpoint=False, + ) + + +def test_route_healthcheck_200_when_reader_pending(): + """Route-level: reader pending (healthy) -> GET /healthcheck == 200. + + Uses a deterministic never-completing task for the pending reader (what the + route reads is the synchronous ``is_reader_healthy()`` state check); the live + fault/reconnect/give-up path is exercised by the async tests above. + """ + server = _build_server() + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=server.pubsub.notifier, channel=CHANNEL + ) + broadcaster._listen_count = 1 + with _pending_reader_task() as pending: + broadcaster._subscription_task = pending + server.pubsub.broadcaster = broadcaster + + assert broadcaster.is_reader_healthy() is True + client = TestClient(server.app) + assert client.get("/healthcheck").status_code == 200 + + +def test_route_healthcheck_503_when_reader_gave_up(): + """Route-level: reader done (gave up) -> GET /healthcheck == 503. + + Uses an already-completed task for the "gave up" reader; the route reads the + synchronous ``is_reader_healthy()`` state, which is False with listeners present + and a done reader task. + """ + server = _build_server() + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=server.pubsub.notifier, channel=CHANNEL + ) + broadcaster._listen_count = 1 + broadcaster._subscription_task = _done_task() + server.pubsub.broadcaster = broadcaster + + assert broadcaster.is_reader_healthy() is False + client = TestClient(server.app) + response = client.get("/healthcheck") + assert response.status_code == 503 + assert response.json()["broadcaster"] == "unhealthy" + # Liveness stays up even while readiness fails. + assert client.get("/").status_code == 200 From c8afdfc4aec42a890131f9a1d2536e0155191b0a Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Wed, 10 Jun 2026 16:27:39 +0300 Subject: [PATCH 5/8] chore: repair pre-commit for prek / pre-commit>=4 and Python 3.13+ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The docformatter hook pinned at v1.7.5 ships a second hook (`docformatter-venv`) declaring `language: python_venv`, which prek and pre-commit>=4 reject for the whole manifest — so hooks could not initialize locally. CI passed only because it pins `pre-commit<4` on Python 3.12. - Bump docformatter v1.7.5 -> v1.7.6 (drops the `docformatter-venv` hook; keeps `language: python`). v1.7.6 is byte-identical to v1.7.5 on this codebase, so the bump introduces no reformatting (v1.7.7 changes docstring wrapping, hence v1.7.6). - Pin the hook toolchain to Python 3.12 via default_language_version: the pinned black 23.1.0 / isort / docformatter do not run on 3.13+, so hooks failed on machines whose default interpreter is newer. - Apply docformatter to healthcheck_db_kill_test.py (the one file that predated this fix and had not been run through the hook). Co-Authored-By: Claude Opus 4.8 (1M context) --- .pre-commit-config.yaml | 11 ++++++++++- .../opal_server/tests/healthcheck_db_kill_test.py | 6 ++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8ae1d7512..2bc699fa3 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,3 +1,9 @@ +# The pinned formatter versions below (black 23.1.0, isort 5.12.0, docformatter +# 1.7.x) do not run on Python 3.13+, so pin the hook toolchain to 3.12 (the version +# CI uses). Without this, hooks fail on machines whose default python is newer. +default_language_version: + python: python3.12 + repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v5.0.0 @@ -18,8 +24,11 @@ repos: hooks: - id: codespell args: [--skip, "*.json,*.lock"] + # v1.7.6 dropped the `docformatter-venv` hook whose `language: python_venv` made + # prek / pre-commit>=4 reject the whole manifest. Pinned to v1.7.6 (not v1.7.7, + # whose docstring wrapping differs) so the bump introduces no reformatting. - repo: https://github.com/PyCQA/docformatter - rev: v1.7.5 + rev: v1.7.6 hooks: - id: docformatter args: [--in-place] diff --git a/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py b/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py index 371759088..a64eaee53 100644 --- a/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py +++ b/packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py @@ -95,7 +95,8 @@ def _pending_reader_task(): def _done_task(): - """An already-completed task standing in for a reader that gave up retrying.""" + """An already-completed task standing in for a reader that gave up + retrying.""" loop = asyncio.new_event_loop() try: task = loop.create_task(asyncio.sleep(0)) @@ -143,7 +144,8 @@ async def test_healthcheck_stays_ok_through_transient_db_kill(): @pytest.mark.asyncio async def test_healthcheck_goes_503_when_reconnect_gives_up(): - """A permanent DB kill that exhausts retries must flip the reader to unhealthy. + """A permanent DB kill that exhausts retries must flip the reader to + unhealthy. ``reconnect_max_retries=3`` with tiny backoff: a permanent fault drives the reconnect loop to give up; the reader task completes (done). With listeners From c3a3f09d182840e0cb5250efa82ff31d5c73713e Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Thu, 11 Jun 2026 13:22:22 +0300 Subject: [PATCH 6/8] fix(app-tests): stabilize broadcaster e2e (deterministic replay + clean retries) The e2e failed in CI for two reasons: 1. The cross-instance consistency check was racy. A data update published to one server while the backbone is down reaches the second server's client only via the replay buffer (the base data config restores only /static on a resync), and with the default reconnect backoff (max 30s) the second server could still be mid-reconnect when the first replayed after the 2s settle, missing it. Pin the broadcaster timing in the compose env (BACKOFF_MIN=0.5, BACKOFF_MAX=2, RESYNC_SETTLE_SECONDS=3) so both replicas re-subscribe before the replay fires. 2. The retry loop re-ran main() without tearing the stack down, so generate_opal_keys could not bind host port 7002 and the previous attempt's stale transient client ERRORs tripped check_no_error. Tear the stack down between attempts. Validated locally: the full app-tests suite (including the graceful + ungraceful broadcaster kills and the consistency scenario) now passes on the first attempt. Co-Authored-By: Claude Opus 4.8 (1M context) --- app-tests/docker-compose-app-tests.yml | 9 +++++++++ app-tests/run.sh | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/app-tests/docker-compose-app-tests.yml b/app-tests/docker-compose-app-tests.yml index 7d126bfb8..1abd65853 100644 --- a/app-tests/docker-compose-app-tests.yml +++ b/app-tests/docker-compose-app-tests.yml @@ -61,6 +61,15 @@ services: endpoint_mode: vip environment: - OPAL_BROADCAST_URI=postgres://postgres:postgres@broadcast_channel:5432/postgres + # Make the broadcaster reconnect fast and replay after both replicas have + # re-subscribed, so a cross-instance update published during a backbone + # outage deterministically converges to both clients (see run.sh + # "cross-instance consistency" test). With backoff_max=2 both servers + # re-subscribe within ~2s of the DB returning, while settle=3 delays the + # replay until after that, guaranteeing the replayed update is delivered. + - OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS=0.5 + - OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS=2 + - OPAL_BROADCAST_RESYNC_SETTLE_SECONDS=3 - UVICORN_NUM_WORKERS=4 - OPAL_POLICY_REPO_URL=http://gitea:3000/gitea_admin/policy-repo.git - OPAL_POLICY_REPO_MAIN_BRANCH=${POLICY_REPO_BRANCH:-main} diff --git a/app-tests/run.sh b/app-tests/run.sh index 0a0a29550..6c4bf5657 100755 --- a/app-tests/run.sh +++ b/app-tests/run.sh @@ -433,6 +433,14 @@ while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do main && break RETRY_COUNT=$((RETRY_COUNT + 1)) echo "Test failed, retrying..." + # Tear the stack down before retrying so the next attempt starts clean: + # generate_opal_keys binds host port 7002, which conflicts with a leftover + # stack, and stale (transient) client ERRORs from the previous attempt's + # broadcaster kills would otherwise trip check_no_error. The compose helper + # uses --env-file .env, which exists after the first attempt's keygen. + compose down --remove-orphans --volumes 2>/dev/null || true + docker rm -f --wait opal-server-keygen 2>/dev/null || true + rm -rf ./opal-tests-policy-repo ./temp-repo ./gitea-data ./git-repos 2>/dev/null || true done if [ $RETRY_COUNT -ge $MAX_RETRIES ]; then From 13fb57e182ad592117651c55467d6b42381d3d1f Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Thu, 11 Jun 2026 21:03:37 +0300 Subject: [PATCH 7/8] refactor(opal-server): remove broadcast-conn-loss experiment flag; harden recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the previous fix attempt for the broadcaster connection-loss storm — the OPAL_BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED flag — now that the reconnecting broadcaster is the real fix. - Drop the experiment flag; derive ignore_broadcaster_disconnected from the broadcaster type: False for ReconnectingBroadcaster (a completed reader surfaces the disconnect so clients reconnect), True (library-safe) for the stock EventBroadcaster rollback so the legacy path degrades to "stale but connected" instead of the storm. - Pin the whole post-gap recovery in a listening context so the reader cannot be cancelled mid-recovery (previously only close_all_staggered was pinned). - Flush the replay buffer without holding the buffer lock across network I/O; re-enqueue the unsent tail on a mid-drain transport failure. - Cancel AND join child tasks on reader shutdown. - Remove the write-only _buffer_overflowed flag and the config-doc claim that overflow triggers a resync (it does not; the resync fires unconditionally on every gap). Tests: add reconnect/disconnect/slow-connection coverage — flaky-reconnect-recovers, slow-connect-stays-pending, subscribe-fail-reconnects, partial-replay-requeue, buffer overflow/disabled, deterministic single-flight, max_retries>0-then-recover, skip-own, shutdown-cancels-children, and the resync-pins-the-reader-alive invariant. The consistency harness now holds a listening context to mirror a connected client. Reviewed by python-pro + fastapi-pro; findings addressed. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../docs/getting-started/configuration.mdx | 46 ++- packages/opal-server/opal_server/config.py | 7 +- packages/opal-server/opal_server/pubsub.py | 30 +- .../opal_server/pubsub_resilience.py | 111 +++--- ...roadcaster_consistency_integration_test.py | 14 +- .../tests/reconnecting_broadcaster_test.py | 370 +++++++++++++++++- 6 files changed, 508 insertions(+), 70 deletions(-) diff --git a/documentation/docs/getting-started/configuration.mdx b/documentation/docs/getting-started/configuration.mdx index d71278338..79d09b7cc 100644 --- a/documentation/docs/getting-started/configuration.mdx +++ b/documentation/docs/getting-started/configuration.mdx @@ -526,11 +526,53 @@ The channel name for broadcasting messages. For more information, see [running OPAL with Kafka](/tutorials/run_opal_with_kafka) and [running OPAL with Apache Pulsar](/tutorials/run_opal_with_pulsar). -#### OPAL_BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED +#### OPAL_BROADCAST_RECONNECT_ENABLED Default: `True` -Enable experimental fix for broadcast connection loss issues. +Reconnect the broadcaster reader on a backbone disconnect instead of dropping all client connections. Set to `False` to revert to the legacy (non-reconnecting) broadcaster. + +#### OPAL_BROADCAST_RECONNECT_MAX_RETRIES + +Default: `0` + +Maximum consecutive broadcaster reconnect attempts before giving up and letting the worker restart (`0` = retry forever). + +#### OPAL_BROADCAST_RECONNECT_BACKOFF_MIN_SECONDS + +Default: `0.5` + +Minimum backoff in seconds between broadcaster reconnect attempts. + +#### OPAL_BROADCAST_RECONNECT_BACKOFF_MAX_SECONDS + +Default: `30.0` + +Maximum backoff in seconds between broadcaster reconnect attempts. + +#### OPAL_BROADCAST_REPLAY_BUFFER_SIZE + +Default: `10000` + +Maximum number of outbound broadcasts buffered while the backbone is down and replayed on reconnect (`0` disables buffering). + +#### OPAL_BROADCAST_RESYNC_ON_RECONNECT + +Default: `True` + +After a backbone gap that may have lost updates, force this worker's connected clients to reconnect so they re-fetch full policy + data state (guarantees cross-instance consistency). + +#### OPAL_BROADCAST_RESYNC_SETTLE_SECONDS + +Default: `2.0` + +Grace period after a broadcaster reconnect before replaying buffered broadcasts and resyncing clients, to let peer servers re-subscribe. + +#### OPAL_BROADCAST_HEALTHCHECK_ENABLED + +Default: `True` + +Make `/healthcheck` reflect the broadcaster reader's health, so a Kubernetes readiness/liveness probe can route away from or restart a worker whose reader is wedged while clients depend on it. #### OPAL_BROADCAST_KEEPALIVE_INTERVAL diff --git a/packages/opal-server/opal_server/config.py b/packages/opal-server/opal_server/config.py index 069cca0bf..58341ff37 100644 --- a/packages/opal-server/opal_server/config.py +++ b/packages/opal-server/opal_server/config.py @@ -52,11 +52,6 @@ class OpalServerConfig(Confi): "EventNotifier", description="The name to be used for segmentation in the backbone pub/sub", ) - BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED = confi.bool( - "BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED", - True, - description="Enable experimental bugfix for broadcast connection loss", - ) BROADCAST_RECONNECT_ENABLED = confi.bool( "BROADCAST_RECONNECT_ENABLED", True, @@ -85,7 +80,7 @@ class OpalServerConfig(Confi): 10000, description="Max number of outbound broadcasts buffered while the backbone is " "down and replayed on reconnect (0 disables buffering). On overflow the oldest " - "are dropped and clients are resynced instead.", + "buffered broadcasts are dropped; the resync on reconnect still reconciles clients.", ) BROADCAST_RESYNC_ON_RECONNECT = confi.bool( "BROADCAST_RESYNC_ON_RECONNECT", diff --git a/packages/opal-server/opal_server/pubsub.py b/packages/opal-server/opal_server/pubsub.py index 6cc6742e8..8f8a0a0d9 100644 --- a/packages/opal-server/opal_server/pubsub.py +++ b/packages/opal-server/opal_server/pubsub.py @@ -170,13 +170,23 @@ def __init__(self, signer: JWTSigner, broadcaster_uri: str = None): else: logger.info("Pub/Sub broadcaster is off") - # The server endpoint + # The server endpoint. + # + # ignore_broadcaster_disconnected=False races each client's main_loop against the + # shared broadcaster reader task, so a completed reader surfaces the disconnect and + # the client reconnects. That is only safe with ReconnectingBroadcaster, whose + # reader stays *pending* across transient drops and completes only after + # BROADCAST_RECONNECT_MAX_RETRIES is exhausted (the intended last resort). For the + # stock EventBroadcaster (reconnect disabled) the reader dies on the first drop, so + # we keep the library-safe default (True) to degrade to "stale but connected" + # rather than the fleet-wide drop storm. (Replaces an earlier experimental + # broadcast-connection-loss flag.) self.endpoint = PubSubEndpoint( broadcaster=self.broadcaster, notifier=self.notifier, rpc_channel_get_remote_id=opal_common_config.STATISTICS_ENABLED, - ignore_broadcaster_disconnected=( - not opal_server_config.BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED + ignore_broadcaster_disconnected=not isinstance( + self.broadcaster, ReconnectingBroadcaster ), ) # fastapi_websocket_rpc's ConnectionManager.disconnect is not idempotent: the RPC @@ -270,14 +280,12 @@ async def _on_broadcaster_reconnect(): "Broadcaster recovered after a gap; resyncing this worker's clients " "so they re-fetch current policy + data state" ) - # Closing every client would drive the broadcaster's listener count to 0 - # and cancel the reconnecting reader task. Pin a listening context so the - # reader survives the recycle, and hold it briefly so reconnecting clients - # re-establish the count before we release. - async with broadcaster.get_listening_context(): - await manager.close_all_staggered() - if settle > 0: - await asyncio.sleep(settle) + # The reader is kept alive across this recycle by the listening context the + # broadcaster pins around the whole recovery (see _recover_after_gap), so + # closing every client here cannot cancel it. + await manager.close_all_staggered() + if settle > 0: + await asyncio.sleep(settle) broadcaster.set_reconnect_callback(_on_broadcaster_reconnect) diff --git a/packages/opal-server/opal_server/pubsub_resilience.py b/packages/opal-server/opal_server/pubsub_resilience.py index adb6a2860..4f498cd6c 100644 --- a/packages/opal-server/opal_server/pubsub_resilience.py +++ b/packages/opal-server/opal_server/pubsub_resilience.py @@ -144,7 +144,6 @@ def __init__( # Single lock serialises buffer mutation, the overflow flag, and the flush, # so a concurrent broadcast cannot race the flush's drain/flag-reset. self._buffer_lock = asyncio.Lock() - self._buffer_overflowed = False # (A) gap detection + single-flight resync hook. self._had_prior_connection = False self._recovery_task: Optional[asyncio.Task] = None @@ -224,20 +223,19 @@ async def __broadcast_notifications__(self, subscription: Subscription, data): async def _buffer_outbound(self, topic, data, error: Exception): async with self._buffer_lock: if self._replay_buffer_size <= 0: - # buffering disabled — nothing to replay; the gap resync is the only - # recovery path. Nothing to record here. + # buffering disabled — the gap resync is the only recovery path. logger.warning( f"Broadcast to backbone failed ({error!r}); replay buffer disabled" ) return - if len(self._outbound_buffer) >= self._replay_buffer_size: - # at capacity: this append drops the oldest entry, unrecoverable. - self._buffer_overflowed = True + # At capacity this append drops the oldest entry (bounded deque); the resync + # on reconnect still reconciles clients, so the drop only widens the window. + overflow = len(self._outbound_buffer) >= self._replay_buffer_size self._outbound_buffer.append((topic, data)) logger.warning( f"Broadcast to backbone failed ({error!r}); buffered for replay " f"({len(self._outbound_buffer)}/{self._replay_buffer_size}" - f"{', OVERFLOW' if self._buffer_overflowed else ''})" + f"{', OVERFLOW — oldest dropped' if overflow else ''})" ) async def __read_notifications__(self): @@ -263,7 +261,7 @@ async def __read_notifications__(self): ) except asyncio.CancelledError: logger.info("Broadcaster listener cancelled; stopping") - self._cancel_pending_tasks() + await self._cancel_pending_tasks() raise except Exception as e: attempt += 1 @@ -281,9 +279,16 @@ async def __read_notifications__(self): await self._safe_disconnect_channel() await asyncio.sleep(self._backoff_seconds(attempt)) - def _cancel_pending_tasks(self): - for task in list(self._tasks): + async def _cancel_pending_tasks(self): + # Cancel AND join child notify/recovery tasks so they fully unwind (releasing any + # pinned listening context) before the reader re-raises its own cancellation. + # Exclude the current task defensively (the reader is not in _tasks, but be safe). + current = asyncio.current_task() + tasks = [task for task in self._tasks if task is not current] + for task in tasks: task.cancel() + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) def _schedule_gap_recovery(self): # Single-flight: a flap during the settle window must not spawn a second @@ -297,12 +302,18 @@ def _schedule_gap_recovery(self): async def _recover_after_gap(self): """After reconnecting following a gap: let peers re-subscribe, replay the - buffer (B), then fire the resync hook (A).""" + buffer (B), then fire the resync hook (A). + + The whole recovery runs inside a pinned listening context so the reader task + cannot be cancelled mid-recovery — neither by the resync hook closing every + client nor by an unrelated drop to zero listeners during the settle window. + """ try: - if self._resync_settle_seconds > 0: - await asyncio.sleep(self._resync_settle_seconds) - await self._flush_outbound_buffer() - await self._fire_reconnect() + async with self.get_listening_context(): + if self._resync_settle_seconds > 0: + await asyncio.sleep(self._resync_settle_seconds) + await self._flush_outbound_buffer() + await self._fire_reconnect() except asyncio.CancelledError: raise except Exception: @@ -311,43 +322,49 @@ async def _recover_after_gap(self): async def _flush_outbound_buffer(self): """Replay buffered broadcasts to the backbone (best-effort). - Held under ``_buffer_lock`` so a concurrent failed broadcast cannot mutate - the buffer mid-drain. Items that can no longer be serialized are dropped (so - one poison payload can't wedge the buffer); a transport failure stops the - drain and leaves the rest for the next recovery. + The buffer is drained into a local snapshot under ``_buffer_lock`` and then + published WITHOUT the lock, so a concurrent failed broadcast can still buffer + (and a slow/hung publish can't wedge the buffer lock). Items that can no longer + be serialized are dropped (so one poison payload can't wedge the buffer); a + transport failure re-enqueues the unsent tail (in order, at the front) for the + next recovery. """ async with self._buffer_lock: - self._buffer_overflowed = False if not self._outbound_buffer: return - count = len(self._outbound_buffer) - logger.info(f"Replaying {count} buffered broadcast(s) after recovery") - try: - async with self._broadcast_type(self._broadcast_url) as channel: - while self._outbound_buffer: - topic, data = self._outbound_buffer[0] - try: - payload = pydantic_serialize( - BroadcastNotification( - notifier_id=self._id, topics=[topic], data=data - ) - ) - except Exception as e: - logger.error( - f"Dropping un-serializable buffered broadcast on " - f"topic '{topic}': {e!r}" + pending = list(self._outbound_buffer) + self._outbound_buffer.clear() + logger.info(f"Replaying {len(pending)} buffered broadcast(s) after recovery") + unsent = list(pending) + try: + async with self._broadcast_type(self._broadcast_url) as channel: + while unsent: + topic, data = unsent[0] + try: + payload = pydantic_serialize( + BroadcastNotification( + notifier_id=self._id, topics=[topic], data=data ) - self._outbound_buffer.popleft() - continue - await channel.publish(self._channel, payload) - self._outbound_buffer.popleft() - except asyncio.CancelledError: - raise - except Exception as e: - logger.error( - f"Failed to replay buffered broadcasts ({len(self._outbound_buffer)} " - f"left, will retry on next recovery): {e!r}" - ) + ) + except Exception as e: + logger.error( + f"Dropping un-serializable buffered broadcast on " + f"topic '{topic}': {e!r}" + ) + unsent.pop(0) + continue + await channel.publish(self._channel, payload) + unsent.pop(0) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error( + f"Failed to replay buffered broadcasts ({len(unsent)} left, will retry " + f"on next recovery): {e!r}" + ) + if unsent: + async with self._buffer_lock: + self._outbound_buffer.extendleft(reversed(unsent)) async def _fire_reconnect(self): if self._on_reconnect is None: diff --git a/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py index 670800910..ab4a14472 100644 --- a/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py +++ b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py @@ -139,6 +139,7 @@ def __init__(self, bus, name, **broadcaster_kwargs): ) self.store = [] self._reader = None + self._listen_ctx = None async def start(self): async def record(subscription, data): @@ -147,12 +148,23 @@ async def record(subscription, data): await self.notifier.subscribe(f"{self.name}-store", ALL_TOPICS, record) # Share this worker's local notifications with the backbone, and listen. await self.broadcaster._subscribe_to_all_topics() - self._reader = await self.broadcaster.start_reader_task() + # Hold a listening context like a connected client would, so the reader is tied + # to a non-zero listener count (as in production) and the recovery pin cannot + # cancel it when that pin releases. + self._listen_ctx = self.broadcaster.get_listening_context() + await self._listen_ctx.__aenter__() + self._reader = self.broadcaster.get_reader_task() async def publish(self, topic, data): await self.notifier.notify([topic], data, notifier_id=f"{self.name}-publisher") async def stop(self): + if self._listen_ctx is not None: + try: + await self._listen_ctx.__aexit__(None, None, None) + except Exception: + pass + self._listen_ctx = None if self._reader is not None: self._reader.cancel() try: diff --git a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py index 67bf3e15b..bc0eb1a8d 100644 --- a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py +++ b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py @@ -33,11 +33,28 @@ async def notify(self, topics, data, notifier_id=None): class FakeBus: """A controllable in-memory broadcast backbone for a single channel.""" - def __init__(self, fail_connect=False): - self.fail_connect = fail_connect + def __init__( + self, + fail_connect=False, + fail_connect_times=0, + fail_subscribe_times=0, + connect_gate=None, + fail_publish_after=None, + ): + self.fail_connect = fail_connect # permanently refuse to connect + self.fail_connect_times = ( + fail_connect_times # refuse the first N connects, then succeed + ) + self.fail_subscribe_times = fail_subscribe_times # fail the first N subscribes + self.connect_gate = ( + connect_gate # asyncio.Event; if set, connect awaits it (slow connect) + ) + # publish raises once this many payloads have published (transport fails mid-drain) + self.fail_publish_after = fail_publish_after self.connects = 0 self.subscribes = 0 self.disconnects = 0 + self.published = [] # payloads published via the replay/flush path self.queue = asyncio.Queue() def channel_factory(self, _url): @@ -58,7 +75,9 @@ def __init__(self, bus): async def connect(self): self._bus.connects += 1 - if self._bus.fail_connect: + if self._bus.connect_gate is not None: + await self._bus.connect_gate.wait() + if self._bus.fail_connect or self._bus.connects <= self._bus.fail_connect_times: raise ConnectionError("backbone refused connection") async def disconnect(self): @@ -67,6 +86,23 @@ async def disconnect(self): def subscribe(self, channel): return _FakeSubscription(self._bus) + # The replay/flush path uses the channel as an async context manager and publishes. + async def __aenter__(self): + if self._bus.fail_connect: + raise ConnectionError("backbone down") + return self + + async def __aexit__(self, *exc): + return False + + async def publish(self, channel, payload): + if ( + self._bus.fail_publish_after is not None + and len(self._bus.published) >= self._bus.fail_publish_after + ): + raise ConnectionError("publish failed") + self._bus.published.append(payload) + class _FakeSubscription: def __init__(self, bus): @@ -74,6 +110,8 @@ def __init__(self, bus): async def __aenter__(self): self._bus.subscribes += 1 + if self._bus.subscribes <= self._bus.fail_subscribe_times: + raise ConnectionError("subscribe failed") return _FakeSubscriber(self._bus) async def __aexit__(self, *exc): @@ -261,3 +299,329 @@ async def test_is_reader_healthy_false_when_reader_cancelled(): broadcaster._subscription_task = task assert task.done() assert broadcaster.is_reader_healthy() is False + + +@pytest.mark.asyncio +async def test_reader_recovers_after_transient_connect_failures(): + # Flaky reconnect: the first two reconnect attempts fail, the third succeeds. With + # max_retries=0 (retry forever) the reader must recover, not give up or die. + bus = FakeBus(fail_connect_times=2) + notifier = FakeNotifier() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.subscribes >= 1) + assert bus.connects >= 3 # two failures, then a successful connect + assert not task.done() + # Delivery resumes once connected. + await bus.push(["policy_data"], {"x": 1}, notifier_id="other-server") + await _wait_for(lambda: notifier.notified) + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_reader_survives_slow_connect(): + # A slow (but not failed) backbone connect must keep the reader pending. Completing + # the reader task is exactly what cancels every client websocket in production, so a + # slow connection must never be mistaken for a dead reader. + gate = asyncio.Event() + bus = FakeBus(connect_gate=gate) + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.connects >= 1) # connect started, but gated (slow) + await asyncio.sleep(0.05) + assert not task.done() # still pending mid-connect + assert bus.subscribes == 0 # not subscribed until the connect completes + assert ( + broadcaster.is_reader_healthy() is True + ) # healthy while connecting slowly + gate.set() # the slow connection finally completes + await _wait_for(lambda: bus.subscribes >= 1) + assert not task.done() + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_reader_reconnects_when_subscribe_fails(): + # connect succeeds but subscribe fails once (a partial connection). The reader must + # treat it like any backbone error and reconnect, then deliver. + bus = FakeBus(fail_subscribe_times=1) + notifier = FakeNotifier() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + try: + await _wait_for( + lambda: bus.subscribes >= 2 + ) # first subscribe failed, then retried + assert not task.done() + await bus.push(["policy_data"], {"x": 1}, notifier_id="other-server") + await _wait_for(lambda: notifier.notified) + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_buffer_overflow_drops_oldest(): + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=FakeNotifier(), channel="test", replay_buffer_size=2 + ) + error = ConnectionError("backbone down") + await broadcaster._buffer_outbound("policy_data", {"n": 1}, error) + await broadcaster._buffer_outbound("policy_data", {"n": 2}, error) + assert len(broadcaster._outbound_buffer) == 2 + + await broadcaster._buffer_outbound("policy_data", {"n": 3}, error) # over capacity + assert len(broadcaster._outbound_buffer) == 2 + # The oldest entry ({"n": 1}) was dropped by the bounded deque. + assert [data for _, data in broadcaster._outbound_buffer] == [{"n": 2}, {"n": 3}] + + +@pytest.mark.asyncio +async def test_buffering_disabled_when_size_is_zero(): + # replay_buffer_size=0 disables buffering entirely; the resync is then the only + # recovery path, so nothing is retained here. + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=FakeNotifier(), channel="test", replay_buffer_size=0 + ) + await broadcaster._buffer_outbound("policy_data", {"n": 1}, ConnectionError("down")) + assert len(broadcaster._outbound_buffer) == 0 + + +@pytest.mark.asyncio +async def test_flush_replays_buffered_broadcasts(): + bus = FakeBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + replay_buffer_size=10, + ) + error = ConnectionError("backbone down") + await broadcaster._buffer_outbound("policy_data", {"n": 1}, error) + await broadcaster._buffer_outbound("policy_data", {"n": 2}, error) + + await broadcaster._flush_outbound_buffer() + + assert len(bus.published) == 2 # both replayed to the backbone + assert len(broadcaster._outbound_buffer) == 0 # buffer drained + + +@pytest.mark.asyncio +async def test_flush_drops_unserializable_buffered_broadcast(): + bus = FakeBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + replay_buffer_size=10, + ) + # A poison payload that cannot be serialized must be dropped, not wedge the buffer. + broadcaster._outbound_buffer.append(("policy_data", object())) + broadcaster._outbound_buffer.append(("policy_data", {"n": 1})) + + await broadcaster._flush_outbound_buffer() + + assert len(broadcaster._outbound_buffer) == 0 # poison dropped, good one sent + assert len(bus.published) == 1 # only the serializable broadcast was published + + +@pytest.mark.asyncio +async def test_shutdown_cancels_pending_child_tasks(): + # On clean shutdown (reader cancellation) any in-flight notify/recovery child task + # must be cancelled too, so the worker does not leak tasks. + bus = FakeBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + await _wait_for(lambda: bus.subscribes >= 1) + child = asyncio.create_task(asyncio.sleep(60)) + broadcaster._tasks.add(child) + + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + await _wait_for(lambda: child.done()) + assert child.cancelled() + + +@pytest.mark.asyncio +async def test_skips_own_broadcasts(): + # A broadcast tagged with our own notifier id must not be re-delivered to our local + # notifier (it was already delivered locally when first published). + notifier = FakeNotifier() + broadcaster = ReconnectingBroadcaster( + "memory://", notifier=notifier, channel="test" + ) + own = BroadcastNotification( + notifier_id=broadcaster._id, topics=["policy_data"], data={"x": 1} + ) + await broadcaster._handle_broadcast_event(_Event(own.json())) + assert notifier.notified == [] + + other = BroadcastNotification( + notifier_id="other-server", topics=["policy_data"], data={"x": 2} + ) + await broadcaster._handle_broadcast_event(_Event(other.json())) + await _wait_for(lambda: notifier.notified) + assert notifier.notified[0][0] == ["policy_data"] + + +@pytest.mark.asyncio +async def test_flush_partial_replay_requeues_unsent(): + # Transport fails after the first publish: the unsent tail must be re-buffered (in + # order, at the front) for the next recovery, and the sent item must not re-send. + bus = FakeBus(fail_publish_after=1) + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + replay_buffer_size=10, + ) + error = ConnectionError("backbone down") + await broadcaster._buffer_outbound("policy_data", {"n": 1}, error) + await broadcaster._buffer_outbound("policy_data", {"n": 2}, error) + await broadcaster._buffer_outbound("policy_data", {"n": 3}, error) + + await broadcaster._flush_outbound_buffer() + + assert ( + len(bus.published) == 1 + ) # only the first got through before the transport failed + assert [data for _, data in broadcaster._outbound_buffer] == [{"n": 2}, {"n": 3}] + + # The backbone heals; a second flush drains the rest without re-sending {"n": 1}. + bus.fail_publish_after = None + await broadcaster._flush_outbound_buffer() + assert len(bus.published) == 3 + assert len(broadcaster._outbound_buffer) == 0 + + +@pytest.mark.asyncio +async def test_single_flight_recovery_dedupes(): + # A second gap while a recovery is still in flight must not spawn a second recovery. + broadcaster = _make_broadcaster() + blocker = asyncio.Event() + broadcaster._recovery_task = asyncio.create_task(blocker.wait()) + broadcaster._tasks.add(broadcaster._recovery_task) + first = broadcaster._recovery_task + + broadcaster._schedule_gap_recovery() # second gap during the in-flight recovery + assert broadcaster._recovery_task is first # single-flight: no new recovery task + + blocker.set() + await first + + +@pytest.mark.asyncio +async def test_reader_recovers_within_max_retries(): + # max_retries=5 but only 3 transient failures, then success: the reader recovers and + # resets its attempt counter (a later drop gets the full budget again, not give-up). + bus = FakeBus(fail_connect_times=3) + notifier = FakeNotifier() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=notifier, + channel="test", + broadcast_type=bus.channel_factory, + reconnect_max_retries=5, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + task = await broadcaster.start_reader_task() + try: + await _wait_for(lambda: bus.subscribes >= 1) + assert bus.connects >= 4 # 3 failures + a success + assert not task.done() + await bus.push(["policy_data"], {"x": 1}, notifier_id="other-server") + await _wait_for(lambda: notifier.notified) + # Counter reset: a later drop gets the full retry budget again. + await bus.drop() + await _wait_for(lambda: bus.subscribes >= 2) + assert not task.done() + finally: + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + +@pytest.mark.asyncio +async def test_recovery_pins_reader_across_client_recycle(): + # The resync closes every client (driving the client-held listen count to 0), but + # _recover_after_gap pins its own listening context for the whole recovery, so the + # reader must NOT be cancelled while the recycle happens. + bus = FakeBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + resync_settle_seconds=0, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + # One connected client holds the listening context (this also starts the reader). + client_ctx = broadcaster.get_listening_context() + await client_ctx.__aenter__() + reader = broadcaster.get_reader_task() + await _wait_for(lambda: bus.subscribes >= 1) + + observed = {} + + async def on_reconnect(): + # The resync closes the only client: drop its listening context. + await client_ctx.__aexit__(None, None, None) + # The pin in _recover_after_gap must hold the count, keeping the reader alive. + observed["reader_alive"] = not reader.done() + + broadcaster.set_reconnect_callback(on_reconnect) + broadcaster._had_prior_connection = True + + await broadcaster._recover_after_gap() + + assert observed["reader_alive"] is True + # The pin is released and no client remains, so the reader is cancelled by the + # listener count reaching zero — drain it. + if not reader.done(): + reader.cancel() + with pytest.raises(asyncio.CancelledError): + await reader From 671776ec4971031016aa729e15c567414865bdfa Mon Sep 17 00:00:00 2001 From: Zeev Manilovich Date: Sun, 14 Jun 2026 17:28:50 +0300 Subject: [PATCH 8/8] Address review comments: gap-detection, recovery rerun, give-up restart, replay order Addresses EliMoshkovich's deep-review findings 1-5 + the single-flight test note, and Copilot's docs comment, on PR #915. - F1 (HIGH): make gap detection reader-task-local instead of an instance flag, so a fresh reader task (after the last client left and one returns) does not fire a spurious resync on its first connect. Hits default stats-off deployments. - F2: single-flight recovery now reruns for a gap that lands during the late recovery phase (rerun-requested flag) instead of permanently dropping its flush + resync. - F3: on max-retries give-up, fire a give-up hook wired to the worker's graceful shutdown regardless of OPAL_STATISTICS_ENABLED (was only wired in the stats path); count connect-OK/instant-close flaps toward the retry budget so MAX_RETRIES can trip. - F5: partial-replay re-enqueue preserves drop-oldest (rebuild unsent+refill) instead of extendleft evicting the newest refill on a full deque. - F4 + Copilot (docs): document OPAL_BROADCAST_HEALTHCHECK_ENABLED in the resilience table; state the resync guarantee's scope (policy + configured data sources, not runtime inline updates) and the cross-worker replay-ordering caveat. - Strengthen the single-flight test assertion (was len(calls) <= 2, passed on 0). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../run-opal-server/broadcast-interface.mdx | 5 + .../opal_server/pubsub_resilience.py | 168 +++++++++++--- packages/opal-server/opal_server/server.py | 30 +++ ...roadcaster_consistency_integration_test.py | 17 +- .../tests/healthcheck_endpoint_test.py | 33 +++ .../tests/reconnecting_broadcaster_test.py | 210 +++++++++++++++++- 6 files changed, 427 insertions(+), 36 deletions(-) diff --git a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx index 3aa679278..28839995d 100644 --- a/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx +++ b/documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx @@ -73,8 +73,13 @@ If the broadcast backbone (Postgres/Redis/Kafka) briefly drops — for example d | OPAL_BROADCAST_REPLAY_BUFFER_SIZE | `10000` | Max number of outbound broadcasts buffered while the backbone is down and replayed on reconnect (`0` disables buffering). On overflow the oldest are dropped. | | OPAL_BROADCAST_RESYNC_ON_RECONNECT | `true` | After a backbone gap, force this worker's clients to reconnect so they re-fetch full policy + data state. Set to `false` to rely only on best-effort replay. | | OPAL_BROADCAST_RESYNC_SETTLE_SECONDS | `2` | Grace period after a reconnect before replaying buffered broadcasts and resyncing clients, to let peer servers re-subscribe. | +| OPAL_BROADCAST_HEALTHCHECK_ENABLED | `true` | Make `/healthcheck` reflect the broadcaster reader's health: it returns `503` (instead of `200`) when the reader is wedged while clients depend on it, so a k8s readiness/liveness probe can route away from or restart the worker. A normal transient reconnect still reads healthy. Set to `false` to keep `/healthcheck` always `200`. | Consistency across the outage is handled in two layers. While the backbone is unreachable, client websocket connections are kept alive but cross-server fan-out is paused. On reconnect: - **Replay buffer** — broadcasts that failed to reach the backbone during the outage are replayed, so peer servers that have re-subscribed catch up without a client refetch. This is best-effort: the backbone keeps no replay of its own, so a peer that is slow to re-subscribe may miss a replayed message. - **Resync** (the guarantee) — each server forces its own clients to reconnect and re-fetch the full policy/data state. Because every server experienced the same gap, every server reconciles its own clients and the fleet converges to current truth. Updates missed during the gap are therefore reconciled even if the replay did not reach a peer in time. + +**Scope of the resync guarantee.** The resync re-fetch covers the policy and the **configured data sources** (those in `OPAL_DATA_CONFIG_SOURCES` / the server's data-source config) — clients fully refetch them. Runtime-published **incremental** updates — for example a `POST /data/config` that carries an inline `data` payload or a one-off fetch URL that is not part of the configured sources — are **not** part of that full refetch, so they are recoverable **only** through the best-effort replay buffer; if their replay is dropped (buffer overflow, or a peer that did not re-subscribe in time) the update is lost and not reconciled by the resync. + +**Replay ordering.** Cross-worker replay ordering is not enforced, so a buffered update can be delivered after a newer live update (this self-heals for fetch-URL entries, which re-fetch current state, but a stale value can win for inline-`data` entries that carry the value directly). diff --git a/packages/opal-server/opal_server/pubsub_resilience.py b/packages/opal-server/opal_server/pubsub_resilience.py index 4f498cd6c..2c5095be5 100644 --- a/packages/opal-server/opal_server/pubsub_resilience.py +++ b/packages/opal-server/opal_server/pubsub_resilience.py @@ -117,8 +117,8 @@ class ReconnectingBroadcaster(EventBroadcaster): loop. This subclass wraps the connect/subscribe/read cycle in a reconnect loop with bounded exponential backoff, so the reader task stays *pending* across transient outages. The task only completes on clean shutdown (cancellation) or - after ``reconnect_max_retries`` consecutive failures, in which case the existing - ``ignore_broadcaster_disconnected=False`` path triggers a graceful worker restart. + after ``reconnect_max_retries`` consecutive failures (give-up), in which case it + fires the registered give-up hook so OPAL graceful-restarts the worker. """ def __init__( @@ -144,16 +144,33 @@ def __init__( # Single lock serialises buffer mutation, the overflow flag, and the flush, # so a concurrent broadcast cannot race the flush's drain/flag-reset. self._buffer_lock = asyncio.Lock() - # (A) gap detection + single-flight resync hook. - self._had_prior_connection = False + # (A) single-flight resync hook. Gap detection itself is reader-task-local + # (see __read_notifications__), not an instance flag. self._recovery_task: Optional[asyncio.Task] = None + # Set when a gap arrives while a recovery is already in flight (single-flight), + # so the in-flight recovery loops once more rather than dropping that gap. + self._recovery_rerun_requested = False self._on_reconnect: Optional[ReconnectCallback] = None + # Fired once if the reader gives up (exhausts reconnect retries) and returns, + # so OPAL can graceful-restart the worker even with statistics disabled. + self._on_give_up: Optional[ReconnectCallback] = None def set_reconnect_callback(self, callback: Optional[ReconnectCallback]): """Register an ``async () -> None`` callback fired once after each gap (a reconnect that follows a previously-established connection).""" self._on_reconnect = callback + def set_give_up_callback(self, callback: Optional[ReconnectCallback]): + """Register an ``async () -> None`` callback fired once if the reader + gives up after exhausting ``reconnect_max_retries``. + + Fires only when the reader task completes by *returning* (give- + up), never on cancellation (clean shutdown), so OPAL can wire it + to a graceful worker restart without normal shutdown re- + triggering it. + """ + self._on_give_up = callback + async def start_reader_task(self): """Spawn the reconnecting reader task once. @@ -239,26 +256,56 @@ async def _buffer_outbound(self, topic, data, error: Exception): ) async def __read_notifications__(self): - """Read incoming broadcasts, reconnecting on backbone disconnect.""" + """Read incoming broadcasts, reconnecting on backbone disconnect. + + ``had_prior_connection`` is deliberately a task-local, not an instance + attribute: gap recovery must fire only for a reconnect *within this + reader task's own loop* (a real backbone gap). When the last client + disconnects, the upstream cancels this task and clears + ``_subscription_task``; the next client starts a *fresh* reader task, + which must start clean — an instance flag would carry a stale ``True`` + into that fresh task and schedule a spurious full recovery (flush + + client-recycling resync) on its very first connect, a churn loop that + also hits stats-off deployments. + """ attempt = 0 + had_prior_connection = False while True: try: channel = await self._ensure_connected() - attempt = 0 logger.info( f"Broadcaster listener connected to channel '{self._channel}'" ) async with channel.subscribe(channel=self._channel) as subscriber: # We are subscribed again; recover concurrently so we keep reading # (and can receive peers' replays) during the settle window. - if self._had_prior_connection: + if had_prior_connection: self._schedule_gap_recovery() - self._had_prior_connection = True + had_prior_connection = True + # A connect that ends without sustaining (no read) is a flap, not a + # healthy session, so only a sustained subscriber resets the attempt + # counter — otherwise a connect-OK/instant-close loop would never + # increment ``attempt`` and ``reconnect_max_retries`` could never trip. + sustained = False async for event in subscriber: + if not sustained: + sustained = True + attempt = 0 await self._handle_broadcast_event(event) - logger.warning( - "Broadcast subscriber ended (backbone connection closed); reconnecting" - ) + if sustained: + logger.warning( + "Broadcast subscriber ended (backbone connection closed); " + "reconnecting" + ) + else: + attempt += 1 + logger.warning( + "Broadcast subscriber ended immediately without sustaining " + f"(attempt {attempt}); treating as a failed reconnect" + ) + if self._gave_up(attempt): + await self._fire_give_up() + return except asyncio.CancelledError: logger.info("Broadcaster listener cancelled; stopping") await self._cancel_pending_tasks() @@ -266,19 +313,24 @@ async def __read_notifications__(self): except Exception as e: attempt += 1 logger.error(f"Broadcaster listener error (attempt {attempt}): {e!r}") - if ( - self._reconnect_max_retries - and attempt >= self._reconnect_max_retries - ): - logger.error( - f"Broadcaster reconnect exhausted after {attempt} attempts; " - "giving up so the worker can restart" - ) - break + if self._gave_up(attempt): + await self._fire_give_up() + return finally: await self._safe_disconnect_channel() await asyncio.sleep(self._backoff_seconds(attempt)) + def _gave_up(self, attempt: int) -> bool: + """Return whether ``reconnect_max_retries`` is exhausted (0 = retry + forever).""" + if self._reconnect_max_retries and attempt >= self._reconnect_max_retries: + logger.error( + f"Broadcaster reconnect exhausted after {attempt} attempts; " + "giving up so the worker can restart" + ) + return True + return False + async def _cancel_pending_tasks(self): # Cancel AND join child notify/recovery tasks so they fully unwind (releasing any # pinned listening context) before the reader re-raises its own cancellation. @@ -291,11 +343,16 @@ async def _cancel_pending_tasks(self): await asyncio.gather(*tasks, return_exceptions=True) def _schedule_gap_recovery(self): - # Single-flight: a flap during the settle window must not spawn a second + # Single-flight: a flap during a recovery must not spawn a second concurrent # recovery (concurrent flushes corrupt the buffer; double resync re-storms). + # Instead, request a rerun so a gap that lands while a recovery is in flight — + # including during the late ``_fire_reconnect`` phase, after the flush — is still + # flushed/resynced by one more loop of the in-flight recovery rather than dropped. if self._recovery_task is not None and not self._recovery_task.done(): - logger.debug("Gap recovery already in progress; not scheduling another") + logger.debug("Gap recovery already in progress; requesting a rerun") + self._recovery_rerun_requested = True return + self._recovery_rerun_requested = False self._recovery_task = asyncio.create_task(self._recover_after_gap()) self._tasks.add(self._recovery_task) self._recovery_task.add_done_callback(self._tasks.discard) @@ -307,13 +364,26 @@ async def _recover_after_gap(self): The whole recovery runs inside a pinned listening context so the reader task cannot be cancelled mid-recovery — neither by the resync hook closing every client nor by an unrelated drop to zero listeners during the settle window. + + Loops once more whenever a gap arrives during an iteration (single-flight rerun): + the rerun flag is cleared at the top of each iteration and re-checked after the + full pin -> settle -> flush -> fire body, so a gap landing at any point during the + body — clear happens first, check happens last — is captured and triggers exactly + one more iteration (single-threaded asyncio, no lock needed). """ try: - async with self.get_listening_context(): - if self._resync_settle_seconds > 0: - await asyncio.sleep(self._resync_settle_seconds) - await self._flush_outbound_buffer() - await self._fire_reconnect() + while True: + self._recovery_rerun_requested = False + async with self.get_listening_context(): + if self._resync_settle_seconds > 0: + await asyncio.sleep(self._resync_settle_seconds) + await self._flush_outbound_buffer() + await self._fire_reconnect() + if not self._recovery_rerun_requested: + break + logger.info( + "Gap arrived during recovery; rerunning flush + resync once" + ) except asyncio.CancelledError: raise except Exception: @@ -326,8 +396,8 @@ async def _flush_outbound_buffer(self): published WITHOUT the lock, so a concurrent failed broadcast can still buffer (and a slow/hung publish can't wedge the buffer lock). Items that can no longer be serialized are dropped (so one poison payload can't wedge the buffer); a - transport failure re-enqueues the unsent tail (in order, at the front) for the - next recovery. + transport failure re-enqueues the unsent (older) tail ahead of any concurrent + refill (newer) for the next recovery (see ``_requeue_unsent``). """ async with self._buffer_lock: if not self._outbound_buffer: @@ -363,8 +433,23 @@ async def _flush_outbound_buffer(self): f"on next recovery): {e!r}" ) if unsent: - async with self._buffer_lock: - self._outbound_buffer.extendleft(reversed(unsent)) + await self._requeue_unsent(unsent) + + async def _requeue_unsent(self, unsent: list): + """Re-enqueue the unsent (older) tail ahead of any concurrent refill + (newer). + + The publish above ran WITHOUT the lock, so concurrent failed broadcasts may have + refilled the buffer meanwhile. Rebuild under the lock as ``unsent + refill`` and + rely on the bounded deque dropping from the FRONT (oldest) on overflow — a plain + ``extendleft`` would instead evict the newest refill, inverting the drop-oldest + policy on a deque that is already at ``maxlen``. + """ + async with self._buffer_lock: + refill = list(self._outbound_buffer) + self._outbound_buffer.clear() + self._outbound_buffer.extend(unsent) + self._outbound_buffer.extend(refill) async def _fire_reconnect(self): if self._on_reconnect is None: @@ -376,6 +461,27 @@ async def _fire_reconnect(self): except Exception: logger.exception("Broadcaster on_reconnect callback failed") + async def _fire_give_up(self): + """Fire the give-up hook so OPAL can graceful-restart the worker. + + Called only from the give-up (returning) path of the reader + loop, never on cancellation, so a clean shutdown does not re- + trigger the restart. If no hook is wired, log loudly that this + worker now depends on the liveness probe. + """ + if self._on_give_up is None: + logger.error( + "Broadcaster gave up reconnecting and no give-up hook is wired; this " + "worker now requires the liveness probe (/healthcheck) to be restarted" + ) + return + try: + await self._on_give_up() + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Broadcaster on_give_up callback failed") + async def _ensure_connected(self): if self.listening_broadcast_channel is None: self.listening_broadcast_channel = self._broadcast_type(self._broadcast_url) diff --git a/packages/opal-server/opal_server/server.py b/packages/opal-server/opal_server/server.py index 2fbbeeb8c..00de60747 100644 --- a/packages/opal-server/opal_server/server.py +++ b/packages/opal-server/opal_server/server.py @@ -147,6 +147,7 @@ def __init__( self.jwks_endpoint = None self.pubsub = PubSub(signer=self.signer, broadcaster_uri=broadcaster_uri) + self._wire_broadcaster_give_up() self.publisher: Optional[TopicPublisher] = None self.broadcast_keepalive: Optional[PeriodicPublisher] = None @@ -431,6 +432,35 @@ async def stop_server_background_tasks(self): except Exception: logger.exception("exception while shutting down background tasks") + def _wire_broadcaster_give_up(self): + """Graceful-restart the worker if the reconnecting broadcaster gives + up. + + When ``BROADCAST_RECONNECT_MAX_RETRIES`` is exhausted the reader task + completes by *returning*. With ``ignore_broadcaster_disconnected=False`` a + done reader cancels every client websocket, re-creating the drop storm. The + statistics path already restarts the worker on reader completion, but only + when ``STATISTICS_ENABLED`` — so with statistics off nothing restarts the + worker until the liveness probe acts. This broadcaster-level give-up hook + triggers the same graceful shutdown regardless of the statistics flag. It + fires only on give-up (return), never on cancellation (clean shutdown), so + normal shutdown does not re-trigger it. (When statistics are enabled both + this hook and the stats done-callback may fire; a second SIGTERM to an + already-terminating worker is a harmless no-op.) + """ + broadcaster = self.pubsub.broadcaster + if not isinstance(broadcaster, ReconnectingBroadcaster): + return + + async def _on_broadcaster_give_up(): + logger.error( + "Broadcaster gave up reconnecting to the backbone; " + "restarting this worker" + ) + self._graceful_shutdown() + + broadcaster.set_give_up_callback(_on_broadcaster_give_up) + def _graceful_shutdown(self): logger.info("Trigger worker graceful shutdown") os.kill(os.getpid(), signal.SIGTERM) diff --git a/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py index ab4a14472..fa72dcd18 100644 --- a/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py +++ b/packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py @@ -250,7 +250,13 @@ async def on_reconnect(): @pytest.mark.asyncio async def test_resync_is_single_flight_across_a_flap_during_settle(): """A second disconnect during the settle window must not spawn a second - concurrent recovery (which would corrupt the buffer / double-resync).""" + concurrent recovery (which would corrupt the buffer / double-resync). + + Under the single-flight rerun semantics (F2) the second gap, landing while the + first recovery is still in its settle sleep, requests exactly one rerun: the hook + fires once for the gap and once for the rerun (2), never three concurrent + recoveries, and never zero (F8 — the old ``<= 2`` assertion passed even with 0). + """ bus = InMemoryBackbone() # Long settle so a second gap lands while the first recovery is still pending. a = Instance(bus, "A", resync_settle_seconds=0.3, replay_buffer_size=100) @@ -275,10 +281,15 @@ async def on_reconnect(): await _wait_for(lambda: bus.subscriber_count() == 1) # Let recoveries settle. await asyncio.sleep(1.0) - # Single-flight: the overlapping flap collapses into one recovery, not three. - assert len(calls) <= 2 + # The recovery actually fired (not 0) and the overlapping flap collapsed into a + # single rerun (not three concurrent recoveries). + assert 1 <= len(calls) <= 2 + # The rerun for the in-settle gap ran, so it fired exactly twice. + assert len(calls) == 2 assert a.broadcaster._recovery_task is not None assert a.broadcaster._recovery_task.done() + # The rerun flag is cleared once the recovery loop drains. + assert a.broadcaster._recovery_rerun_requested is False finally: await a.stop() diff --git a/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py b/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py index 937957b0d..9dbc4c07c 100644 --- a/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py +++ b/packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py @@ -7,6 +7,7 @@ """ import contextlib +import pytest from opal_server.config import opal_server_config from opal_server.pubsub_resilience import ReconnectingBroadcaster from opal_server.server import OpalServer @@ -69,3 +70,35 @@ def test_healthcheck_kill_switch_keeps_it_ok_when_disabled(): client = TestClient(server.app) with _override_config(BROADCAST_HEALTHCHECK_ENABLED=False): assert client.get("/healthcheck").status_code == 200 + + +def _build_server_with_broadcaster(): + # A broadcaster_uri makes PubSub build a ReconnectingBroadcaster (the object only; + # no connection happens until the reader task starts), so the give-up wiring runs. + return OpalServer( + init_policy_watcher=False, + init_publisher=False, + broadcaster_uri="postgres://localhost/test", + enable_jwks_endpoint=False, + ) + + +def test_broadcaster_give_up_is_wired_to_graceful_shutdown(): + # F3(a): a reconnecting broadcaster that gives up must restart the worker regardless + # of the statistics flag, via a broadcaster-level give-up hook OPAL wires here. + server = _build_server_with_broadcaster() + assert isinstance(server.pubsub.broadcaster, ReconnectingBroadcaster) + # The give-up hook was registered during OpalServer construction. + assert server.pubsub.broadcaster._on_give_up is not None + + +@pytest.mark.asyncio +async def test_broadcaster_give_up_hook_triggers_graceful_shutdown(): + # Firing the wired hook calls _graceful_shutdown (patched so the test does not + # actually SIGTERM the test process). + server = _build_server_with_broadcaster() + shutdowns = [] + server._graceful_shutdown = lambda: shutdowns.append(1) + + await server.pubsub.broadcaster._fire_give_up() + assert shutdowns == [1] diff --git a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py index bc0eb1a8d..c90d771cd 100644 --- a/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py +++ b/packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py @@ -8,6 +8,7 @@ proving these tests actually catch the regression. """ import asyncio +from contextlib import asynccontextmanager import pytest from fastapi_websocket_pubsub import EventBroadcaster @@ -17,6 +18,18 @@ _END = object() +def _noop_listening_context(): + """Stand-in for the broadcaster's pinned listening context that does + nothing, so a unit test can exercise the recovery loop without a real + backbone reader.""" + + @asynccontextmanager + async def _ctx(): + yield + + return _ctx() + + class _Event: def __init__(self, message): self.message = message @@ -538,20 +551,63 @@ async def test_flush_partial_replay_requeues_unsent(): @pytest.mark.asyncio async def test_single_flight_recovery_dedupes(): - # A second gap while a recovery is still in flight must not spawn a second recovery. + # A second gap while a recovery is still in flight must not spawn a second recovery; + # instead it requests a rerun so the in-flight recovery loops once more (F2). broadcaster = _make_broadcaster() blocker = asyncio.Event() broadcaster._recovery_task = asyncio.create_task(blocker.wait()) broadcaster._tasks.add(broadcaster._recovery_task) first = broadcaster._recovery_task + assert broadcaster._recovery_rerun_requested is False broadcaster._schedule_gap_recovery() # second gap during the in-flight recovery assert broadcaster._recovery_task is first # single-flight: no new recovery task + assert broadcaster._recovery_rerun_requested is True # but a rerun is requested blocker.set() await first +@pytest.mark.asyncio +async def test_recovery_reruns_for_gap_during_late_phase(): + # F2: a gap that lands during the LATE recovery phase (after the flush, while the + # reconnect hook runs) must still be flushed/resynced by exactly one more loop — + # not dropped because a recovery was already in flight. + broadcaster = _make_broadcaster() + broadcaster._resync_settle_seconds = 0 + fire_count = 0 + release_first_fire = asyncio.Event() + + async def on_reconnect(): + nonlocal fire_count + fire_count += 1 + if fire_count == 1: + # A gap arriving during the first recovery's late (fire) phase routes to the + # in-flight recovery (which is broadcaster._recovery_task) and sets the flag. + broadcaster._schedule_gap_recovery() + # Block until the test confirms the rerun was requested mid-iteration. + await release_first_fire.wait() + + broadcaster.set_reconnect_callback(on_reconnect) + # Isolate the rerun-loop semantics from the real listening-context pin (which is + # covered by test_recovery_pins_reader_across_client_recycle). + broadcaster.get_listening_context = _noop_listening_context + + # Start the recovery through the scheduler so broadcaster._recovery_task is the live + # task — that is what the nested _schedule_gap_recovery sees as "already in flight". + broadcaster._schedule_gap_recovery() + recovery = broadcaster._recovery_task + await _wait_for(lambda: fire_count == 1) + # The gap during the late phase requested a rerun on the live recovery. + assert broadcaster._recovery_rerun_requested is True + release_first_fire.set() + + await asyncio.wait_for(recovery, timeout=2) + # The recovery looped exactly once more: the hook fired twice, then settled. + assert fire_count == 2 + assert broadcaster._recovery_rerun_requested is False + + @pytest.mark.asyncio async def test_reader_recovers_within_max_retries(): # max_retries=5 but only 3 transient failures, then success: the reader recovers and @@ -614,7 +670,6 @@ async def on_reconnect(): observed["reader_alive"] = not reader.done() broadcaster.set_reconnect_callback(on_reconnect) - broadcaster._had_prior_connection = True await broadcaster._recover_after_gap() @@ -625,3 +680,154 @@ async def on_reconnect(): reader.cancel() with pytest.raises(asyncio.CancelledError): await reader + + +def _count_gap_recoveries(broadcaster) -> list: + """Replace _schedule_gap_recovery with a no-op counter so a test can assert + whether a (re)connect treated itself as a gap, without running the real + recovery machinery.""" + scheduled = [] + broadcaster._schedule_gap_recovery = lambda: scheduled.append(1) + return scheduled + + +@pytest.mark.asyncio +async def test_fresh_reader_task_does_not_recover_on_first_connect(): + # F1: gap detection is reader-task-local. When the last client disconnects the + # upstream cancels the reader and clears _subscription_task; the NEXT client starts a + # fresh reader task, whose first connect must NOT be treated as a gap (no spurious + # flush + client-recycling resync). Only a reconnect WITHIN a task's own loop is a gap. + bus = FakeBus() + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + scheduled = _count_gap_recoveries(broadcaster) + + # First reader task: its initial connect is not a gap. + first = await broadcaster.start_reader_task() + await _wait_for(lambda: bus.subscribes >= 1) + assert scheduled == [] + + # The last client disconnects: upstream cancels the reader and resets the handle. + first.cancel() + with pytest.raises(asyncio.CancelledError): + await first + broadcaster._subscription_task = None + + # A new client arrives: a FRESH reader task starts. Its first connect must NOT + # schedule a gap recovery, even though a prior connection existed on this instance. + second = await broadcaster.start_reader_task() + assert second is not first + await _wait_for(lambda: bus.subscribes >= 2) + await asyncio.sleep(0.05) + assert scheduled == [] # fresh task starts clean — no stale-instance-flag recovery + + try: + # A real backbone gap WITHIN this task's loop (a drop + reconnect) does recover. + await bus.drop() + await _wait_for(lambda: bus.subscribes >= 3) + await _wait_for(lambda: len(scheduled) == 1) + finally: + second.cancel() + with pytest.raises(asyncio.CancelledError): + await second + + +@pytest.mark.asyncio +async def test_flap_loop_counts_toward_give_up(): + # F3(b): a connect-OK / instant-close flap loop must increment the attempt counter + # (the subscriber never sustains a read), so reconnect_max_retries trips and the + # reader completes — rather than looping forever because a connect resets the counter. + bus = FakeBus() + give_up = [] + + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_max_retries=3, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + + async def on_give_up(): + give_up.append(1) + + broadcaster.set_give_up_callback(on_give_up) + + # Each subscribe connects fine, then the subscriber ends immediately (a flap): the + # shared read queue is pre-loaded with _END markers, so every session reads one _END + # and stops without ever sustaining a read. Three such flaps exhaust max_retries=3. + for _ in range(5): + await bus.drop() + + task = await broadcaster.start_reader_task() + + # Three sub-threshold sessions exhaust the retry budget and the reader returns. + await asyncio.wait_for(task, timeout=2) + assert task.done() + assert task.exception() is None + assert give_up == [1] # the give-up hook fired exactly once (on the returning path) + + +@pytest.mark.asyncio +async def test_give_up_hook_not_fired_on_cancellation(): + # F3(a): the give-up hook is for give-up (return), NOT clean shutdown (cancellation). + bus = FakeBus() + give_up = [] + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + broadcast_type=bus.channel_factory, + reconnect_backoff_min=0, + reconnect_backoff_max=0, + ) + + async def on_give_up(): + give_up.append(1) + + broadcaster.set_give_up_callback(on_give_up) + + task = await broadcaster.start_reader_task() + await _wait_for(lambda: bus.subscribes >= 1) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + await asyncio.sleep(0.05) + assert give_up == [] # cancellation must not look like a give-up + + +@pytest.mark.asyncio +async def test_partial_replay_requeue_preserves_drop_oldest(): + # F5: when a transport failure re-enqueues the unsent (older) tail AND concurrent + # failures refilled the buffer (newer), the bounded deque must drop the OLDEST from + # the front, keeping order unsent-before-refill — not evict the newest refill. + broadcaster = ReconnectingBroadcaster( + "memory://", + notifier=FakeNotifier(), + channel="test", + replay_buffer_size=3, + ) + # Simulate a concurrent refill that landed during the lock-free publish: two newer + # entries already sit in the buffer when the unsent tail comes back. + broadcaster._outbound_buffer.append(("policy_data", {"n": "refill-1"})) + broadcaster._outbound_buffer.append(("policy_data", {"n": "refill-2"})) + + unsent = [("policy_data", {"n": "unsent-1"}), ("policy_data", {"n": "unsent-2"})] + await broadcaster._requeue_unsent(unsent) + + # maxlen=3: unsent (older) go in front, then refill (newer); overflow drops oldest. + assert broadcaster._outbound_buffer.maxlen == 3 + assert [data["n"] for _, data in broadcaster._outbound_buffer] == [ + "unsent-2", + "refill-1", + "refill-2", + ] + # The OLDEST (unsent-1) was dropped from the front, NOT the newest refill.