[Data] Make Zip a properly streaming operator#64395
Conversation
Signed-off-by: Hyunoh-Yeo <hyunoh.yeo@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request refactors the ZipOperator from a bulk implementation to a streaming implementation, allowing it to process blocks incrementally and align rows by splitting blocks as needed. It also enables throttling for the operator and adds comprehensive tests. The review feedback highlights critical performance concerns regarding synchronous ray.get calls inside _try_output() and _fill_block_deque(), which block the main streaming executor thread and can lead to scheduling bottlenecks or deadlocks.
Signed-off-by: Hyunoh-Yeo <hyunoh.yeo@gmail.com>
Signed-off-by: Hyunoh-Yeo <hyunoh.yeo@gmail.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors ZipOperator from a bulk operator to a fully streaming operator. Blocks are now processed incrementally as they arrive from all inputs, aligned dynamically using a split-and-leftover mechanism, and zipped asynchronously via DataOpTasks. It also introduces asynchronous row-count resolution when metadata is missing and enables backpressure support. Since there are no review comments, I have no feedback to provide.
Signed-off-by: Hyunoh-Yeo <hyunoh.yeo@gmail.com>
Description
Reimplements
ZipOperatoras a streaming operator. Previously,zipbulk-materialized all blocks from both inputs into the object store before producing any output, withthrottling_disabled()returningTrueand the operator listed as a blocking-materializing operator. This reworks it to align and zip blocks incrementally as they arrive.How alignment works: whenever a block is available from every input, the operator takes one from each, splits any larger blocks down to the minimum row count (carrying the remainder forward as a per-input "leftover"), zips the aligned blocks, and emits the output bundle immediately. Mismatched total row counts are detected at
all_inputs_done()and raiseValueError.Related issues
Closes #56300
Additional information