Skip to content

[Data] Fetch block metadata on a background thread (defer RefBundle emits + batched ray.get)#63701

Closed
xinyuangui2 wants to merge 60 commits into
ray-project:masterfrom
xinyuangui2:batch-meta-fetch
Closed

[Data] Fetch block metadata on a background thread (defer RefBundle emits + batched ray.get)#63701
xinyuangui2 wants to merge 60 commits into
ray-project:masterfrom
xinyuangui2:batch-meta-fetch

Conversation

@xinyuangui2

@xinyuangui2 xinyuangui2 commented May 28, 2026

Copy link
Copy Markdown
Contributor

What

At scale the streaming scheduler does a blocking ray.get(meta_ref) for every (block_ref, meta_ref) pair inside process_completed_tasks — the largest leaf cost on the scheduling thread (~1–2 ms/call, 50k+ pairs/iter at 5k workers). This PR moves that fetch off the scheduling thread onto a background thread.

Two changes make that possible:

  1. Defer emits; size blocks locally. on_data_ready no longer emits RefBundles inline. It pulls each ready pair, appends it to a per-op deferred list, and returns. The output-budget accounting uses the block's local object_size (get_local_object_locations, no RPC) instead of meta.size_bytes — the driver owns every block ref the streaming generator yields, so the size is available locally. (Rare fallback: a short, timeout-bounded ray.get(meta) + warning if the local size is missing.)
  2. Fetch metadata on a background thread. A dedicated MetadataPrefetcher thread does the ray.get(meta_refs). Each scheduling iteration submits the new pairs and drains whatever metadata has already landed, emitting each op's bundles in order.

Design

The blocking ray.get moves off the scheduling loop onto a dedicated thread. The two threads are coupled only by one thread-safe queue (_request_q); fetched bytes come back via _results. submit and drain both run inside the per-op loop / scheduling iteration; the background ray.get runs outside it.

flowchart TB
    subgraph SCHED["Scheduling thread — process_completed_tasks (each iteration)"]
        ODR["on_data_ready()<br/>• pull (block_ref, meta_ref)<br/>• read local object_size (no RPC)<br/>• append to op_deferred"]
        SUB["submit(op_deferred)"]
        DRAIN["drain()<br/>• emit pairs whose metadata is ready,<br/>&nbsp;&nbsp;in per-op FIFO order<br/>• fire task_done_callback once a<br/>&nbsp;&nbsp;task's pairs have all emitted"]
        OUT["RefBundle → downstream operator"]
        ODR --> SUB
        DRAIN --> OUT
    end

    Q[["_request_q<br/>thread-safe queue"]]
    RES[("_results<br/>meta_ref → bytes")]

    subgraph BG["MetadataPrefetcher thread (background)"]
        FETCH["ray.wait(fetch_local) →<br/>ray.get(meta_refs)"]
    end

    SUB -- "put(meta_refs)" --> Q
    Q -- "get" --> FETCH
    FETCH -- "publish bytes" --> RES
    RES -. "pop (non-blocking)" .-> DRAIN
Loading

Benchmark (this PR vs master)

worker_scaling release test, scheduling-loop cost. total sched = total time the scheduling thread spent in the loop; avg/p50 = per-iteration loop duration.

variant total sched (s) Δ avg loop (s) Δ p50 loop (s) Δ
2000_actors_1ops 51 → 36 -29% 1.00 → 0.60 -40% 1.43 → 0.46 -68%
2000_actors_15ops 920 → 666 -28% 10.33 → 6.40 -38% 14.29 → 3.68 -74%
2000_tasks_1ops 49 → 33 -32% 1.04 → 0.38 -64% 0.10 → 0.10 -3%
2000_tasks_15ops 927 → 702 -24% 9.09 → 4.65 -49% 11.34 → 4.29 -62%
5000_actors_1ops 204 → 218 +7% 3.19 → 0.27 -92% 4.23 → 0.24 -94%
5000_actors_15ops 2401 → 1739 -28% 30.78 → 19.54 -37% 42.69 → 13.53 -68%
5000_tasks_1ops 172 → 153 -11% 4.19 → 0.32 -92% 2.60 → 0.24 -91%
5000_tasks_15ops 2754 → 1696 -38% 29.62 → 11.54 -61% 36.43 → 9.42 -74%

