Skip to content

[serve] Invalidate replica queue-length cache on gRPC request failures#64398

Open
rohitsudhakar1 wants to merge 2 commits into
ray-project:masterfrom
rohitsudhakar1:fix-63261-grpc-completion-callback
Open

[serve] Invalidate replica queue-length cache on gRPC request failures#64398
rohitsudhakar1 wants to merge 2 commits into
ray-project:masterfrom
rohitsudhakar1:fix-63261-grpc-completion-callback

Conversation

@rohitsudhakar1

Copy link
Copy Markdown

Why are these changes needed?

When a DeploymentHandle uses gRPC transport (_by_reference=False), the AsyncioRouter's request-completion done-callback receives the raw grpc.aio.Call object, whereas the actor transport invokes the same callback with the request's result or a RayError.

Because of this mismatch, Router._process_finished_request can never detect a failed replica on the gRPC path:

  • _get_actor_died_error(result) returns None (the result is a grpc.aio.Call, not an ActorDiedError/RayTaskError).
  • isinstance(result, ActorUnavailableError) is False.

Both branches are skipped silently, so the replica's queue-length cache entry is not invalidated. After a gRPC failure, power-of-two-choices routing keeps selecting the dead replica until either the next request's rejection path invalidates the cache, or the controller's long-poll pushes a new replica set. The actor path (_by_reference=True, default) is unaffected.

Fix

Normalize a failed gRPC call (StatusCode.UNAVAILABLE) into ActorUnavailableError inside gRPCReplicaResult.add_done_callback, matching the actor-transport callback contract. This reuses the existing translation already used on the data path (_process_grpc_response / get_rejection_response), so the router's existing logic invalidates the cache and reroutes with no router changes.

CANCELLED is intentionally left untouched in the done-callback path, since there it is most often a client-initiated cancellation rather than a replica failure (turning it into a retryable error would invalidate the cache spuriously).

Related issue number

Closes #63261

Checks

  • I've signed off every commit (-s) ... (will add DCO sign-off if required by CI)
  • I've added unit tests covering the UNAVAILABLE, OK, and CANCELLED outcomes (python/ray/serve/tests/unit/test_grpc_replica_result.py).
  • black==22.10.0 and ruff==0.8.4 (repo-pinned) pass on the changed files.

@rohitsudhakar1 rohitsudhakar1 requested a review from a team as a code owner June 28, 2026 04:18
When a DeploymentHandle uses gRPC transport (`_by_reference=False`), the
AsyncioRouter's request-completion done-callback received the raw
`grpc.aio.Call` object instead of the request result/error that the actor
transport delivers. As a result `Router._process_finished_request` could
never detect a failed replica, so the queue-length cache was not invalidated
and power-of-two-choices routing kept selecting the dead replica until either
the next request's rejection path or a controller long-poll refreshed it.

Normalize a failed gRPC call (`StatusCode.UNAVAILABLE`) into
`ActorUnavailableError` inside `gRPCReplicaResult.add_done_callback`, matching
the actor-transport callback contract, so the existing router logic invalidates
the cache and reroutes with no router changes. CANCELLED is intentionally left
as-is to avoid invalidating the cache on client-initiated cancellations.

Adds unit tests covering UNAVAILABLE, OK, and CANCELLED outcomes.

Fixes ray-project#63261

Signed-off-by: rohitsudhakar1 <maxiboi1611@gmail.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces normalization of failed gRPC calls into ActorUnavailableError within gRPCReplicaResult.add_done_callback to align with the actor-transport contract and prevent dead replicas from being silently ignored. It also adds corresponding unit tests to verify this behavior. The review feedback suggests optimizing the callback execution by avoiding run_coroutine_threadsafe when already running on the target event loop, and replacing the polling loop in the unit tests with an asyncio.Event to prevent flakiness.

Comment on lines +603 to +611
coro = _invoke()
try:
run_coroutine_threadsafe(coro, self._grpc_call_loop)
except RuntimeError:
# Event loop is no longer running (e.g. interpreter shutdown);
# fall back to the previous behavior rather than dropping the
# callback entirely.
coro.close()
callback(call)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When _on_done is called, it is almost always executed on the same event loop thread (self._grpc_call_loop). Using run_coroutine_threadsafe in this case introduces unnecessary overhead (such as thread-safe queues and locking). We can optimize this by checking if we are already running on the target event loop and using create_task directly, falling back to run_coroutine_threadsafe only if called from a different thread or during shutdown.

            try:
                current_loop = asyncio.get_running_loop()
            except RuntimeError:
                current_loop = None

            if current_loop is self._grpc_call_loop and current_loop.is_running():
                current_loop.create_task(_invoke())
            else:
                coro = _invoke()
                try:
                    run_coroutine_threadsafe(coro, self._grpc_call_loop)
                except RuntimeError:
                    coro.close()
                    callback(call)

Comment on lines +441 to +450
received = []
result.add_done_callback(lambda r: received.append(r))
fake_call.complete()
# Normalization resolves the status code on the call's loop, so yield
# control until the callback has been invoked.
for _ in range(100):
if received:
break
await asyncio.sleep(0.01)
assert received, "done-callback was never invoked"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a polling loop with asyncio.sleep(0.01) to wait for the callback can be flaky and inefficient. We can make this test completely deterministic and faster by using an asyncio.Event to wait for the callback to be invoked.

        event = asyncio.Event()
        received = []

        def callback(r):
            received.append(r)
            event.set()

        result.add_done_callback(callback)
        fake_call.complete()

        try:
            await asyncio.wait_for(event.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            pytest.fail("done-callback was never invoked")

… test

Use loop.create_task when the gRPC done-callback already runs on the call's
loop (the common case), avoiding run_coroutine_threadsafe overhead, and fall
back to run_coroutine_threadsafe only when invoked from another thread or
during shutdown. Replace the test's polling loop with an asyncio.Event for
deterministic waiting.

Signed-off-by: rohitsudhakar1 <maxiboi1611@gmail.com>
@rohitsudhakar1

Copy link
Copy Markdown
Author

Thanks for the review — addressed both points in aba9c20:

  1. Avoid run_coroutine_threadsafe on the hot path: since grpc.aio fires done-callbacks on the call's own loop, _on_done now uses loop.create_task(...) when already running on self._grpc_call_loop, and only falls back to run_coroutine_threadsafe when invoked from another thread or during shutdown.
  2. Deterministic test: replaced the asyncio.sleep polling loop with an asyncio.Event + asyncio.wait_for.

@ray-gardener ray-gardener Bot added serve Ray Serve Related Issue community-contribution Contributed by the community labels Jun 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community serve Ray Serve Related Issue

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[serve] Router skips cache invalidation on gRPC request failure

1 participant