
fix: [Spark 4.1] preserve union output partitioning in CometUnionExec#4207

Open
andygrove wants to merge 5 commits into apache:main from andygrove:issue-4122

Conversation

@andygrove
Member

Which issue does this PR close?

Closes #4122.

Rationale for this change

On Spark 4.1, SPARK-52921 added UNION_OUTPUT_PARTITIONING: when all children of a UnionExec share the same hash/single partitioning, the union itself reports that same partitioning. Downstream operators (e.g. a final hash aggregate) then skip an otherwise-required shuffle, and Spark's row-based UnionExec.doExecute keeps the partitioning invariant by routing through SQLPartitioningAwareUnionRDD (each output partition unions partition i from every child).

CometUnionExec silently broke both halves of that contract:

  • doExecuteColumnar used sparkContext.union(...), which concatenates partitions — partition i of the output only holds partition i of a single child.
  • outputPartitioning delegated to the frozen originalPlan snapshot captured at CometExecRule time, so AQE's post-stage coalescing was invisible.

The result: EXCEPT ALL / INTERSECT ALL queries whose sides are themselves GROUP BY aggregates silently returned incorrect results (e.g. EXCEPT ALL returning {2, 3} instead of {3}). Two Spark 4.1.1 SQLQueryTestSuite files (except-all.sql, intersect-all.sql) were disabled for Comet because of this.
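The partitioning contract can be illustrated with a small self-contained toy model (plain Scala collections standing in for RDD partitions; the helper and values below are illustrative, not Comet/Spark APIs):

```scala
// Toy model: an "RDD" here is just a Seq of partitions (Seq[Seq[Int]]).
// Rows are hash-partitioned by value modulo the partition count.
def hashPartition(rows: Seq[Int], numPartitions: Int): Seq[Seq[Int]] =
  (0 until numPartitions).map(i => rows.filter(_ % numPartitions == i))

// Two children hash-partitioned identically into 2 partitions.
val left  = hashPartition(Seq(1, 2, 3, 4), 2)
val right = hashPartition(Seq(3, 4), 2)

// Partition-wise union (the SQLPartitioningAwareUnionRDD behavior): output
// partition i unions partition i from every child, so all rows for a given
// key stay co-located and the declared hash partitioning remains true.
val partitionWise = left.zip(right).map { case (l, r) => l ++ r }

// Partition concatenation (the sparkContext.union behavior): the output has
// left.size + right.size partitions and the hash invariant is gone; key 3
// now lives in two different partitions, so a downstream aggregate that
// trusts the declared partitioning sees each copy separately.
val concat = left ++ right
```

With the toy data, key 3 stays in a single output partition under the partition-wise union, but appears in two different partitions under plain concatenation, which is exactly the co-location assumption a shuffle-skipping downstream aggregate relies on.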

What changes are included in this PR?

  • Override CometUnionExec.outputPartitioning to recompute from the live children rather than originalPlan.
  • Route doExecuteColumnar through a new ShimCometUnionExec.unionRDDs helper that uses SQLPartitioningAwareUnionRDD on Spark 4.1+ when a known partitioning is declared (with a partition-count sanity check and a safe fallback to plain concat), and retains sparkContext.union behavior on pre-4.1 Spark where UnionExec.outputPartitioning is always UnknownPartitioning.
  • Add CometSetOpWithGroupBySuite covering the two queries from the Spark SQL tests.
  • Remove the spark.comet.enabled = false guards at the top of except-all.sql and intersect-all.sql in dev/diffs/4.1.1.diff.
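The fallback decision described in the second bullet can be distilled into a small self-contained sketch (the names and structure here are illustrative, not the actual ShimCometUnionExec code):

```scala
// Illustrative distillation of the unionRDDs decision: use a partition-wise
// union only when a concrete partitioning is declared and every non-empty
// child actually has that many partitions; otherwise fall back to concat.
// None models UnknownPartitioning (always the case on pre-4.1 Spark).
sealed trait UnionStrategy
case object PartitionAwareUnion extends UnionStrategy
case object PlainConcat extends UnionStrategy

def chooseStrategy(
    declaredPartitions: Option[Int],
    childPartitionCounts: Seq[Int]): UnionStrategy =
  declaredPartitions match {
    case Some(n) =>
      val nonEmpty = childPartitionCounts.filter(_ > 0)
      // Mirrors the sanity check: if any child's partition count diverges
      // from the declared count (e.g. AQE coalesced children after the
      // partitioning was captured), plain concatenation is the safe choice.
      if (nonEmpty.nonEmpty && nonEmpty.forall(_ == n)) PartitionAwareUnion
      else PlainConcat
    case None => PlainConcat
  }
```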

How are these changes tested?

  • New CometSetOpWithGroupBySuite passes on Spark 3.5 and Spark 4.1.1 profiles.
  • Existing CometExecSuite (246 tests) passes on Spark 3.5.

andygrove added 2 commits May 4, 2026 08:51
…apache#4122)

On Spark 4.1, SPARK-52921 (UNION_OUTPUT_PARTITIONING) lets UnionExec
report a non-trivial hash partitioning when all children share the same
partitioning, and downstream plans skip otherwise-required shuffles.
CometUnionExec was (a) concatenating partitions via `sparkContext.union`,
which breaks that partitioning contract, and (b) reading
`outputPartitioning` from the frozen `originalPlan` snapshot, so
post-AQE coalescing was invisible. The result was silent data-loss for
EXCEPT ALL / INTERSECT ALL where both sides are GROUP BY queries.

Override `outputPartitioning` to recompute from the live children, and
route `doExecuteColumnar` through SQLPartitioningAwareUnionRDD on 4.1+
via a new `ShimCometUnionExec` shim. Pre-4.1 shims preserve the
existing `sparkContext.union` behavior.
…ldens

apache#4122 removed the inputs/intersect-all.sql and inputs/except-all.sql
hunks from dev/diffs/4.1.1.diff but left two paired whitespace-trimming
hunks on analyzer-results/intersect-all.sql.out and
results/intersect-all.sql.out. The goldens came out trimmed while the
upstream .sql still had trailing spaces, so SQLQueryTestSuite echoed the
untrimmed SQL and failed to match the trimmed golden. Restore both .out
files to upstream by regenerating the diff.
```scala
// is stale relative to the RDDs (e.g. children were coalesced by AQE but the reported
// partitioning was not). Fall back to plain concat in that case.
if (nonEmpty.isEmpty || nonEmpty.exists(_.partitions.length != numPartitions)) {
  sc.union(rdds)
```
Contributor


If the `_.partitions.length != numPartitions` check fires, then we should probably log a warning message.

Member Author


Added
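The added warning itself isn't shown in this excerpt; a hedged sketch of what such a fallback warning might look like (the message text and structure are assumptions, not the merged code):

```scala
// Illustrative only: the shape of a fallback warning for the divergent
// partition-count case; the real message and logging call may differ.
def fallbackWarningMessage(declared: Int, childCounts: Seq[Int]): String =
  s"CometUnionExec falling back to SparkContext.union: declared output " +
  s"partitioning has $declared partitions but children have " +
  childCounts.mkString("[", ", ", "]")
```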

```scala
val df = sql("""SELECT v FROM tab3 GROUP BY v
               |EXCEPT ALL
               |SELECT k FROM tab4 GROUP BY k""".stripMargin)
checkAnswer(df, Seq(Row(3)))
```
Contributor


Use `checkSparkAnswerAndOperator`?

Member Author


The query does have some operators that cannot be converted. I updated these tests to check for `CometUnionExec` though.
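The kind of plan assertion described can be sketched with a toy plan tree (the `Op` type and plan shape below are made up for illustration; the real tests would walk Spark's `SparkPlan`):

```scala
// Toy plan node: name plus child operators, standing in for SparkPlan.
case class Op(name: String, children: Op*)

// Recursively collect all nodes with the given operator name.
def collectOps(plan: Op, target: String): Seq[Op] =
  (if (plan.name == target) Seq(plan) else Nil) ++
    plan.children.flatMap(collectOps(_, target))

// A made-up executed plan for a set operation over two GROUP BY sides.
val plan = Op("CometHashAggregate",
  Op("CometUnionExec",
    Op("CometHashAggregate", Op("CometScan")),
    Op("CometHashAggregate", Op("CometScan"))))

// The test-style assertion: the union must have been converted to Comet.
assert(collectOps(plan, "CometUnionExec").nonEmpty)
```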

```scala
val df = sql("""SELECT v FROM tab1 GROUP BY v
               |INTERSECT ALL
               |SELECT k FROM tab2 GROUP BY k""".stripMargin)
checkAnswer(df, Seq(Row(2), Row(3), Row(null)))
```
Contributor


Use `checkSparkAnswerAndOperator` here as well?

Member Author


Same comment as above.

andygrove added 2 commits May 5, 2026 08:58
Log a warning when CometUnionExec falls back to plain SparkContext.union
because child partition counts diverge from the declared output
partitioning, so the unexpected state is observable.

Strengthen CometSetOpWithGroupBySuite by comparing results to vanilla
Spark via checkSparkAnswer and asserting CometUnionExec is present in
the executed plan, instead of asserting hardcoded row literals.
@andygrove
Member Author

Could you take another look, @parthchandra?



Successfully merging this pull request may close these issues.

EXCEPT ALL / INTERSECT ALL with GROUP BY return incorrect results on Spark 4.1.1