Per-iteration loop duration drops sharply everywhere (p50 −62% to −94%) — the inline ray.get is gone from the loop. Total scheduling time falls 24–38% on the many-operator pipelines that matter at scale.

The one slight regression is 5000_actors_1ops (total +7%): with a single operator the loop isn't fetch-bound, so removing the inline ray.get makes each loop far cheaper but it then runs many more times, and the per-iteration ray.wait polling dominates. Per-loop latency still drops 92–94% there; only the summed total rises slightly.

Ray Data release tests

https://buildkite.com/ray-project/release/builds/97303 — no regressions.

… iter

Per-pair ray.get(meta_ref) in DataOpTask.on_data_ready is the largest
leaf cost in process_completed_tasks at high scale (~1-2 ms fixed
overhead per call, hits the raylet RPC + msgpack wrapper). At 5000
workers with ~10 blocks each per scheduler tick, that's ~50 k pairs
× ~1.5 ms ≈ 75 s of leaf time amortizable into one batched call per
iteration.

Approach (no Core API change required, no streaming-gen protocol
change):

1. DataOpTask.peek_pending_meta_ref() pulls the next
   (block_ref, meta_ref) pair into the pending slots without
   fetching metadata. Returns the meta_ref so the scheduler-loop
   driver can batch it; subsequent on_data_ready consumes the same
   pending slots.

2. process_completed_tasks peeks each ready DataOpTask, asks the
   local CoreWorker (via ray.experimental.get_local_object_locations
   — local-only, no RPC) which meta_refs have a non-None
   ``object_size``. A non-None size is the load-bearing signal that
   the owner has processed the task return for that ref and a
   ray.get won't block.

3. Refs with known size go into ONE batched ray.get; the resulting
   dict is passed to each task's on_data_ready via a new
   ``prefetched_meta`` kwarg.

4. on_data_ready consults the dict first and falls back to the
   existing per-ref ray.get + GetTimeoutError warning when the dict
   misses — preserving all error semantics for refs whose size
   wasn't yet known (rare warmup window).

