Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ jobs:
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSetOpWithGroupBySuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
org.apache.spark.CometPluginsDefaultSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ jobs:
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSetOpWithGroupBySuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
org.apache.spark.CometPluginsDefaultSuite
Expand Down
106 changes: 0 additions & 106 deletions dev/diffs/4.1.1.diff
Original file line number Diff line number Diff line change
Expand Up @@ -150,50 +150,6 @@ index 4410fe50912..43bcce2a038 100644
case _ => Map[String, String]()
}
val childrenInfo = children.flatMap {
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
index 69b4001ff34..6fda691652d 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
@@ -1,7 +1,7 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -11,7 +11,7 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
AS tab1(k, v)
-- !query analysis
CreateViewCommand `tab1`, SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -26,8 +26,8 @@ CreateViewCommand `tab1`, SELECT * FROM VALUES

-- !query
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
@@ -35,8 +35,8 @@ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
AS tab2(k, v)
-- !query analysis
CreateViewCommand `tab2`, SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
index 13bbd9d81b7..541cdfb1e04 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
Expand All @@ -211,18 +167,6 @@ index 13bbd9d81b7..541cdfb1e04 100644
CREATE TEMPORARY VIEW t AS SELECT 1.0 as a, 0.0 as b;

-- division, remainder and pmod by 0 return NULL
diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
index e28f0721a64..788b43c242a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
@@ -1,3 +1,7 @@
+-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
+-- EXCEPT ALL with GROUP BY returns incorrect results on Spark 4.1
+--SET spark.comet.enabled = false
+
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
(0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1);
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
index 7aef901da4f..f3d6e18926d 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
Expand Down Expand Up @@ -280,32 +224,6 @@ index 35128da97fd..25b873ae859 100644
-- Positive test cases
-- Create a table with some testing data.
DROP TABLE IF EXISTS t1;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
index 077caa5dd44..697457d4251 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
@@ -1,5 +1,9 @@
+-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
+-- INTERSECT ALL with GROUP BY returns incorrect results on Spark 4.1
+--SET spark.comet.enabled = false
+
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -8,8 +12,8 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
(null, null)
AS tab1(k, v);
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
index 41fd4de2a09..162d5a817b6 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
Expand Down Expand Up @@ -428,30 +346,6 @@ index 21a3ce1e122..f4762ab98f0 100644
SET spark.sql.ansi.enabled = false;

-- In COMPENSATION views get invalidated if the type can't cast
diff --git a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
index 44f95f225ab..361866fc298 100644
--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
@@ -1,7 +1,7 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -17,8 +17,8 @@ struct<>

-- !query
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0d807aeae4d..6d7744e771b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
Expand Down
23 changes: 22 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1317,8 +1317,29 @@ case class CometUnionExec(
children: Seq[SparkPlan])
extends CometExec {

// CometExec derives outputPartitioning from `originalPlan`, a snapshot captured when
// CometExecRule converted the plan. AQE's post-stage rewrites (coalescing, skew handling,
// etc.) later swap in new entries under our `children` field without refreshing that
// snapshot, so the frozen plan can advertise a layout (e.g. a pre-coalesce partition count)
// that the live RDDs no longer produce. Re-derive the partitioning from the current children
// so SPARK-52921's union-output-partitioning inference reflects the live plan. This is also
// harmless on older Spark lines: with UNION_OUTPUT_PARTITIONING off (the pre-4.1 default),
// UnionExec.outputPartitioning simply yields UnknownPartitioning.
override def outputPartitioning: Partitioning = {
  val livePlan = originalPlan.withNewChildren(children)
  livePlan.outputPartitioning
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
  // On Spark 4.1, UnionExec (SPARK-52921) may advertise a concrete output partitioning when
  // every child reports the same hash/single partitioning, allowing downstream operators to
  // elide a shuffle they would otherwise insert. A naive `sparkContext.union` merely
  // concatenates child partitions (result partition i holds only one child's partition i),
  // which violates that advertised partitioning and can silently corrupt aggregates layered
  // above the union. The shim therefore dispatches to SQLPartitioningAwareUnionRDD on 4.1+
  // whenever a known partitioning is declared.
  val childRDDs = children.map(_.executeColumnar())
  shims.ShimCometUnionExec.unionRDDs(sparkContext, childRDDs, outputPartitioning)
}

override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

  /**
   * Combines the given RDDs into a single RDD via plain partition concatenation. On Spark
   * versions before 4.1, [[org.apache.spark.sql.execution.UnionExec]] always advertises
   * [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so there is no
   * partitioning contract to preserve and `SparkContext.union` suffices. The
   * partitioning-aware variant lives only in the Spark 4.1+ shim (see SPARK-52921).
   *
   * @param sc the SparkContext used to build the union
   * @param rdds the child RDDs to combine
   * @param outputPartitioning declared partitioning; intentionally unused on this Spark line
   */
  def unionRDDs[T: ClassTag](
      sc: SparkContext,
      rdds: Seq[RDD[T]],
      @annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] =
    sc.union(rdds)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

  /**
   * Combines the given RDDs into a single RDD via plain partition concatenation. On Spark
   * versions before 4.1, [[org.apache.spark.sql.execution.UnionExec]] always advertises
   * [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so there is no
   * partitioning contract to preserve and `SparkContext.union` suffices. The
   * partitioning-aware variant lives only in the Spark 4.1+ shim (see SPARK-52921).
   *
   * @param sc the SparkContext used to build the union
   * @param rdds the child RDDs to combine
   * @param outputPartitioning declared partitioning; intentionally unused on this Spark line
   */
  def unionRDDs[T: ClassTag](
      sc: SparkContext,
      rdds: Seq[RDD[T]],
      @annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] =
    sc.union(rdds)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

  /**
   * Combines the given RDDs into a single RDD via plain partition concatenation. On Spark
   * versions before 4.1, [[org.apache.spark.sql.execution.UnionExec]] always advertises
   * [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so there is no
   * partitioning contract to preserve and `SparkContext.union` suffices. The
   * partitioning-aware variant lives only in the Spark 4.1+ shim (see SPARK-52921).
   *
   * @param sc the SparkContext used to build the union
   * @param rdds the child RDDs to combine
   * @param outputPartitioning declared partitioning; intentionally unused on this Spark line
   */
  def unionRDDs[T: ClassTag](
      sc: SparkContext,
      rdds: Seq[RDD[T]],
      @annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] =
    sc.union(rdds)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, SQLPartitioningAwareUnionRDD}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}

object ShimCometUnionExec extends Logging {

  /**
   * Unions a sequence of RDDs while preserving the declared output partitioning. Spark 4.1
   * introduced [[org.apache.spark.sql.internal.SQLConf.UNION_OUTPUT_PARTITIONING]]
   * (SPARK-52921), which lets [[org.apache.spark.sql.execution.UnionExec]] report a
   * non-trivial output partitioning when all children share the same partitioning. Downstream
   * operators may then skip an otherwise-required shuffle, so the columnar Union path must
   * honor that contract by routing through [[SQLPartitioningAwareUnionRDD]] rather than plain
   * `SparkContext.union`, which concatenates partitions and breaks the partitioning invariant.
   *
   * @param sc the active SparkContext
   * @param rdds the child RDDs to union
   * @param outputPartitioning the partitioning CometUnionExec reports for the union's output
   * @return an RDD that honors `outputPartitioning` when it is known; otherwise (or when the
   *         declared partitioning is inconsistent with the children) a plain concatenation
   */
  def unionRDDs[T: ClassTag](
      sc: SparkContext,
      rdds: Seq[RDD[T]],
      outputPartitioning: Partitioning): RDD[T] = {
    outputPartitioning match {
      case _: UnknownPartitioning => sc.union(rdds)
      case _ =>
        val numPartitions = outputPartitioning.numPartitions
        val nonEmpty = rdds.filter(_.partitions.nonEmpty)
        if (nonEmpty.isEmpty) {
          // All children are empty: there is no partitioning invariant to violate, so a plain
          // (empty) concat is correct and no warning should be emitted.
          sc.union(rdds)
        } else if (nonEmpty.exists(_.partitions.length != numPartitions)) {
          // SQLPartitioningAwareUnionRDD indexes every child at every output partition, so any
          // child whose partition count diverges from the declared numPartitions would raise
          // ArrayIndexOutOfBoundsException. That would only happen if the declared partitioning
          // is stale relative to the RDDs (e.g. children were coalesced by AQE but the reported
          // partitioning was not). Fall back to plain concat in that case.
          val childCounts = rdds.map(_.partitions.length).mkString(", ")
          logWarning(
            s"CometUnionExec: child partition counts ($childCounts) do not match " +
              s"declared output partitioning numPartitions=$numPartitions; " +
              "falling back to SparkContext.union concat.")
          sc.union(rdds)
        } else {
          new SQLPartitioningAwareUnionRDD(sc, nonEmpty, numPartitions)
        }
    }
  }
}
Loading
Loading