[Data] Make PandasBlock.size_bytes deterministic #64393
Open
dragongu wants to merge 1 commit into
Code Review
This pull request ensures that size estimation in PandasBlockAccessor.size_bytes() is deterministic by using a fixed random_state=0 when sampling columns. This prevents issues like silent hangs or data loss during lineage reconstruction. A corresponding unit test was added to verify determinism. The reviewer suggested a performance optimization to bypass sampling when the sample size equals the total size.
PandasBlock.size_bytes estimates the byte size of string/object columns by sampling up to 200 rows with pandas.DataFrame.sample(n=), which uses a fresh random seed on each call by default. The estimate is therefore non-deterministic: two blocks holding identical data can report different sizes.
OutputBuffer.next uses size_bytes to decide how to split blocks, so this non-determinism makes a streaming generator task produce a different number of objects across replay attempts (e.g. lineage reconstruction), which can hang or silently drop data downstream.
This passes a fixed random_state to sample() so the estimate is deterministic. When the whole column is sampled (common for blocks under 200 rows), it reads the values directly and skips sample() entirely, avoiding the permutation/RNG/copy overhead; that path has no randomness and is trivially deterministic.
Adds a unit test asserting two blocks with identical data report the same size_bytes.
Signed-off-by: dragongu <andrewgu@vip.qq.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
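The two paths described in the commit message (seeded sampling for large columns, direct read for small ones) can be sketched roughly as follows. This is an illustration under assumptions, not the code in the PR: the function name estimate_object_column_bytes, the constant SAMPLE_SIZE, and the use of sys.getsizeof as the per-value size measure are all hypothetical.

```python
import sys

import pandas as pd

# Hypothetical constant; the commit message only says "up to 200 rows".
SAMPLE_SIZE = 200


def estimate_object_column_bytes(col: pd.Series, sample_size: int = SAMPLE_SIZE) -> int:
    """Estimate the total byte size of an object column (illustrative only)."""
    total = len(col)
    if total == 0:
        return 0
    if total <= sample_size:
        # The whole column fits in the sample: read values directly.
        # No RNG is involved, so this path is trivially deterministic.
        sampled = col
    else:
        # A fixed random_state makes sample() pick the same rows every call.
        sampled = col.sample(n=sample_size, random_state=0)
    # Assumption: sys.getsizeof stands in for the real per-value size measure.
    sampled_bytes = sum(sys.getsizeof(v) for v in sampled)
    # Scale the sampled total up to the full column length.
    return int(sampled_bytes * total / len(sampled))


if __name__ == "__main__":
    col = pd.Series(["x" * (i % 37) for i in range(1000)], dtype=object)
    # Two columns holding identical data give the same estimate.
    print(estimate_object_column_bytes(col) == estimate_object_column_bytes(col.copy()))
```

With the seed fixed, identical input always selects the same rows, so the estimate depends only on the data.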
Description
PandasBlockAccessor.size_bytes() estimates string/object column sizes by sampling up to 200 rows via pandas.DataFrame.sample(n=...) without a seed, so two blocks holding identical data can report different sizes. That estimate drives block splitting in streaming generators, so a re-executed task can emit a different number of objects than the original attempt, which leaves downstream consumers silently hanging or silently dropping data (the companion #64394 fails fast when that mismatch occurs). This PR fixes the sampling non-determinism by passing random_state=0, and skips sampling entirely when the column fits in the sample. User UDFs remain responsible for their own determinism.
Symptom. A streaming generator task re-executed for lineage reconstruction can leave the pipeline silently hanging or silently dropping data, with no error logged.
Root cause. PandasBlockAccessor.size_bytes() estimates string/object columns by sampling up to 200 rows via pandas.DataFrame.sample(n=...). No seed is passed, so each call samples different rows and two blocks holding identical data can report different sizes.
How it turns into a bug. That estimate drives block splitting: OutputBuffer.next() calls size_bytes() to compute target_num_rows, so a task can split the same output into a different number of blocks each time it runs. [...] ObjectRefStream::InsertToStream (silent data loss).
Fix. Pass random_state=0 to sample(), and skip sampling when the column has fewer than sample_size rows. Identical input now always yields the same size estimate. This removes one Ray-internal source of streaming-generator object-count non-determinism; user UDFs remain responsible for their own determinism.
Scope. Only PandasBlockAccessor.size_bytes() is affected; the Arrow equivalent already uses table.nbytes and is deterministic. Pandas blocks are produced along several common paths: [...] pandas.DataFrame (e.g. map_batches(fn, batch_format="pandas")), kept as a pandas block by BlockAccessor.batch_to_block; split_repartition preserving a pandas input schema; and [...] (encoder, vectorizer, imputer).
On released branches these paths default to pandas, so the bug is broadly reachable. On master, #61733 added a default that converts many UDF outputs to Arrow, but the fallback and pandas-schema paths above still build pandas blocks.
Related issues
None.
Additional information
Reproduction (before this PR, the two printed sizes frequently differ):
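The effect can be sketched with pandas alone. This is an illustration under assumptions, not the PR's own reproduction script and not Ray's code: sampled_estimate and num_output_blocks are hypothetical helpers, sys.getsizeof stands in for the real per-value size measure, and the splitting rule is only a simplified stand-in for how a size estimate might feed target_num_rows.

```python
import sys

import pandas as pd


def sampled_estimate(col: pd.Series, sample_size: int = 200, random_state=None) -> int:
    """Estimate column bytes from a random sample (hypothetical helper)."""
    sampled = col.sample(n=sample_size, random_state=random_state)
    return int(sum(sys.getsizeof(v) for v in sampled) * len(col) / sample_size)


def num_output_blocks(num_rows: int, est_bytes: int, target_bytes: int) -> int:
    """Simplified, assumed splitting rule: rows per block follow the estimate."""
    target_num_rows = max(1, num_rows * target_bytes // est_bytes)
    return -(-num_rows // target_num_rows)  # ceiling division


# Skewed column: mostly short strings plus a few very long ones, so the
# estimate depends heavily on which rows happen to be sampled.
col = pd.Series(["x"] * 9_900 + ["y" * 10_000] * 100, dtype=object)

unseeded = {sampled_estimate(col) for _ in range(20)}
seeded = {sampled_estimate(col, random_state=0) for _ in range(20)}

print("distinct unseeded estimates:", len(unseeded))
print("distinct seeded estimates:", len(seeded))
for est in sorted(unseeded):
    print(est, "->", num_output_blocks(len(col), est, 1_000_000), "block(s)")
```

Under this assumed rule, different estimates for the same data translate directly into different block counts, which is the mismatch the description refers to.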
Test: adds TestSizeBytes.test_deterministic_across_blocks, asserting that two blocks holding identical data report the same size_bytes().