feat(scheduler): broadcast-style hash join for small-side joins#1647

Open
andygrove wants to merge 22 commits into apache:main from andygrove:feat/broadcast-join

Conversation

@andygrove
Member

Which issue does this PR close?

Closes #.

Rationale for this change

Today, when DataFusion plans a hash join with a small side, Ballista's distributed planner relies on CoalescePartitionsExec to satisfy CollectLeft's single-partition requirement. That collapses build-side parallelism onto a single executor and creates a hot spot for both producing and serving the build shuffle file. To work around this, Ballista's default session sets DataFusion's hash_join_single_partition_threshold to 0, which avoids the hot spot but also gives up small-side optimization entirely.

This PR adds a true broadcast-style hash join. For small-side joins, the build subtree keeps its natural M-partition layout and writes M shuffle files; each downstream join task fetches all M files in parallel and builds its hash table locally. Build data flows executor-to-executor; the scheduler is not on the data path.
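
Schematically, the lowering changes the distributed plan shape roughly as follows (node names as used in this PR; the children and partition counts are illustrative, not an exact EXPLAIN output):

    Before (coalesce-based CollectLeft): build side collapsed onto one executor
      HashJoinExec(CollectLeft)
        CoalescePartitionsExec            <- single-partition hot spot
          [build subtree, M partitions]
        [probe subtree, N partitions]

    After (this PR): the build stage keeps its M partitions; each probe task reads all M build files
      HashJoinExec(CollectLeft)
        ShuffleReaderExec(broadcast = true, upstream_partition_count = M)
        [probe subtree, N partitions]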

What changes are included in this PR?

  • New session config ballista.optimizer.broadcast_join_threshold_bytes (default 10 MB, 0 disables).
  • UnresolvedShuffleExec and ShuffleReaderExec get a broadcast flag and an explicit upstream_partition_count. Proto/codec round-trip the new fields.
  • ShuffleReaderExec::execute and partition_statistics aggregate across all upstream locations when broadcast.
  • DefaultDistributedPlanner adds:
    • maybe_promote_to_broadcast that promotes eligible HashJoinExec(Partitioned) to CollectLeft when one side is under the threshold.
    • A HashJoinExec(CollectLeft) lowering branch that strips CoalescePartitionsExec over the build, writes M-partition build shuffle, and inserts a broadcast UnresolvedShuffleExec.
    • A broadcast branch in remove_unresolved_shuffles that flattens upstream locations.
    • A broadcast branch in rollback_resolved_shuffles that preserves broadcast and upstream_partition_count on retry.
  • Unit tests in the scheduler planner (broadcast on / off, M>1 build).
  • Codec round-trip tests for the broadcast variants.
  • End-to-end correctness check in examples/.

AQE is deferred: the AdaptivePlanner does not yet apply this lowering. The hash_join_single_partition_threshold zero workaround is left in place.

Are there any user-facing changes?

Yes. New session config ballista.optimizer.broadcast_join_threshold_bytes (default 10 MB). When the build side of a hash join fits under the threshold, the executor task topology changes: build side stays distributed and probe tasks fetch all build files in parallel. No SQL semantics change.
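
For example, using the SessionConfigExt setter added in this PR; the surrounding setup is a rough sketch and assumes the extension trait (including its new_with_ballista constructor) is in scope via the ballista prelude:

    use ballista::prelude::*;
    use datafusion::prelude::SessionConfig;

    // Values are in bytes; 0 disables broadcast promotion entirely.
    let config = SessionConfig::new_with_ballista()
        .with_ballista_broadcast_join_threshold_bytes(64 * 1024 * 1024); // raise from the 10 MB default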

andygrove added 20 commits May 2, 2026 08:07
…cations

Make remove_unresolved_shuffles flatten all M upstream partition locations
into a single broadcast ShuffleReaderExec, and fix rollback_resolved_shuffles
to preserve the broadcast flag and upstream_partition_count when reverting
a ShuffleReaderExec back to UnresolvedShuffleExec.

Add `ballista_broadcast_join_threshold_bytes` getter and
`with_ballista_broadcast_join_threshold_bytes` setter to `SessionConfigExt`,
following the same pattern as `with_ballista_standalone_parallelism`. Update
the standalone-broadcast-join example to use the new idiomatic API instead
of the raw `set_usize` call.

Revert BallistaConfig::with_settings to private, replace the four
planner tests that constructed settings maps manually with the typed
with_ballista_broadcast_join_threshold_bytes helper. Extract a shared
make_broadcast_test_ctx helper to eliminate duplication across the
three in-memory broadcast tests. Drop the explicit threshold=0 opt-out
from distributed_join_plan since CSV sources carry no byte stats.
@milenkovicm
Contributor

milenkovicm commented May 2, 2026

Would it make sense to use DataFusion configuration for collect left ?

if not mistaken:

  • datafusion.optimizer.hash_join_single_partition_threshold
  • datafusion.optimizer.hash_join_single_partition_threshold_rows
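
If we go that route, those existing options can already be set programmatically; a rough sketch (field paths assumed from DataFusion's ConfigOptions, not taken from this PR):

    use datafusion::prelude::SessionConfig;

    let mut config = SessionConfig::new();
    // Existing DataFusion knobs that gate the single-partition / CollectLeft decision.
    config.options_mut().optimizer.hash_join_single_partition_threshold = 10 * 1024 * 1024;
    config.options_mut().optimizer.hash_join_single_partition_threshold_rows = 100_000;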

@milenkovicm
Contributor

I wonder if this is going to fix apache/datafusion#17494

@Dandandan
Contributor

I found that many of the TPC-DS issues are caused by wrong data types (so cardinality estimates are off and the join order ends up wrong).

apache/datafusion-benchmarks#31 should address this.

@andygrove
Member Author

I plan on running benchmarks at scale tomorrow to see if this PR improves things. I cannot even run TPC-H q2 @ 1 TB with the current join.

@milenkovicm
Contributor

milenkovicm commented May 3, 2026

I’m not sure the current AQE implementation is suitable. It might eagerly repartition both sides of the join.

We’ll likely need a rule that optimises broadcast joins by replanning as stages progress and new statistics are gathered.

To support this, I believe we should step back from the current AQE implementation. Instead of eagerly splitting the entire plan into stages, we should focus on the independent, executable parts of the plan. I’ve considered this as an alternative to the current approach, and now I feel we have a compelling reason to do it.

This should simplify the design, making planning and re-optimization simpler.

updated AQE in #1649

@andygrove
Member Author

@milenkovicm @Dandandan I can now run q2 with this PR! Will do more testing and push some minor updates soon

…s absent for broadcast promotion

When tables are registered via CREATE EXTERNAL TABLE on S3, DataFusion
often does not populate total_byte_size statistics. This caused
maybe_promote_to_broadcast to skip all promotions since it only checked
total_byte_size. Now falls back to estimating byte size from num_rows
and schema column types when total_byte_size is unavailable.

Also adds diagnostic logging to maybe_promote_to_broadcast to aid
debugging broadcast join decisions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
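
A rough sketch of that kind of size fallback (the helper name and per-type widths here are illustrative, not the code in this PR):

    use arrow::datatypes::{DataType, Schema};

    // Hypothetical helper: approximate build-side bytes as num_rows * estimated row width,
    // used only when total_byte_size statistics are missing.
    fn estimate_build_side_bytes(num_rows: usize, schema: &Schema) -> usize {
        let row_width: usize = schema
            .fields()
            .iter()
            .map(|field| match field.data_type() {
                DataType::Boolean | DataType::Int8 | DataType::UInt8 => 1,
                DataType::Int16 | DataType::UInt16 => 2,
                DataType::Int32 | DataType::UInt32 | DataType::Float32 | DataType::Date32 => 4,
                DataType::Int64 | DataType::UInt64 | DataType::Float64 | DataType::Date64 => 8,
                // Variable-length columns get a guessed average width.
                DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary => 20,
                _ => 8,
            })
            .sum();
        num_rows * row_width.max(1)
    }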
@andygrove andygrove marked this pull request as ready for review May 3, 2026 15:51
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@andygrove
Member Author

andygrove commented May 3, 2026

Trying the full suite now with 8 executors x 8 cores. I will keep updating this comment as more results come in.

Running TPCH-1 (2 iteration(s))...
  iteration 1: 29.516s
  iteration 2: 23.523s
Running TPCH-2 (2 iteration(s))...
  iteration 1: 40.169s
  iteration 2: 39.397s
Running TPCH-3 (2 iteration(s))...
  iteration 1: 54.692s
  iteration 2: 52.710s
Running TPCH-4 (2 iteration(s))...
  iteration 1: 33.524s
  iteration 2: 33.472s
Running TPCH-5 (2 iteration(s))...
Job gPiHrd1 failed: Job failed due to stage 10 failed: Task failed due to runtime execution error: Cancelled

  iteration 1: ERROR: DataFusion error: Arrow error: External error: Execution error: Job gPiHrd1 failed: Job failed due to stage 10 failed: Task failed due to runtime execution error: Cancelled

Traceback (most recent call last):
  File "/opt/ballista/ballista-tpch-runner.py", line 74, in run_benchmark
    elapsed = run_query(ctx, sql)
              ^^^^^^^^^^^^^^^^^^^
  File "/opt/ballista/ballista-tpch-runner.py", line 58, in run_query
    df._to_internal_df().collect()
Exception: DataFusion error: Arrow error: External error: Execution error: Job gPiHrd1 failed: Job failed due to stage 10 failed: Task failed due to runtime execution error: Cancelled

Running TPCH-6 (2 iteration(s))...
  iteration 1: 23.634s
  iteration 2: 19.931s
Running TPCH-7 (2 iteration(s))...
  iteration 1: 103.652s
  iteration 2: 180.954s
Running TPCH-8 (2 iteration(s))...
  iteration 1: 326.430s
  iteration 2: 364.488s

q9 got stuck, so I had to kill the full run and start running queries individually with a fresh cluster each time.

Running TPCH-10 (2 iteration(s))...
  iteration 1: 57.136s
  iteration 2: 59.242s
Running TPCH-11 (2 iteration(s))...
  iteration 1: 20.104s
  iteration 2: 22.321s
Running TPCH-12 (2 iteration(s))...
  iteration 1: 26.080s
  iteration 2: 65.110s

@andygrove
Member Author

I'm out of time for this weekend. This is an improvement over the main branch because I can now run q2 to completion, but there are still other issues to investigate.

@milenkovicm perhaps we can merge this and keep iterating?

@milenkovicm
Contributor

Thanks @andygrove, let me have a look and I will merge it.

Contributor

@milenkovicm milenkovicm left a comment

Thanks @andygrove, it's great to finally get broadcast join. I'm not sure whether I'm more excited about the broadcast join itself or about the use of AQE to implement it!

I have a few suggestions:

  • With the changes proposed in #1649, could we rely on DataFusion rules to select CollectLeft joins? There are quite a few lines of code implementing that logic in this PR, and I'm not sure whether it duplicates DataFusion's logic. Is there a case where DataFusion's logic does not match the proposed logic?
  • Join reordering and other common DataFusion physical rules should apply after each stage completes; we should not need to add them separately.
  • The current AQE architecture tries to make planning pluggable; part of this code could be extracted as a planner rule rather than living in the core planner (that part of the planner needs a bit of simplification even without this change).

I suggest we take a moment to consider splitting this PR's logic out into a planner rule. That will mainly help us assess the proposed AQE architecture, now that you have proven the feasibility of broadcast join.

info!("planning query stages for job {job_id}");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan, config)?;
let broadcast_threshold = config
Contributor

Can we use datafusion.optimizer.hash_join_single_partition_threshold and/or datafusion.optimizer.hash_join_single_partition_threshold_rows instead of introducing a new config value?

return Ok((execution_plan, vec![]));
}

// Broadcast-join lowering: HashJoinExec(CollectLeft) gets its own
Contributor

The AQE planner has a set of optimizers that we can add or remove; they capture isolated planning rules. I have a feeling this should be an additional rule.

job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
broadcast_threshold_bytes: usize,
Contributor

We already pass the config as a parameter; the broadcast threshold should be part of it.

/// `HashJoinExec(CollectLeft)` (with a swap if the small side was on
/// the right) wrapped so the build subtree is a single-partition input.
/// Otherwise returns the input unchanged.
fn maybe_promote_to_broadcast(
Contributor

This should already be captured by the current DataFusion rules, shouldn't it? Also, the current planners will do the join-side swap.

use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;

// TODO: the AQE planner does not yet apply the broadcast-join lowering
Contributor

is this still relevant?

/// (which holds the flattened concatenation of all upstream partition
/// locations) regardless of the partition index. Used for the
/// distributed broadcast hash-join lowering.
pub broadcast: bool,
Contributor

we may need this property in ExchangeExec
