From bc22380a21286a68566b742c6268669bab3ed8ce Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Wed, 13 May 2026 19:38:16 +0200 Subject: [PATCH] [FLINK-39763][table] Allow inferring constants with `IS NOT DISTINCT FROM` --- .../java/org/apache/calcite/rex/RexUtil.java | 9 +- .../batch/sql/DynamicFunctionPlanTest.xml | 9 +- .../plan/batch/sql/RemoveShuffleTest.xml | 54 ++++---- .../batch/sql/join/BroadcastHashJoinTest.xml | 85 ++++-------- .../batch/sql/join/NestedLoopJoinTest.xml | 49 +++---- .../batch/sql/join/ShuffledHashJoinTest.xml | 127 +++++++----------- .../plan/batch/sql/join/SortMergeJoinTest.xml | 127 +++++++----------- .../planner/plan/optimize/ScanReuseTest.xml | 26 ++-- .../planner/plan/stream/sql/join/JoinTest.xml | 86 ++++++------ .../sql/join/BroadcastHashJoinTest.scala | 7 + .../batch/sql/join/ShuffledHashJoinTest.scala | 7 + .../batch/sql/join/SortMergeJoinTest.scala | 6 + .../runtime/batch/table/DecimalITCase.scala | 20 --- 13 files changed, 259 insertions(+), 353 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java index bddc6a15239ad..a629af209bc6e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java @@ -79,10 +79,7 @@ * because of current Calcite way of inferring constants from IS NOT DISTINCT FROM clashes with * filter push down. * - *

Lines 402 ~ 404, Use Calcite 1.32.0 behavior for {@link RexUtil#gatherConstraints(Class, - * RexNode, Map, Set, RexBuilder)}. - * - *

FLINK modifications (backport of CALCITE-6764): Line 2481~2485 + *

FLINK modifications (backport of CALCITE-6764): Line 2489~2494 */ public class RexUtil { @@ -401,9 +398,7 @@ private static void gatherConstraints( final RexNode right; switch (predicate.getKind()) { case EQUALS: - // FLINK BEGIN MODIFICATION - // case IS_NOT_DISTINCT_FROM: - // FLINK END MODIFICATION + case IS_NOT_DISTINCT_FROM: left = ((RexCall) predicate).getOperands().get(0); right = ((RexCall) predicate).getOperands().get(1); break; diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml index c4442ef5dde99..041111d053893 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml @@ -60,10 +60,11 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml index be2b0e58f2905..f08d5635de70e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml @@ -490,19 +490,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7], @@ -1158,19 +1159,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7], diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml index 897e52cf7f783..dc0d9c3e721b3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml @@ -138,14 +138,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) - - - - - - - - - - - @@ -311,14 +272,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) @@ -280,43 +282,6 @@ Calc(select=[c, g]) +- Exchange(distribution=[hash[e, d]]) +- Calc(select=[d, e, g]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h]) -]]> - - - - - - - - - - - @@ -416,20 +381,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) @@ -656,20 +623,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml index 249f17770908a..b125f43544167 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml @@ -241,20 +241,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) @@ -280,43 +282,6 @@ Calc(select=[c, g]) +- Exchange(distribution=[hash[e, d]]) +- Calc(select=[d, e, g]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h]) -]]> - - - - - - - - - - - @@ -416,20 +381,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) @@ -656,20 +623,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml index 0e508ef7691de..aa0a43e59b279 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml @@ -517,12 +517,10 @@ LogicalProject(a=[$0], c=[$2], c0=[$9]) @@ -767,10 +765,10 @@ LogicalProject(a=[$0], b=[$1], c=[$9]) @@ -467,15 +469,15 @@ LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) @@ -1423,18 +1427,20 @@ LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala index ec3ada5a2fb6f..fd3bc8c2b1816 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala @@ -124,4 +124,11 @@ class BroadcastHashJoinTest extends JoinTestBase { .hasMessageContaining("Cannot generate a valid execution plan for the given query") .isInstanceOf[TableException] } + + @Test + override def testInnerJoinWithJoinConditionPushDown(): Unit = { + assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown()) + .hasMessageContaining("Cannot generate a valid execution plan for the given query") + .isInstanceOf[TableException] + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala index e7ad20d02c49b..0370b2c4c0cc0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala @@ -96,4 +96,11 @@ class ShuffledHashJoinTest extends JoinTestBase { .hasMessageContaining("Cannot generate a valid execution plan for the given query") .isInstanceOf[TableException] } + + @Test + override def testInnerJoinWithJoinConditionPushDown(): Unit = { + assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown()) + .hasMessageContaining("Cannot generate a valid execution plan for the given query") + .isInstanceOf[TableException] + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala index e10732166ee5e..dffbe8e8b8223 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala @@ -94,4 +94,10 @@ class SortMergeJoinTest extends JoinTestBase { .isInstanceOf[TableException] } + @Test + override def testInnerJoinWithJoinConditionPushDown(): Unit = { + assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown()) + .hasMessageContaining("Cannot generate a valid execution plan for the given query") + .isInstanceOf[TableException] + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala index d0e50a00da6ec..be8b8d791d042 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala @@ -179,9 +179,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin1(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -193,9 +190,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin2(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -207,9 +201,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin3(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -222,9 +213,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin4(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -236,9 +224,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin5(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -250,9 +235,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin6(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") - checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0), @@ -264,8 +246,6 @@ class DecimalITCase extends BatchTestBase { @Test def testJoin7(): Unit = { - tEnv.getConfig - .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin") checkQuery( Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE), s1r(d"1", d"1", 1, 1.0),