[core] impl backpressure for async streaming generators#64383
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces non-blocking backpressure support for async streaming generators in Ray by utilizing an asyncio.Event and a C++ trampoline callback to wake the event loop when objects are consumed, preventing thread blockage. The changes span the Cython interface, C++ core worker, and include comprehensive test coverage. The review feedback highlights a potential segmentation fault in NotifyAsyncGeneratorBackpressureUnblock due to a missing null check on the callback, and suggests explicitly clearing the event and loop references in the finally block of execute_streaming_generator_async to avoid reference leaks.
5cc2a06 to
6c12d76
Compare
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
6c12d76 to
c541446
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c541446aa1
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if context.actor_backpressure_metadata.get() != NULL: | ||
| await loop.run_in_executor( | ||
| executor, | ||
| _reserve_actor_generator_slot, | ||
| context, | ||
| ) | ||
| await _async_reserve_actor_generator_slot(context) | ||
| output = await gen.asend(stats) |
There was a problem hiding this comment.
Check cancellation before resuming async generators
When an async streaming generator's owner dies while it is backpressured, HandleOwnerDied tears down the actor metadata and marks generator_id.TaskId() canceled; TryReserveSlot then returns true for the dead task so this await can complete, but the async path immediately calls gen.asend(...) without the IsTaskCanceled guard that the sync path has. In that owner-death scenario the actor can run another iteration of user code after the caller is gone, causing side effects or expensive work and delaying the actor slot from being released.
Useful? React with 👍 / 👎.
|
@karticam @sampan-s-nayak can you help review pls? |
Allow setting
_actor_generator_backpressure_num_objectsand_generator_backpressure_num_objectson async streaming generators by using asyncio.Event.