[Data] Fix streaming-generator hang on plasma-resident return object#64386
Conversation
PR ray-project#64014 made ObjectRefGenerator._next_sync honor the caller's timeout_s for the end-of-stream ray.get(generator_ref). Ray Data's DataOpTask.on_data_ready polls with timeout_s=0, so once the stream is exhausted that get runs with a 0 timeout. When the generator's return object lives in plasma (e.g. max_direct_call_object_size=0), a 0-timeout get issues an async plasma pull and then immediately cancels it on return, so the object never arrives, the task is never observed as finished, and the streaming executor spins forever (e.g. test_parquet_read_spread timed out at 180s). Fix at the Ray Data call site: poll with timeout_s=0 while the task is still producing, but once the stream is exhausted use a bounded blocking timeout (METADATA_GET_TIMEOUT_S) so the return object can be pulled. On timeout (e.g. object lost to a dead node) it falls through and retries, preserving PR ray-project#64014's intent of not blocking the scheduler indefinitely. Adds ObjectRefGenerator._stream_exhausted(), a non-blocking in-memory end-of-stream check used to select the timeout. Signed-off-by: goutam <goutam@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a non-blocking _stream_exhausted check on the object reference generator to determine if the end-of-stream marker has been reached. In physical_operator.py, this check is used to conditionally apply a bounded blocking timeout (METADATA_GET_TIMEOUT_S) instead of a zero timeout when pulling the generator's return object, preventing immediate cancellation of the pull. The review feedback suggests defensively checking for the existence of _stream_exhausted using getattr to avoid potential AttributeErrors if the generator is mocked or wrapped.
| next_timeout_s = ( | ||
| METADATA_GET_TIMEOUT_S | ||
| if self._streaming_gen._stream_exhausted() | ||
| else 0 | ||
| ) |
There was a problem hiding this comment.
To ensure backward compatibility and prevent potential AttributeErrors in unit tests or other environments where self._streaming_gen might be mocked or wrapped (and thus may not implement the newly added _stream_exhausted method), it is safer to use getattr to defensively check for the method's existence before calling it.
stream_exhausted_fn = getattr(self._streaming_gen, "_stream_exhausted", None)
next_timeout_s = (
METADATA_GET_TIMEOUT_S
if stream_exhausted_fn and stream_exhausted_fn()
else 0
)
Summary
test_parquet_read_spread(and any read whose generator task return object lands in plasma) hangs until the 180s test timeout. The streaming executor reaches200/200rows but the consumer starves forever, blocked incore_worker.get_objectsviaObjectRefGenerator._next_sync.Culprit: #64014
#64014 changed
ObjectRefGenerator._next_syncto honor the caller'stimeout_sfor the end-of-streamray.get(generator_ref)(previously unbounded). That get is what distinguishes "task finished cleanly" (StopIteration) from "task errored", once all yielded blocks are consumed.Ray Data's
DataOpTask.on_data_readypolls the generator non-blocking withtimeout_s=0. After #64014, once the stream is exhausted that same call runsray.get(generator_ref, timeout=0).Why
timeout=0deadlocks on a plasma return objecttest_parquet_read_spreadsets_system_config={"max_direct_call_object_size": 0}, which forces the generator's return object into plasma (it isn't inlined with the owner). Withtimeout=0,CoreWorkerPlasmaStoreProvider::Get:AsyncGetObjects), returning aScopedResponsecleanup handler,remaining_timeout=0→batch_timeout=0), andScopedResponsedestructor firesCancelGetRequest.So every poll re-issues and instantly cancels the pull; the return object never lands locally,
_next_syncalways returns a nil ref, the task is never observed as finished, and the executor spins forever.In normal runs the return object is tiny and inlined (in-memory store), where
timeout=0resolves instantly — which is why only configs that push the return object into plasma (this test'smax_direct_call_object_size=0, or the dead-node case #64014 targeted) expose it.Fix
Keep
_next_sync's singletimeout_sand decide the value at the Ray Data call site based on stream phase:timeout_s=0. The scheduling thread must not block waiting for a block that's still in flight; if it isn't ready, move on and service other operators. A blocking timeout here would stall the whole scheduling loop on every poll.METADATA_GET_TIMEOUT_S(1.0s). The call's purpose flips from "peek for more data" to "finalize this task": the only remaining work is the one-time terminalray.getof the return object, which needs a brief, bounded wait so a plasma-resident object can actually be pulled.The bounded value is needed specifically when the stream is exhausted and the generator return object is not yet local (e.g. resides in plasma on another node). On timeout — e.g. the object was lost to a dead node and needs reconstruction — it returns nil and the scheduler retries on a later loop, preserving #64014's "don't block the scheduler indefinitely" guarantee.