
[Core] Fail-fast on streaming generator replay object-count mismatch #64394

Open
dragongu wants to merge 1 commit into ray-project:master from dragongu:fix/ray_data_hang


@dragongu dragongu commented Jun 27, 2026


Description

When a streaming generator task's replay emits a different number of objects than the original successful attempt, downstream consumers break — fewer objects leaves the pipeline silently hanging (the scenario we hit in production), and more objects silently drops data past the pinned end-of-stream index (theoretical). Nothing detects this mismatch today. This PR fails the task fast with a new STREAMING_GENERATOR_REPLAY_INCONSISTENT error so the failure propagates through lineage to downstream consumers instead. The underlying object-count non-determinism is not fixed here; #64393 removes one Ray-internal source, and a future opt-in rows_per_block parameter would let users with unstable UDFs make the count deterministic.

Symptom. We hit this on an elastic-resource pipeline (worker pods can be preempted at any time) running read_parquet → tokenize → predict → write_parquet. The job occasionally hung: one last predict task sat in PENDING_ARGS_AVAIL for 5h+ with no error logged, blocked on object 12 from an upstream tokenize task that was never produced.

Root cause. A worker preemption had triggered lineage reconstruction of that tokenize task — but the replay produced only 11 objects instead of the original 12, so object 12 was never created.

We added logging in production to confirm this is systematic, not a one-off. For the same task, two attempts received identical input (in_rows/in_bytes equal) and produced an identical row count, yet emitted a different number of objects (one per output block):

Task 377d5b47   in_rows=8,510   in_bytes=332,299,291   (identical across attempts)
  attempt 1 (pid=428):  out_blocks=3   out_rows=8,510   out_bytes=1,947,213,121
  attempt 2 (pid=652):  out_blocks=4   out_rows=8,510   out_bytes=1,520,930,614
  → out_blocks 3 → 4; out_bytes estimate drifted ~22% (229KB/row → 179KB/row)

Task 5668fd1c   in_rows=9,330   in_bytes=383,047,785    (identical across attempts)
  attempt 1 (pid=307):    out_blocks=5   out_rows=9,330   out_bytes=2,284,886,568
  attempt 2 (pid=12772):  out_blocks=4   out_rows=9,330   out_bytes=2,388,146,403
  → out_blocks 5 → 4; out_bytes estimate drifted ~4.5% (245KB/row → 256KB/row)

out_rows is identical across attempts but out_blocks is not — a replay genuinely produces a different object count.
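The per-row sizes and drift percentages quoted in the log annotations can be rechecked from the raw byte counts. The snippet below is a small sketch for that arithmetic only; the helper name `per_row_and_drift` is hypothetical and not part of Ray, and the numbers are copied from the two log excerpts above.

```python
def per_row_and_drift(out_rows, bytes_first, bytes_second):
    """Return (bytes/row on attempt 1, bytes/row on attempt 2, relative drift)."""
    per_row_first = bytes_first / out_rows
    per_row_second = bytes_second / out_rows
    # Drift is measured relative to the first attempt's byte estimate.
    drift = abs(bytes_second - bytes_first) / bytes_first
    return per_row_first, per_row_second, drift


# (out_rows, attempt-1 out_bytes, attempt-2 out_bytes), copied from the logs.
attempts = {
    "377d5b47": (8_510, 1_947_213_121, 1_520_930_614),
    "5668fd1c": (9_330, 2_284_886_568, 2_388_146_403),
}

results = {task: per_row_and_drift(*vals) for task, vals in attempts.items()}
for task, (first, second, drift) in results.items():
    print(f"{task}: {first / 1000:.1f} KB/row -> {second / 1000:.1f} KB/row, "
          f"drift {drift:.1%}")
```

The row count cancels out of the drift ratio, so the drift reflects the size estimate alone, which is consistent with identical out_rows across attempts.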

A task's object count is non-deterministic: block boundaries drift across attempts (see Why the object count drifts below), so a replay can emit fewer or more objects than the original successful attempt. Downstream consumers were created against the original count:

  • fewer → the consumer blocks on stream indices that are never produced (the scenario we hit);
  • more → the extras past the pinned end-of-stream index are silently dropped by ObjectRefStream::InsertToStream.

Why this goes undetected: the first successful execution pins the stream's end-of-stream index (EOF), MarkEndOfStream early-returns afterward so EOF can never move, and CompletePendingTask only re-checks replays that fail with an application error — a normally-completing replay with a different count falls through unhandled.
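To make the pinned-EOF behavior concrete, here is a toy model of the two failure modes. It is a sketch under stated assumptions, not Ray's implementation: `ToyStream`, `run_attempt`, and their methods are hypothetical names, indices are 0-based, and the only behaviors modeled are the ones described above (EOF is set once and never moves; items at or past the pinned EOF are dropped).

```python
class ToyStream:
    """Toy stand-in for a generator's object stream (not Ray's ObjectRefStream)."""

    def __init__(self):
        self.eof = None   # pinned end-of-stream index, set exactly once
        self.items = {}   # stream index -> object

    def mark_end_of_stream(self, index):
        # Models the early return: once EOF is pinned it can never move.
        if self.eof is not None:
            return False
        self.eof = index
        return True

    def insert(self, index, value):
        # Items at or past the pinned EOF are silently dropped.
        if self.eof is not None and index >= self.eof:
            return False
        self.items[index] = value
        return True

    def lose_all(self):
        # Models the objects being lost when the producing node dies.
        self.items.clear()

    def missing(self):
        # Indices a consumer still expects but that were never (re)produced.
        return [i for i in range(self.eof) if i not in self.items]


def run_attempt(stream, count):
    """Emit `count` objects, then try to mark EOF; return the dropped indices."""
    dropped = [i for i in range(count) if not stream.insert(i, f"obj-{i}")]
    stream.mark_end_of_stream(count)
    return dropped


stream = ToyStream()
run_attempt(stream, 3)                    # first attempt pins EOF to 3

stream.lose_all()
dropped_fewer = run_attempt(stream, 2)    # replay yields fewer objects
missing_fewer = stream.missing()          # index a consumer would block on

stream.lose_all()
dropped_more = run_attempt(stream, 4)     # replay yields more objects
missing_more = stream.missing()
print(missing_fewer, dropped_more)
```

In the fewer-objects replay one index stays missing (the hang); in the more-objects replay nothing is missing but the extra item never enters the stream (the silent drop). Neither case raises an error in this model, which is the gap the PR closes.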

Why the object count drifts. A streaming generator yields one object per output block, and OutputBuffer.next() cuts a new block once accumulated rows reach target_num_rows — a value derived from size_bytes(). The object count therefore tracks the running size estimate rather than the input, so anything that changes that estimate or the data across attempts shifts the boundaries and changes the count:

  • a non-deterministic size_bytes() estimate (e.g. PandasBlock.size_bytes sampling without a fixed seed — addressed in [Data] Make PandasBlock.size_bytes deterministic #64393);
  • a UDF whose output is unstable across runs — the same input produces slightly different results across attempts (nondeterministic ordering, floating-point jitter, or wall-clock/random-dependent content), shifting the block boundaries.
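The dependence of block count on the size estimate can be illustrated with a toy calculation. This is a simplified sketch, not Ray Data's actual OutputBuffer logic: the function `count_blocks`, the rule "rows per block = target block bytes divided by estimated bytes per row", and all numbers are assumptions chosen only to show the effect.

```python
import math


def count_blocks(num_rows, est_bytes_per_row, target_block_bytes):
    """Blocks emitted for `num_rows` under a hypothetical size-based cut rule."""
    # Hypothetical rule: the row threshold is derived from the size estimate.
    target_num_rows = max(1, target_block_bytes // est_bytes_per_row)
    return math.ceil(num_rows / target_num_rows)


# Same 1,000 rows, same 1 MB target; only the per-row size estimate differs.
blocks_first = count_blocks(1_000, est_bytes_per_row=2_000,
                            target_block_bytes=1_000_000)
blocks_replay = count_blocks(1_000, est_bytes_per_row=2_500,
                             target_block_bytes=1_000_000)
print(blocks_first, blocks_replay)
```

With identical input rows, a 25% shift in the estimate changes the row threshold from 500 to 400 and the block count from 2 to 3, which in a streaming generator means a different number of yielded objects.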

Fix. This PR surfaces the mismatch loudly: detect it in CompletePendingTask and fail the task with a new STREAMING_GENERATOR_REPLAY_INCONSISTENT error type, so the failure propagates through lineage to downstream tasks instead of leaving the pipeline stuck or dropping data silently. The behavior change is strictly hang/silent-loss → explicit error; no caller relies on the old silent behavior.

Implementation notes.

  • Detection runs in a new FailStreamingGeneratorReplayIfInconsistent helper called early in CompletePendingTask, before any return object is written to the store (so downstream consumers can't observe the inconsistent objects before the failure propagates) and before SetTaskStatus(FINISHED), because FailPendingTask RAY_CHECKs IsPending().
  • expected_count comes from spec.NumStreamingGeneratorReturns() (recorded on the first successful attempt); actual_count is reply.streaming_generator_return_ids_size().
  • Both the fewer- and more-objects cases fail the task: either way the stream is already inconsistent with what downstream consumers were created against.
  • Known limitation: a first attempt that yields zero objects does not record a count, so a replay yielding a non-zero count is not flagged. This cannot hang (the stream's EOF is pinned to 0, so consumers finish immediately); detecting it would require tracking "count recorded" separately and is left out of scope here.
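The decision rule in the notes above can be summarized in a few lines. The sketch below is in Python for readability and is not the C++ helper from this PR: `check_replay` is a hypothetical name, and treating a recorded count of 0 as "nothing recorded" is an assumption taken from the known-limitation note.

```python
REPLAY_INCONSISTENT = "STREAMING_GENERATOR_REPLAY_INCONSISTENT"


def check_replay(recorded_count, actual_count):
    """Return the error type if a replay's object count mismatches, else None.

    recorded_count: count recorded on the first successful attempt
                    (0 is assumed to mean "nothing recorded").
    actual_count:   number of objects reported by the attempt being completed.
    """
    if recorded_count == 0:
        # First attempt, or a first attempt that yielded zero objects
        # (the known limitation): there is nothing to compare against.
        return None
    if actual_count != recorded_count:
        # Both fewer and more objects are treated as a failure.
        return REPLAY_INCONSISTENT
    return None


outcomes = {
    "fewer": check_replay(3, 2),
    "more": check_replay(3, 4),
    "same": check_replay(3, 3),
    "zero_first_attempt": check_replay(0, 2),
}
print(outcomes)
```

The four cases correspond to the fewer, more, and same-count scenarios plus the zero-object first attempt that is deliberately left unflagged.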

Related issues

None.

Additional information

Tests. Adds C++ unit tests in task_manager_test.cc that drive CompletePendingTask with controlled object counts — the deterministic repro of the bug:

  • fewer objects on replay → task fails with STREAMING_GENERATOR_REPLAY_INCONSISTENT;
  • more objects on replay → task fails with the same error;
  • same count → completes normally (no false positive).

End-to-end illustration. How the hang arises in practice — a streaming generator whose object count drifts across attempts, forced through lineage reconstruction by killing the producing node. The unit tests above are the reliable repro; this script is timing-dependent (reconstruction depends on when the owner detects the lost objects) and is here for intuition only.

import ray
from ray.cluster_utils import Cluster

# Head node holds the driver + the cross-attempt counter; a separate worker
# node runs the generator so we can kill it to force reconstruction.
cluster = Cluster()
cluster.add_node(num_cpus=0, resources={"head": 1})
ray.init(address=cluster.address)
worker = cluster.add_node(num_cpus=1, resources={"worker": 1})


# Detached actor survives the worker node death, so the replay sees a
# different attempt number and yields a different object count.
@ray.remote(num_cpus=0, resources={"head": 0.01})
class AttemptCounter:
    def __init__(self):
        self.n = 0

    def next(self):
        self.n += 1
        return self.n


AttemptCounter.options(name="counter", lifetime="detached").remote()


@ray.remote(num_returns="streaming", resources={"worker": 1}, max_retries=1)
def gen():
    attempt = ray.get(ray.get_actor("counter").next.remote())
    # First attempt yields 3 objects (EOF pinned to 3); the replay yields 2.
    num_objects = 3 if attempt == 1 else 2
    for i in range(num_objects):
        yield {"i": i}


# First attempt: collect the 3 ObjectRefs (the task runs, EOF pinned to 3).
# The objects live in the worker node's object store.
gen_ref = gen.remote()
refs = list(gen_ref)
print("first attempt produced", len(refs), "objects")  # 3

# Drop the generator handle and kill the producing node so the objects are
# lost; the next ray.get has to replay the task elsewhere.
del gen_ref
cluster.remove_node(worker)
cluster.add_node(num_cpus=1, resources={"worker": 1})

# The replay produces only 2 objects, but EOF is still pinned to 3. The first
# two ray.get calls succeed (served by the replay); the third blocks forever
# without this fix. With it, the task fails with
# STREAMING_GENERATOR_REPLAY_INCONSISTENT.
for i, ref in enumerate(refs):
    print("got object", i, "=", ray.get(ref))

@dragongu dragongu requested a review from a team as a code owner June 27, 2026 15:58

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a mechanism to detect and fail streaming generator tasks whose replays produce a different number of objects than their initial successful execution, preventing silent hangs or data loss. This is achieved by adding FailStreamingGeneratorReplayIfInconsistent in TaskManager and introducing a new error type STREAMING_GENERATOR_REPLAY_INCONSISTENT. The review feedback suggests moving the inconsistency check to the very beginning of CompletePendingTask to prevent side-effects from inconsistent replays being written to memory before the task is failed. Additionally, it recommends simplifying the method signature by checking the execution count internally and adding defensive checks against empty return objects.


@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.


Reviewed by Cursor Bugbot for commit 54b50cd.

@dragongu dragongu force-pushed the fix/ray_data_hang branch from 54b50cd to 5812a1f Compare June 27, 2026 16:12
A streaming generator task that is replayed (e.g. for lineage
reconstruction) after its first successful attempt can produce a
different number of objects if the generator output is non-deterministic.
Downstream consumers were already created against the original object
count, so a replay with fewer objects hangs them on indices that are
never produced, and a replay with more objects silently drops the extras
beyond the pinned EOF.

Detect the mismatch at the start of CompletePendingTask, before any
return object is written to the store, and fail the task with a new
STREAMING_GENERATOR_REPLAY_INCONSISTENT error type so the failure
propagates through lineage instead of silently hanging or dropping data.

Adds C++ unit tests covering fewer/more/same object counts on replay.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dragongu dragongu force-pushed the fix/ray_data_hang branch from 5812a1f to e139a5a Compare June 27, 2026 16:14
@ray-gardener ray-gardener Bot added the core (Issues that should be addressed in Ray Core) and community-contribution (Contributed by the community) labels Jun 27, 2026

2 participants