1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -355,6 +355,7 @@ jobs:
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSetOpWithGroupBySuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
org.apache.spark.CometPluginsDefaultSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -194,6 +194,7 @@ jobs:
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSetOpWithGroupBySuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
org.apache.spark.CometPluginsDefaultSuite
106 changes: 0 additions & 106 deletions dev/diffs/4.1.1.diff
@@ -150,50 +150,6 @@ index 4410fe50912..43bcce2a038 100644
case _ => Map[String, String]()
}
val childrenInfo = children.flatMap {
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
index 69b4001ff34..6fda691652d 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/intersect-all.sql.out
@@ -1,7 +1,7 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -11,7 +11,7 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
AS tab1(k, v)
-- !query analysis
CreateViewCommand `tab1`, SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -26,8 +26,8 @@ CreateViewCommand `tab1`, SELECT * FROM VALUES

-- !query
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
@@ -35,8 +35,8 @@ CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
AS tab2(k, v)
-- !query analysis
CreateViewCommand `tab2`, SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql b/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
index 13bbd9d81b7..541cdfb1e04 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql
@@ -211,18 +167,6 @@ index 13bbd9d81b7..541cdfb1e04 100644
CREATE TEMPORARY VIEW t AS SELECT 1.0 as a, 0.0 as b;

-- division, remainder and pmod by 0 return NULL
diff --git a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
index e28f0721a64..788b43c242a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/except-all.sql
@@ -1,3 +1,7 @@
+-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
+-- EXCEPT ALL with GROUP BY returns incorrect results on Spark 4.1
+--SET spark.comet.enabled = false
+
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
(0), (1), (2), (2), (2), (2), (3), (null), (null) AS tab1(c1);
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
index 7aef901da4f..f3d6e18926d 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
@@ -280,32 +224,6 @@ index 35128da97fd..25b873ae859 100644
-- Positive test cases
-- Create a table with some testing data.
DROP TABLE IF EXISTS t1;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
index 077caa5dd44..697457d4251 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/intersect-all.sql
@@ -1,5 +1,9 @@
+-- TODO(https://github.com/apache/datafusion-comet/issues/4122)
+-- INTERSECT ALL with GROUP BY returns incorrect results on Spark 4.1
+--SET spark.comet.enabled = false
+
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -8,8 +12,8 @@ CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
(null, null)
AS tab1(k, v);
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
index 41fd4de2a09..162d5a817b6 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
@@ -428,30 +346,6 @@ index 21a3ce1e122..f4762ab98f0 100644
SET spark.sql.ansi.enabled = false;

-- In COMPENSATION views get invalidated if the type can't cast
diff --git a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
index 44f95f225ab..361866fc298 100644
--- a/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out
@@ -1,7 +1,7 @@
-- Automatically generated by SQLQueryTestSuite
-- !query
CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES
- (1, 2),
+ (1, 2),
(1, 2),
(1, 3),
(1, 3),
@@ -17,8 +17,8 @@ struct<>

-- !query
CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES
- (1, 2),
- (1, 2),
+ (1, 2),
+ (1, 2),
(2, 3),
(3, 4),
(null, null),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0d807aeae4d..6d7744e771b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
23 changes: 22 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -1317,8 +1317,29 @@ case class CometUnionExec(
children: Seq[SparkPlan])
extends CometExec {

// CometExec's default outputPartitioning delegates to `originalPlan`, which captures the
// children that were live at CometExecRule conversion time. AQE post-stage rewrites
// (coalesce, skew join, etc.) later re-parent our `children` field but do not update
// `originalPlan`, so the partitioning read from the frozen snapshot can describe a
// pre-coalesce layout with more partitions than the RDDs will actually produce. Recompute
// from current children so SPARK-52921's union-output-partitioning inference is based on
// the live plan. Safe on older Spark too: UnionExec.outputPartitioning returns
// UnknownPartitioning when UNION_OUTPUT_PARTITIONING is off (the pre-4.1 default).
override def outputPartitioning: Partitioning = {
originalPlan.withNewChildren(children).outputPartitioning
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
sparkContext.union(children.map(_.executeColumnar()))
// Spark 4.1's UnionExec (SPARK-52921) can report a non-trivial output partitioning when all
// children share the same hash/single partitioning, and downstream plans may skip an
// otherwise-required shuffle in response. Plain `sparkContext.union` concatenates partitions
// (so partition i of the result holds only one child's partition i), which violates that
// partitioning claim and silently corrupts aggregates layered above the union. The shim
// routes through SQLPartitioningAwareUnionRDD on 4.1+ when a known partitioning is declared.
shims.ShimCometUnionExec.unionRDDs(
sparkContext,
children.map(_.executeColumnar()),
outputPartitioning)
}

override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
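The doExecuteColumnar comment above hinges on `SparkContext.union` concatenating partitions when the children carry no `partitioner` metadata (as SQL exec RDDs do not). A minimal standalone sketch of that behavior, not part of this PR; the local-mode session, object name, and toy key-value data are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

object UnionConcatSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("union-concat-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Two children laid out identically: rows for key "k0" sit in partition 0
    // and rows for "k1" in partition 1 of each child. Neither RDD carries
    // `partitioner` metadata, so sc.union cannot see this alignment.
    val left = sc.parallelize(Seq("k0" -> 1, "k1" -> 1), numSlices = 2)
    val right = sc.parallelize(Seq("k0" -> 2, "k1" -> 2), numSlices = 2)

    val unioned = sc.union(Seq(left, right))

    // 4 partitions, not 2: partition i of the result is one child's partition i.
    println(unioned.getNumPartitions)

    // Rows for "k0" now live in partitions 0 AND 2, so a downstream aggregate
    // that trusted a "2-partition hash layout" claim and skipped its shuffle
    // would see each group split in half.
    unioned
      .mapPartitionsWithIndex((i, it) => it.map { case (k, v) => s"partition $i: $k -> $v" })
      .collect()
      .foreach(println)

    spark.stop()
  }
}
```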
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

/**
* Unions a sequence of RDDs while preserving the declared output partitioning. Before Spark
* 4.1, [[org.apache.spark.sql.execution.UnionExec]] always reports
* [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so this shim simply
* concatenates partitions via `SparkContext.union`. The partitioning-aware path is only needed
* on Spark 4.1+ (see SPARK-52921).
*/
def unionRDDs[T: ClassTag](
sc: SparkContext,
rdds: Seq[RDD[T]],
@annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] = {
sc.union(rdds)
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

/**
* Unions a sequence of RDDs while preserving the declared output partitioning. Before Spark
* 4.1, [[org.apache.spark.sql.execution.UnionExec]] always reports
* [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so this shim simply
* concatenates partitions via `SparkContext.union`. The partitioning-aware path is only needed
* on Spark 4.1+ (see SPARK-52921).
*/
def unionRDDs[T: ClassTag](
sc: SparkContext,
rdds: Seq[RDD[T]],
@annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] = {
sc.union(rdds)
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

object ShimCometUnionExec {

/**
* Unions a sequence of RDDs while preserving the declared output partitioning. Before Spark
* 4.1, [[org.apache.spark.sql.execution.UnionExec]] always reports
* [[org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning]], so this shim simply
* concatenates partitions via `SparkContext.union`. The partitioning-aware path is only needed
* on Spark 4.1+ (see SPARK-52921).
*/
def unionRDDs[T: ClassTag](
sc: SparkContext,
rdds: Seq[RDD[T]],
@annotation.nowarn("cat=unused") outputPartitioning: Partitioning): RDD[T] = {
sc.union(rdds)
}
}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.{RDD, SQLPartitioningAwareUnionRDD}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}

object ShimCometUnionExec {

/**
* Unions a sequence of RDDs while preserving the declared output partitioning. Spark 4.1
* introduced [[org.apache.spark.sql.internal.SQLConf.UNION_OUTPUT_PARTITIONING]] (SPARK-52921),
* which lets [[org.apache.spark.sql.execution.UnionExec]] report a non-trivial output
* partitioning when all children share the same partitioning. Downstream operators may then
* skip an otherwise-required shuffle, so the columnar Union path must honor that contract by
* routing through [[SQLPartitioningAwareUnionRDD]] rather than plain `SparkContext.union`,
* which concatenates partitions and breaks the partitioning invariant.
*/
def unionRDDs[T: ClassTag](
sc: SparkContext,
rdds: Seq[RDD[T]],
outputPartitioning: Partitioning): RDD[T] = {
outputPartitioning match {
case _: UnknownPartitioning => sc.union(rdds)
case _ =>
val numPartitions = outputPartitioning.numPartitions
val nonEmpty = rdds.filter(_.partitions.nonEmpty)
// SQLPartitioningAwareUnionRDD indexes every child at every output partition, so any
// child whose partition count diverges from the declared numPartitions would raise
// ArrayIndexOutOfBoundsException. That would only happen if the declared partitioning
// is stale relative to the RDDs (e.g. children were coalesced by AQE but the reported
// partitioning was not). Fall back to plain concat in that case.
if (nonEmpty.isEmpty || nonEmpty.exists(_.partitions.length != numPartitions)) {
sc.union(rdds)
Contributor:
If `_.partitions.length != numPartitions` fires, then we should probably log a warning message.

Member Author:
Added

} else {
new SQLPartitioningAwareUnionRDD(sc, nonEmpty, numPartitions)
}
}
}
}
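A hedged usage sketch of the Spark 4.1 shim above, not part of this PR: the object name, the `demo` inputs, and the `k` attribute are illustrative assumptions, and it presumes two children that each report exactly 4 partitions. It exercises all three branches: a matching partitioning claim, no claim, and a stale claim that trips the fallback guard.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.comet.shims.ShimCometUnionExec
import org.apache.spark.sql.types.IntegerType

object UnionShimUsageSketch {
  // Assumed inputs: a live SparkContext and two non-empty children that each
  // have exactly 4 partitions, as the HashPartitioning claims below assert.
  def demo(sc: SparkContext, a: RDD[Int], b: RDD[Int]): Unit = {
    val key = AttributeReference("k", IntegerType)()

    // Declared layout matches the children: on 4.1+ this takes the
    // SQLPartitioningAwareUnionRDD path, so the result keeps 4 partitions
    // (child partitions with the same index are combined, not appended).
    val aligned =
      ShimCometUnionExec.unionRDDs(sc, Seq(a, b), HashPartitioning(Seq(key), 4))
    println(aligned.getNumPartitions) // 4

    // No partitioning claim: plain SparkContext.union, 8 appended partitions.
    val concat =
      ShimCometUnionExec.unionRDDs(sc, Seq(a, b), UnknownPartitioning(0))
    println(concat.getNumPartitions) // 8

    // A stale claim of 2 partitions diverges from the 4-partition children,
    // so the guard falls back to concatenation rather than letting the
    // partitioning-aware RDD index out of bounds.
    val stale =
      ShimCometUnionExec.unionRDDs(sc, Seq(a, b), HashPartitioning(Seq(key), 2))
    println(stale.getNumPartitions) // 8
  }
}
```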