From bb408f428d91a03a93c5422927a9f1507e8a6158 Mon Sep 17 00:00:00 2001 From: rohitsudhakar1 Date: Sat, 27 Jun 2026 20:54:54 -0700 Subject: [PATCH 1/2] [serve] Invalidate replica queue-length cache on gRPC request failures When a DeploymentHandle uses gRPC transport (`_by_reference=False`), the AsyncioRouter's request-completion done-callback received the raw `grpc.aio.Call` object instead of the request result/error that the actor transport delivers. As a result `Router._process_finished_request` could never detect a failed replica, so the queue-length cache was not invalidated and power-of-two-choices routing kept selecting the dead replica until either the next request's rejection path or a controller long-poll refreshed it. Normalize a failed gRPC call (`StatusCode.UNAVAILABLE`) into `ActorUnavailableError` inside `gRPCReplicaResult.add_done_callback`, matching the actor-transport callback contract, so the existing router logic invalidates the cache and reroutes with no router changes. CANCELLED is intentionally left as-is to avoid invalidating the cache on client-initiated cancellations. Adds unit tests covering UNAVAILABLE, OK, and CANCELLED outcomes. Fixes #63261 Signed-off-by: rohitsudhakar1 --- python/ray/serve/_private/replica_result.py | 62 +++++++++++++- .../tests/unit/test_grpc_replica_result.py | 83 +++++++++++++++++++ 2 files changed, 144 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/_private/replica_result.py b/python/ray/serve/_private/replica_result.py index 28207ca7c29c..7dbfa8b1fa91 100644 --- a/python/ray/serve/_private/replica_result.py +++ b/python/ray/serve/_private/replica_result.py @@ -575,7 +575,67 @@ async def __anext__(self): return await asyncio.wrap_future(fut) def add_done_callback(self, callback: Callable): - self._call.add_done_callback(callback) + """Register ``callback``, invoked when the underlying RPC completes. + + The actor transport invokes done-callbacks with the request's result + or a ``RayError`` (see ``ActorReplicaResult.add_done_callback``). + ``grpc.aio`` instead invokes them with the raw ``grpc.aio.Call``, which + is opaque to consumers that rely on the actor-transport contract -- most + importantly the router's request-completion handler, which invalidates + the queue-length cache when a replica becomes unavailable (see + ``Router._process_finished_request``). Without normalization a failed + gRPC request is silently ignored, so a dead replica keeps getting + selected by power-of-two-choices routing until another code path probes + it (https://github.com/ray-project/ray/issues/63261). + + Normalize a failed call into the same ``ActorUnavailableError`` the data + path raises (see ``_process_grpc_response`` / ``get_rejection_response``) + so behavior is consistent across transports. + """ + + def _on_done(call: grpc.aio.Call): + # The status code is only available via a coroutine, so resolve it + # on the call's loop before invoking ``callback``. The call is + # already complete here, so this resolves without blocking. + async def _invoke(): + callback(await self._normalize_done_result(call)) + + coro = _invoke() + try: + run_coroutine_threadsafe(coro, self._grpc_call_loop) + except RuntimeError: + # Event loop is no longer running (e.g. interpreter shutdown); + # fall back to the previous behavior rather than dropping the + # callback entirely. + coro.close() + callback(call) + + self._call.add_done_callback(_on_done) + + async def _normalize_done_result(self, call: grpc.aio.Call) -> Any: + """Map a completed gRPC call to the actor-transport callback contract. + + Returns an ``ActorUnavailableError`` if the replica was unreachable, + otherwise the ``call`` itself (preserving previous behavior). + """ + try: + code = await call.code() + except Exception: + # If the status can't be determined, don't mask the outcome. + return call + + # UNAVAILABLE means the replica's gRPC server was unreachable, so the + # request never completed on a live replica. Treat it like a + # RayActorError so the router invalidates its cache and reroutes; if the + # replica is actually dead the router learns that via active probing. + # (CANCELLED is intentionally excluded: in the done-callback path it is + # most often a client-initiated cancellation, not a replica failure.) + if code == grpc.StatusCode.UNAVAILABLE: + return ActorUnavailableError( + "Actor is unavailable.", self._actor_id.binary() + ) + + return call def cancel(self): self._call.cancel() diff --git a/python/ray/serve/tests/unit/test_grpc_replica_result.py b/python/ray/serve/tests/unit/test_grpc_replica_result.py index d618a4671c88..1aecfc4bc9a7 100644 --- a/python/ray/serve/tests/unit/test_grpc_replica_result.py +++ b/python/ray/serve/tests/unit/test_grpc_replica_result.py @@ -2,10 +2,12 @@ import sys import threading +import grpc import pytest from ray import ActorID, cloudpickle from ray._common.test_utils import wait_for_condition +from ray.exceptions import ActorUnavailableError from ray.serve._private.common import RequestMetadata from ray.serve._private.replica_result import gRPCReplicaResult from ray.serve.generated import serve_pb2 @@ -387,5 +389,86 @@ async def test_streaming_error_async(self, create_asyncio_event_loop_in_thread): await replica_result.__anext__() +class FakegRPCCallWithStatus: + """Minimal fake of a completed ``grpc.aio.Call`` for done-callback tests. + + ``grpc.aio`` invokes done-callbacks with the call object itself and exposes + the final status only via the async ``code()`` method, so we mirror that. + """ + + def __init__(self, code: grpc.StatusCode): + self._loop = asyncio.get_running_loop() + self._code = code + self._done_callbacks = [] + + def add_done_callback(self, cb): + self._done_callbacks.append(cb) + + async def code(self) -> grpc.StatusCode: + return self._code + + def complete(self): + # grpc invokes done-callbacks with the call object itself. + for cb in self._done_callbacks: + cb(self) + + +@pytest.mark.asyncio +class TestDoneCallbackNormalization: + """gRPCReplicaResult.add_done_callback must normalize a failed call into the + same shape the actor transport delivers, so the router's completion handler + can invalidate its queue-length cache for the dead replica. See + https://github.com/ray-project/ray/issues/63261. + """ + + def make_result(self, code: grpc.StatusCode): + fake_call = FakegRPCCallWithStatus(code) + result = gRPCReplicaResult( + fake_call, + metadata=RequestMetadata( + request_id="", + internal_request_id="", + is_streaming=False, + _on_separate_loop=False, + ), + actor_id=ActorID(b"2" * 16), + loop=asyncio.get_running_loop(), + ) + return result, fake_call + + async def _fire_and_capture(self, code: grpc.StatusCode): + result, fake_call = self.make_result(code) + received = [] + result.add_done_callback(lambda r: received.append(r)) + fake_call.complete() + # Normalization resolves the status code on the call's loop, so yield + # control until the callback has been invoked. + for _ in range(100): + if received: + break + await asyncio.sleep(0.01) + assert received, "done-callback was never invoked" + return received[0], fake_call + + async def test_unavailable_normalized_to_actor_unavailable(self): + """A failed (UNAVAILABLE) call is surfaced as ActorUnavailableError so the + router invalidates its cache instead of silently ignoring the failure.""" + received, _ = await self._fire_and_capture(grpc.StatusCode.UNAVAILABLE) + assert isinstance(received, ActorUnavailableError) + + async def test_ok_passes_through_call(self): + """A successful call preserves the previous behavior: the callback + receives the call object, not a synthesized error.""" + received, fake_call = await self._fire_and_capture(grpc.StatusCode.OK) + assert received is fake_call + + async def test_cancelled_not_treated_as_failure(self): + """CANCELLED (typically a client-initiated cancellation) must NOT be + converted into a retryable ActorUnavailableError.""" + received, fake_call = await self._fire_and_capture(grpc.StatusCode.CANCELLED) + assert not isinstance(received, ActorUnavailableError) + assert received is fake_call + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) From aba9c202c0fba222dd5cabe5481a812eee1cd539 Mon Sep 17 00:00:00 2001 From: rohitsudhakar1 Date: Sat, 27 Jun 2026 21:26:55 -0700 Subject: [PATCH 2/2] [serve] Address review: schedule on call loop directly; deterministic test Use loop.create_task when the gRPC done-callback already runs on the call's loop (the common case), avoiding run_coroutine_threadsafe overhead, and fall back to run_coroutine_threadsafe only when invoked from another thread or during shutdown. Replace the test's polling loop with an asyncio.Event for deterministic waiting. Signed-off-by: rohitsudhakar1 --- python/ray/serve/_private/replica_result.py | 23 +++++++++++++++---- .../tests/unit/test_grpc_replica_result.py | 21 ++++++++++------- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/python/ray/serve/_private/replica_result.py b/python/ray/serve/_private/replica_result.py index 7dbfa8b1fa91..a23a8c554a09 100644 --- a/python/ray/serve/_private/replica_result.py +++ b/python/ray/serve/_private/replica_result.py @@ -595,18 +595,31 @@ def add_done_callback(self, callback: Callable): def _on_done(call: grpc.aio.Call): # The status code is only available via a coroutine, so resolve it - # on the call's loop before invoking ``callback``. The call is - # already complete here, so this resolves without blocking. + # before invoking ``callback``. The call is already complete here, so + # the coroutine resolves without blocking. async def _invoke(): callback(await self._normalize_done_result(call)) + # grpc.aio fires done-callbacks on the call's own loop, so in the + # common case schedule directly there and avoid the thread-safe + # queue/locking overhead of run_coroutine_threadsafe. + try: + current_loop = asyncio.get_running_loop() + except RuntimeError: + current_loop = None + + if current_loop is self._grpc_call_loop and current_loop.is_running(): + current_loop.create_task(_invoke()) + return + + # Called from a different thread, or no running loop (e.g. + # interpreter shutdown): hop onto the call's loop, falling back to + # the previous behavior if that loop is gone rather than dropping + # the callback entirely. coro = _invoke() try: run_coroutine_threadsafe(coro, self._grpc_call_loop) except RuntimeError: - # Event loop is no longer running (e.g. interpreter shutdown); - # fall back to the previous behavior rather than dropping the - # callback entirely. coro.close() callback(call) diff --git a/python/ray/serve/tests/unit/test_grpc_replica_result.py b/python/ray/serve/tests/unit/test_grpc_replica_result.py index 1aecfc4bc9a7..1e474f375037 100644 --- a/python/ray/serve/tests/unit/test_grpc_replica_result.py +++ b/python/ray/serve/tests/unit/test_grpc_replica_result.py @@ -438,16 +438,21 @@ def make_result(self, code: grpc.StatusCode): async def _fire_and_capture(self, code: grpc.StatusCode): result, fake_call = self.make_result(code) + event = asyncio.Event() received = [] - result.add_done_callback(lambda r: received.append(r)) + + def callback(r): + received.append(r) + event.set() + + result.add_done_callback(callback) fake_call.complete() - # Normalization resolves the status code on the call's loop, so yield - # control until the callback has been invoked. - for _ in range(100): - if received: - break - await asyncio.sleep(0.01) - assert received, "done-callback was never invoked" + # Normalization resolves the status code asynchronously on the call's + # loop; wait deterministically for the callback to be invoked. + try: + await asyncio.wait_for(event.wait(), timeout=2.0) + except asyncio.TimeoutError: + pytest.fail("done-callback was never invoked") return received[0], fake_call async def test_unavailable_normalized_to_actor_unavailable(self):