Also adds a debug-log emitted every 100 iterations summarizing the
size-known rate (e.g. "82 / 90 (91.1%) had known size and were
batched"). The rate tells us how much of the win we're actually
capturing vs falling through to the per-ref path.

Tests: three new mock-friendly tests in TestDataOpTask exercise
peek_pending_meta_ref, the prefetched-meta hit path, and the empty-
dict fallback to the per-ref path.

No behavior change for refs that miss the local-locations check; the
existing per-ref path with its timeout + worker-crash warning still
catches them.

Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 requested a review from a team as a code owner May 28, 2026 17:39

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces metadata prefetching for streaming executor tasks to batch ray.get calls, utilizing get_local_object_locations to identify ready metadata references without RPC overhead. The feedback suggests making the prefetch peeking non-blocking by setting the timeout to zero, wrapping the local location lookup in a try-except block to prevent scheduler crashes, and resetting telemetry counters after logging to track interval-based hit rates.

Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
The previous revision of this PR queried get_local_object_locations
on the meta_ref. That returns the size of the pickled metadata blob
(a few KB) — not a useful signal for the budget loop in
on_data_ready, which compares against meta.size_bytes (the block's
data size).

Switch to querying the block_ref's object_size:

- block_ref.object_size is the block's actual data size in plasma,
  which is what maps to meta.size_bytes in the budget loop.
- A known block size also implies the worker has finished producing
  the block AND yielded the meta_ref (streaming gens yield block
  then meta), so the batched ray.get(meta_refs) is safe.

Rename peek_pending_meta_ref -> peek_pending_pair returning the full
(block_ref, meta_ref) tuple so the caller can inspect block size and
still address the meta_ref for the batched fetch.

Updated tests accordingly.

Signed-off-by: xgui <xgui@anyscale.com>
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label May 28, 2026
…pass

Restructures process_completed_tasks into a 3-phase pipeline that
preserves bit-identical emission order while collapsing N per-pair
ray.get(meta_ref) calls into one batched call per scheduler iter.

1. Per-op loop (unchanged order): each DataOpTask.on_data_ready
   appends every pulled (block_ref, meta_ref) pair to a shared
   deferred: List[DeferredEmit]. No RefBundle is emitted inside
   on_data_ready in this mode; the loop only manages budget.

   - Known block size (from ray.experimental.get_local_object_locations,
     local-only no-RPC lookup): defer with meta_bytes=None; budget
     uses the local object_size.
   - Unknown block size: synchronous per-ref ray.get(meta_ref, timeout)
     to obtain size for budget; stash the bytes on the deferred entry
     so the caller doesn't refetch. Per-ref GetTimeoutError warning
     and retry-next-iter semantics preserved exactly.

2. _replay_deferred_emits: one batched ray.get covering deferred
   entries with meta_bytes=None. On failure: per-ref retry fallback,
   then skip refs that still fail with a warning. Replay deferred
   list in append order, calling _output_ready_callback and updating
   _last_block_meta — emission order matches today's per-op,
   per-task, per-pair traversal exactly.

3. Postponed task_done_callback: when the streaming gen raises
   StopIteration during on_data_ready in deferred mode, we set
   _task_done_pending instead of firing immediately, so
   _last_block_meta has time to reach its final value during replay.
   After replay, _replay_deferred_emits fires task_done_callback for
   any task with the flag set.

Removes peek_pending_pair (API) and _batched_fetch_meta (helper) —
the new design does the local lookup per-pair inside on_data_ready
instead of cross-op peeking.

Debug log every ~100 scheduler iterations summarizes the size-known
rate. Flip the data logger to DEBUG to see it.

Legacy mode (deferred_emits=None) preserves the original behavior
unchanged for direct callers in tests and utilities.

Three tests in TestDataOpTask cover the legacy path, the deferred
path with replay, and emission-order preservation across multiple
tasks.

Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 changed the title [Data] Batch ray.get(meta_ref) across ready DataOpTasks per scheduler iter [Data] Defer RefBundle emits + batched ray.get(meta_refs) in process_completed_tasks May 28, 2026
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
deferred_emits is now a required parameter. Test callers route through
the new DataOpTask.drain_and_emit helper (= deferred + sync replay in
one call) so on_data_ready has a single code path.

Signed-off-by: xgui <xgui@anyscale.com>
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
xinyuangui2 and others added 2 commits June 7, 2026 21:33
Signed-off-by: xgui <xgui@anyscale.com>

# Conflicts:
#	python/ray/data/_internal/execution/interfaces/physical_operator.py
…obe stats

Update 1 on the deferred-emit pipeline:

- On a local-size miss, estimate the output budget from the operator's
  running average block size (recorded from exact meta.size_bytes in
  replay) instead of a per-ref ray.get. All pairs now defer
  (meta_bytes=None) so metadata flows through ONE batched ray.get — a
  single metadata-fetch path. Misses are rare (object_size is locally
  known for ~all pairs), so the estimate is a small, transient budget
  approximation; the exact metadata still arrives via the batched fetch.

- Replace the old size-known counter (which keyed off meta_bytes, now
  always None) with the ray-project#63904-style probe recorded in replay_deferred_emits
  where both the local object_size and exact meta.size_bytes are known:
  hit rate, object_size/meta ratio, within-1%, and p50/p90/max relative
  diff. Logged periodically and surfaced in the worker_scaling result
  metrics (local_size_probe_*).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 requested a review from a team as a code owner June 7, 2026 21:43
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/thirdparty_files Outdated
Move the batched `ray.get(meta_refs)` for deferred RefBundle emits off the
scheduling-loop thread into a dedicated `MetadataPrefetcher` thread, so the
executor thread never blocks on metadata fetches.

- New `MetadataPrefetcher`: executor thread calls `submit()` (enqueue an op's
  deferred meta_refs + append to a per-op FIFO) then `drain()` (emit pairs
  whose metadata is back, front-first per op). A background thread blocks on
  `ray.get`, coalescing queued batches into one fetch.
- Per-op in-order emit: each operator's FIFO is emitted front-first and stops
  at the first pair still in flight, so RefBundle emission order is preserved
  exactly; an op waiting on metadata is skipped this round and retried next
  (matches the synchronous break-and-retry). Ops are independent.
- Postponed `task_done_callback` fires only once a task's pending emits reach
  zero (`_pending_emit_count`), so a task stays active until fully drained.
- `process_completed_tasks` takes an optional `metadata_prefetcher`; without
  one it falls back to the synchronous batched get + `replay_deferred_emits`.
  StreamingExecutor creates/starts/stops the prefetcher.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
The deferred-emit path constructed RefBundle with a legacy (ref, metadata)
tuple, but RefBundle.blocks now requires BlockEntry instances (upstream
replaced the 2-tuple shape). Wrap the block in BlockEntry so the emit
helper used by both replay_deferred_emits and the MetadataPrefetcher
produces a valid bundle.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 changed the title [Data] Defer RefBundle emits + batched ray.get(meta_refs) in process_completed_tasks [Data] Fetch block metadata on a background thread (defer RefBundle emits + batched ray.get) Jun 10, 2026
xinyuangui2 and others added 8 commits June 10, 2026 19:23
Signed-off-by: xgui <xgui@anyscale.com>

# Conflicts:
#	python/ray/data/_internal/execution/interfaces/physical_operator.py
#	release/nightly_tests/dataset/worker_scaling_benchmark.py
…lback and probe

- on_data_ready: the driver owns every block ref the streaming generator
  yields, so get_local_object_locations always knows object_size — assert
  it instead of estimating, and drop the per-op running-average fallback.
- Remove the local-size probe telemetry (hit rate / size-diff counters)
  and its worker_scaling_benchmark hooks.
- Deferred emit via MetadataPrefetcher is now the only mode: remove the
  synchronous replay fallback (replay_deferred_emits / drain_and_emit);
  make metadata_prefetcher required in process_completed_tasks. Tests use
  a new MetadataPrefetcher.flush() + a sync drive helper in tests/util.py.
- Remove stray python/ray/thirdparty_files artifact.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…on overhead

bytes_read is now the block's object-store object_size, which is a few KB
larger than meta.size_bytes (per-object serialization overhead), so the
exact pytest.approx default tolerance is too tight.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Drain each task in a loop until end-of-stream: on a 1-CPU cluster the
second stub generator hasn't started when the first is ready, so a single
on_data_ready call pulls nothing from it. The failure also leaked a
backpressured generator (held by the pytest traceback) that pinned the
only CPU and hung every later test needing a task.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…tations; hoist local imports

- MetadataPrefetcher fetch thread: accumulate meta_refs in a pending set,
  ray.wait(num_returns=all, timeout=0.1, fetch_local=True), and ray.get only
  the ready refs — a ref stuck on a bad node stays pending instead of
  blocking the whole thread and starving other operators' metadata.
- Deferred-emit tests: block sizes now carry a small format overhead
  (+8 bytes after the master merge); compare with pytest.approx(abs=64).
- Hoist function-local imports (BlockMetadataWithSchema, DeferredEmit,
  _emit_deferred_entry, MetadataPrefetcher) to module level; no cycles.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Restore the pre-deferred gating semantics, without blocking calls:

- A task whose previously pulled pairs are still awaiting their background
  metadata fetch (_pending_emit_count > 0) pulls nothing this iteration —
  the producer must not get arbitrarily far ahead of unfetched metadata.
- If a block's local object_size isn't known (e.g. lost to node failure),
  leave the pair pending and retry later instead of asserting/consuming.
  Consuming it advanced the generator stream to end-of-stream, whose
  handling blocks on the generator ref — with no node to reconstruct on,
  that deadlocked the scheduling thread (caught by
  test_on_data_ready_with_preemption_after_wait). A zero-timeout
  ray.wait(fetch_local=True) nudges reconstruction in the background.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 requested a review from iamjustinhsu June 15, 2026 19:43
xinyuangui2 and others added 2 commits June 17, 2026 01:56
The deferred design can spin the scheduling loop while metadata is in
flight: each iteration re-runs per-iteration work (resource accounting,
actor-state refresh, etc.) but drain() finds nothing ready to emit. Add
drain(block_timeout_s): when there are pending pairs but nothing freshly
published, block up to the timeout for the fetch thread's first result
(an Event it sets on publish) instead of returning an empty drain. It
only blocks when the iteration would otherwise emit nothing — if results
are ready it returns immediately. process_completed_tasks now block-drains
with WAIT_FOR_TASK_COMPLETION_TIMEOUT_S.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit e6f87ca. Configure here.

Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!! I was a bit pedantic on the comments this time because this part of the code is pretty sensitive to changes. Anyways, I like the idea, but we'll probably need eyes from balaji too

Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py Outdated
"""
if (
block_timeout_s > 0
and not self._published.is_set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this self._published? Like what happens if we just use any(self._fifos.values()) and don't have a block_timeout_s?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In experiments, we find sometimes the result queue is empty but scheduling loop will continue and waste some calculation. Thus, we use this event to mitigate that. Release tests show this can avoid some regressions in tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm not sure I completely understand this part:

we find sometimes the result queue is empty but scheduling loop will continue and waste some calculation

When the queue is empty, do u know where the scheduling loop is spending time? Why does adding block_timeout_s and self._published make the scheduling thread faster?

Comment thread python/ray/data/_internal/execution/metadata_prefetcher.py Outdated
try:
values = ray.get(batch)
results: Dict["ray.ObjectRef", Any] = dict(zip(batch, values))
except Exception:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to narrow which exceptions could be raised?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aligned with the original implementation, and store the exceptions whatever the type is. I didn't take a closer look at the possible types.

item = ()
else:
item = self._request_q.get()
if item is None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we reach the sentinel, should there be pending meta refs?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should not be. Since the STOP happens after the thread joins.

# otherwise poll so pending refs keep making progress.
if pending:
try:
item = self._request_q.get_nowait()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple questions:

  • If you call get_nowait(), rather than one with a timeout, will this busy-wait, burning CPU cycles since we are continously calling get_nowait?
  • How come you case with or without pending?
  • How come you have another step to "Coalesce any other already-queued batches.", which looks very similar to what's above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified the loop. Hope it is more clear

xinyuangui2 and others added 19 commits June 17, 2026 17:44
… docs, drain-timeout constant

- Replace the _task_done_pending/_has_finished bool pair with a
  TaskGeneratorState enum (ACTIVE -> DRAINED -> FINISHED) as a small state
  machine; has_finished/is_drained read it.
- Move _emit_deferred_entry/_fire_task_done off module level into
  DataOpTask.emit_block()/mark_done(), so the prefetcher no longer reaches
  into _-prefixed task internals. The prefetcher calls task.emit_block(...)
  / task.mark_done().
- on_data_ready: rewrite the docstring (deferred-emit + graceful
  block/metadata-not-ready handling + local-size gating with fallback);
  stop claiming object_size is 'always' known; add type annotations; clarify
  the failure-path comment.
- Give the drain block its own DRAIN_BLOCK_TIMEOUT_S constant so it can be
  tuned independently of WAIT_FOR_TASK_COMPLETION_TIMEOUT_S.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…apsulation

- Bound the rare inline metadata fetch (when local object_size is missing)
  with a 0.1s timeout; on timeout leave the pair pending and retry next round
  instead of blocking the scheduling loop.
- Move pending-emit accounting onto DataOpTask (mark_pending/mark_emitted),
  called as d.task.mark_pending(); drop the wrappers from DeferredEmit.
- Type the prefetcher FIFOs as defaultdict[Hashable, deque[DeferredEmit]] and
  op_key as Hashable; make the _NOT_READY sentinel a class constant.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…ut=0)

The fetch thread only gets refs that ray.wait(fetch_local=True) reported
ready, so ray.get can use timeout=0 to guarantee it never blocks on data
transfer. A ref that resolved to a task error is still available and raises
that error immediately (captured per-ref); a ref that raced out of the local
store raises GetTimeoutError and is re-queued for a later pass rather than
being miscounted as a block error.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Replace the bare ``None`` enqueued on the request queue to stop the fetch
thread with a dedicated ``_STOP`` class constant, matching ``_NOT_READY``.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Cross-operator iteration order in drain() is irrelevant — each op's bundles
go to its own downstream queue. Only the per-op deque's front-first order
matters, which deque guarantees naturally. Drop the misleading dict
insertion-order justification.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
A reviewer asked whether dropping a pair on a metadata-fetch error leaks
output budget. It doesn't: the budget is a per-iteration read quota
recomputed each scheduling loop, so there is no balance to refund. Add a
comment at the charge site explaining the deliberate choice.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Cut redundant prose and fold the per-iteration outcomes into a single list
in the docstring. Add a note on what the size-fallback timeout `break` does
(leaves the pair pending for the next call).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
…essors

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Drop the redundant class docstring and the break-and-retry parenthetical,
and annotate _request_q as Queue[List[ObjectRef]] instead of a comment.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Replace the if-pending/else split and duplicated first-pull with one
get(block=not pending) followed by a non-blocking drain. Same operating
point: block on the queue only when idle, go straight to ray.wait while refs
are in flight. _STOP still exits immediately (fast teardown).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
_fetch now takes the full pending set, waits for the locally-available refs,
fetches + publishes them, and returns those still in flight. _run is left to
just manage the request queue and pending set.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
None means "don't block"; a set value must be > 0. Clearer than overloading
0.0 as the no-block default.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
The method both queues new deferred pairs and registers end-of-stream tasks
for the postponed done callback; the name now reflects both roles.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
submit_or_register_done read as more confusing than submit; revert it. Also
remove the self-evident comment above the prefetcher start() call.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
Only per-op append order is a guarantee; the order ops emit relative to one
another is irrelevant (each emits into its own downstream queue) and depends
on which op's metadata lands first. Assert per-op sequences instead of the
full interleaving, which could otherwise flake.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
- Type the stop sentinel as a single-member _Stop enum so the request queue
  stays typed as Queue[list[ObjectRef] | _Stop] and narrows cleanly; check it
  with isinstance.
- Import GetTimeoutError explicitly instead of via ray.exceptions.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 requested a review from iamjustinhsu June 24, 2026 16:34

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More feedback, I still need to look at the tests, but at a glance, I don't see any usage of callback seams to use in test. What I mean is we have a metadata callback and block ready callback to make it easier to test. You could theoretically have callbacks as well for when object_size is known or when the ray.get after a fetch_local True returns an exception and you need to ray.get each individual ref, and then use them in the test like so. By doing this you'll have more control on entering each if statement, or try catch statement. lmk if that doesn't make sense

# get arbitrarily far ahead of unfetched metadata (the pre-deferred
# code fetched each pair's metadata before pulling the next one).
# Retry once the pending pairs have been emitted.
if self._state is not TaskGeneratorState.ACTIVE or self._pending_emit_count > 0:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that we don't want to pull more if there are pending emits? This is my thinking:

  • The decision to pull only depends the size_bytes, which will always be known through 2 ways: get_local_object_locations, or a fallback to ray.get
  • This decision does not include if it has pending emits, so theoretically it seems to me it's possible

oh also not saying we should change this, just want to understand the reasoning

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to match the old behavior: orders are kept and pull is blocked when previous metadata is not ready.

if object_size is None:
# Rare: no local size record. Fall back to a short metadata
# ``ray.get`` for the size.
logger.warning(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double check, this warning doesn't get emitted very often, or at least in ur testing it didn't emit once? Regardless, I'm not sure if this is possible, but if the metadata starts not showing up for whatever reason for multiple tasks, this could be spamming. Do u think it would be good to gate this warning behind a log_once call. We have this function used to log stuff once, and it could be handy here

Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py
Comment thread python/ray/data/_internal/execution/interfaces/physical_operator.py
if task.is_drained():
# ``set`` dedupes: a still-pending task re-seen next iteration
# must not be registered (or later fired) twice.
self._done_pending.add(task)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think to keep it consistent, you can rename self._done_pending to self._drained_tasks or smth like that

"""
if (
block_timeout_s > 0
and not self._published.is_set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm not sure I completely understand this part:

we find sometimes the result queue is empty but scheduling loop will continue and waste some calculation

When the queue is empty, do u know where the scheduling loop is spending time? Why does adding block_timeout_s and self._published make the scheduling thread faster?

class MetadataPrefetcher:
# Sentinel for "ref not yet fetched" in the result store. A fetched result
# is either the metadata bytes or an ``Exception`` captured during fetch.
_NOT_READY = object()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this sentinel also be a part of the _Stop enum? For example, we could rename it to _Signal, and then host 2 values: Stop (for streaming executor done), and not_ready (for object not ready for ray.get)

for ref in ready:
try:
results[ref] = ray.get(ref, timeout=0)
except GetTimeoutError:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also log a warning here too, but only log_once. I don't think this should be a common case, but we should at least know if we are heading down this path


# Fetches deferred block metadata on a background thread so the
# scheduling loop never blocks on ``ray.get(meta_refs)``.
self._metadata_prefetcher = MetadataPrefetcher()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this change is risky, consider having an env flag to turn this off just in case. You can set this to default True, and that way people can go back to before, but we'll be able to test this with the community

Comment on lines +119 to +120
"""Queue ``deferred`` pairs (from one operator) for fetching + emission,
and register any end-of-stream tasks for a postponed done callback.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Queue ``deferred`` pairs (from one operator) for fetching + emission,
and register any end-of-stream tasks for a postponed done callback.
"""Queue ``deferred`` pairs (from one operator) for fetching + emission,
and register any end-of-stream tasks (ready_tasks) for a postponed done callback.

I was thinking u may want to refer to the argument parameters.

But also, this function appears to do to separate things (one for new metadata_ref, one for making sure we call task_done_callback), so I'm thinking u could split this function up into those 2 phases. It seems like drain also does the same 2 things as well (correct me if i'm wrong). wdyt?

- Replace the size-unavailable logger.warning with log_once (per operator).
- Give DeferredEmit __slots__ (created per pulled pair).
- TODO: add object_size_bytes to BlockMetadata (object-store vs in-memory size).
- Rename _done_pending -> _drained_tasks.
- Drop the _published block-wait in drain (and DRAIN_BLOCK_TIMEOUT_S) -- added
  complexity for little gain.
- Fold the _NOT_READY/_Stop sentinels into a single _Signal enum (STOP /
  NOT_READY).
- log_once a warning when a ray.wait-ready ref isn't locally gettable.
- Add RAY_DATA_METADATA_PREFETCH_ON_THREAD kill switch: when off, fetch
  metadata synchronously on the executor thread (no background thread).
- Split submit -> submit + register_drained_tasks and drain -> emit_ready +
  fire_done_callbacks, so each method does one thing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2

Copy link
Copy Markdown
Contributor Author

close in favor of #64378

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants