Skip to content

feat(sort-shuffle): accept Option<Partitioning>#1638

Draft
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:feat/sort-shuffle-none
Draft

feat(sort-shuffle): accept Option<Partitioning>#1638
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:feat/sort-shuffle-none

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #.

Rationale for this change

SortShuffleWriterExec previously only supported Partitioning::Hash. Stages with partitioning=None (final output, CoalescePartitionsExec, SortPreservingMergeExec) had to fall back to ShuffleWriterExec, leaving two writer code paths and two on-disk shuffle formats coexisting in executor work directories. Extending sort-shuffle to handle None lets every stage use one writer when sort-based shuffle is enabled and is a prerequisite for removing the legacy hash-based writer entirely.

What changes are included in this PR?

  • SortShuffleWriterExec::try_new now takes Option<Partitioning> (mirroring ShuffleWriterExec). When None, the writer skips hashing and writes a single-partition data+index file with every input row in bucket 0.
  • shuffle_output_partitioning() returns Option<&Partitioning>.
  • Encoder/decoder in ballista_core::serde accept the absent output_partitioning field on SortShuffleWriterExecNode (the proto field is implicitly optional, so no proto change).
  • DefaultDistributedPlanner::create_shuffle_writer_with_config selects sort-shuffle for any supported partitioning (Hash or None) when BALLISTA_SHUFFLE_SORT_BASED_ENABLED is on, instead of falling back to ShuffleWriterExec.
  • New unit tests exercise the writer with None and the planner routing decision.

Are there any user-facing changes?

The signature of SortShuffleWriterExec::try_new and shuffle_output_partitioning() change shape (Partitioning -> Option<Partitioning>). The BALLISTA_SHUFFLE_SORT_BASED_ENABLED config still controls the choice of writer; the difference is that when enabled, all stages use sort-shuffle, not just the hash-partitioned ones.

@andygrove andygrove changed the title feat(sort-shuffle): accept Option<Partitioning> feat(sort-shuffle): accept Option<Partitioning> [WIP] May 1, 2026
@andygrove andygrove marked this pull request as ready for review May 1, 2026 16:50
@andygrove andygrove force-pushed the feat/sort-shuffle-none branch from 87b92a5 to 3f21142 Compare May 1, 2026 17:03
…itioning

When `SortShuffleWriterExec` was constructed with `partitioning=None`
every task wrote `partition_id=0` to the scheduler, so the scheduler's
partition_locations map collapsed all 16 hash buckets from the upstream
stage under a single key. The downstream `SortPreservingMergeExec` then
saw one stream containing 16 sorted sub-streams concatenated together,
which is not globally sorted, and produced rows out of order.

Fix matches legacy `ShuffleWriterExec` semantics: report
`partition_id = input_partition` so each input task lands in its own
output slot. The on-disk file still has a single logical bucket
(bucket 0); both the local and Flight readers now map any request on a
single-bucket file to bucket 0.

Also update test expectations that still referenced `ShuffleWriterExec`
for the final stage now that None-partitioning is routed through
sort-shuffle.
@andygrove andygrove changed the title feat(sort-shuffle): accept Option<Partitioning> [WIP] feat(sort-shuffle): accept Option<Partitioning> May 2, 2026
@andygrove andygrove marked this pull request as draft May 2, 2026 22:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant