Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@
* because of current Calcite way of inferring constants from IS NOT DISTINCT FROM clashes with
* filter push down.
*
* <p>Lines 402 ~ 404, Use Calcite 1.32.0 behavior for {@link RexUtil#gatherConstraints(Class,
* RexNode, Map, Set, RexBuilder)}.
*
* <p>FLINK modifications (backport of CALCITE-6764): Line 2481~2485
* <p>FLINK modifications (backport of CALCITE-6764): Line 2489~2494
*/
public class RexUtil {

Expand Down Expand Up @@ -401,9 +398,7 @@ private static <C extends RexNode> void gatherConstraints(
final RexNode right;
switch (predicate.getKind()) {
case EQUALS:
// FLINK BEGIN MODIFICATION
// case IS_NOT_DISTINCT_FROM:
// FLINK END MODIFICATION
case IS_NOT_DISTINCT_FROM:
left = ((RexCall) predicate).getOperands().get(0);
right = ((RexCall) predicate).getOperands().get(1);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[cat, hh], select=[cat, hh, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3])
+- Exchange(distribution=[hash[cat, hh]])
+- Calc(select=[cat, SUBSTR(CAST(LOCALTIME() AS VARCHAR(2147483647)), 1, 2) AS hh, cnt], where=[(SUBSTR(CAST(ts AS VARCHAR(2147483647)), 1, 2) = SUBSTR(CAST(LOCALTIME() AS VARCHAR(2147483647)), 1, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, src, filter=[], project=[cat, cnt, ts], metadata=[]]], fields=[cat, cnt, ts])
Calc(select=[cat, SUBSTR(CAST(LOCALTIME() AS VARCHAR(2147483647)), 1, 2) AS hh, EXPR$2, EXPR$3])
+- HashAggregate(isMerge=[false], groupBy=[cat], select=[cat, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3])
+- Exchange(distribution=[hash[cat]])
+- Calc(select=[cat, cnt], where=[(SUBSTR(CAST(ts AS VARCHAR(2147483647)), 1, 2) = SUBSTR(CAST(LOCALTIME() AS VARCHAR(2147483647)), 1, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, src, filter=[], project=[cat, cnt, ts], metadata=[]]], fields=[cat, cnt, ts])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0], build=[left])\n:- [#1] Exchange(distribution=[hash[a]])\n+- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, e, f], build=[right])\n :- [#2] Exchange(distribution=[hash[a]])\n +- [#3] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, e, f], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, CAST(2 AS BIGINT) AS e, f], where=[(e = 2)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
:- Exchange(distribution=[hash[a]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d, CAST(2 AS BIGINT) AS e, f], where=[(e = 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
Calc(select=[a, b, c, d, CAST(2 AS BIGINT) AS e, f, a0, b0, c0, d0, CAST(2 AS BIGINT) AS e0, f0])
+- MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, f, a0, b0, c0, d0, f0], build=[left])\n:- [#1] Exchange(distribution=[hash[a]])\n+- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, f], build=[right])\n :- [#2] Exchange(distribution=[hash[a]])\n +- [#3] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- HashJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, f], build=[right])
: :- Exchange(distribution=[hash[a]])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, f], where=[(e = 2)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
:- Exchange(distribution=[hash[a]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d, f], where=[(e = 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -1158,19 +1159,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
MultipleInput(members=[\nSortMergeJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, e, f, a0, b0, c0, d0, e0, f0])\n:- [#1] Exchange(distribution=[hash[a]])\n+- SortMergeJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, e, f])\n :- [#2] Exchange(distribution=[hash[a]])\n +- [#3] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- SortMergeJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, e, f])
: :- Exchange(distribution=[hash[a]])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, CAST(2 AS BIGINT) AS e, f], where=[(e = 2)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
:- Exchange(distribution=[hash[a]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d, CAST(2 AS BIGINT) AS e, f], where=[(e = 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
Calc(select=[a, b, c, d, CAST(2 AS BIGINT) AS e, f, a0, b0, c0, d0, CAST(2 AS BIGINT) AS e0, f0])
+- MultipleInput(members=[\nSortMergeJoin(joinType=[InnerJoin], where=[(a = d0)], select=[a, b, c, d, f, a0, b0, c0, d0, f0])\n:- [#1] Exchange(distribution=[hash[a]])\n+- SortMergeJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, f])\n :- [#2] Exchange(distribution=[hash[a]])\n +- [#3] Exchange(distribution=[hash[d]])\n])
:- Exchange(distribution=[hash[a]])
: +- SortMergeJoin(joinType=[RightOuterJoin], where=[(a = d)], select=[a, b, c, d, f])
: :- Exchange(distribution=[hash[a]])
: : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
: +- Exchange(distribution=[hash[d]])
: +- Calc(select=[d, f], where=[(e = 2)])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
:- Exchange(distribution=[hash[a]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d, f], where=[(e = 2)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b, CAST(2 AS INTEGER) AS d, e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[((a = d) AND (b = e))], select=[a, b, d, e], isBroadcast=[true], build=[right])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
Calc(select=[CAST(2 AS INTEGER) AS a, b, CAST(2 AS INTEGER) AS d, e])
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the constant pulled up

+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(b = e)], select=[b, e], isBroadcast=[true], build=[right])\n:- Calc(select=[b])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
: +- Calc(select=[e])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
Expand Down Expand Up @@ -174,46 +175,6 @@ Calc(select=[c, g])
+- Exchange(distribution=[broadcast])
+- Calc(select=[d, e, g])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
]]>
</Resource>
</TestCase>
<TestCase name="testInnerJoinWithJoinConditionPushDown">
<Resource name="sql">
<![CDATA[
SELECT * FROM
(select a, count(b) as b from MyTable1 group by a)
join
(select d, count(e) as e from MyTable2 group by d)
on a = d and b = e and d = 2 and b = 1
]]>
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here and for other joins (except for NestedLoop)
now Calcite is able to understand that a == 2 and e == 1
after that it is able to push down these values to corresponding table sources
as a result non nested loop join will fail as there is only true in join condition.

when all joins are available then NestedLoop will be selected, there is also e2e case in DecimalITCase in this PR

</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
+- LogicalJoin(condition=[AND(=($0, $2), =($1, $3), =($2, 2), =($1, 1))], joinType=[inner])
:- LogicalAggregate(group=[{0}], b=[COUNT($1)])
: +- LogicalProject(a=[$0], b=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalAggregate(group=[{0}], e=[COUNT($1)])
+- LogicalProject(d=[$0], e=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, 1 AS b, CAST(2 AS INTEGER) AS d, 1 AS e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, d], isBroadcast=[true], build=[right])\n:- Calc(select=[a], where=[(b = 1)])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
: +- Calc(select=[d], where=[(e = 1)])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -311,14 +272,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[CAST(2 AS INTEGER) AS a, b, d, CAST(e AS BIGINT) AS e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[((a = d) AND (b = e))], select=[a, b, d, e], isBroadcast=[true], build=[right])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
Calc(select=[CAST(2 AS INTEGER) AS a, b, CAST(2 AS INTEGER) AS d, CAST(e AS BIGINT) AS e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(b = e)], select=[b, e], isBroadcast=[true], build=[right])\n:- Calc(select=[b])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
: +- Calc(select=[e])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
Expand Down Expand Up @@ -547,14 +509,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, CAST(b AS BIGINT) AS b, CAST(2 AS INTEGER) AS d, e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[((a = d) AND (b = e))], select=[a, b, d, e], isBroadcast=[true], build=[right])\n:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
Calc(select=[CAST(2 AS INTEGER) AS a, CAST(b AS BIGINT) AS b, CAST(2 AS INTEGER) AS d, e])
+- MultipleInput(readOrder=[0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[(b = e)], select=[b, e], isBroadcast=[true], build=[right])\n:- Calc(select=[b])\n: +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count$0) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[broadcast])\n])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
: +- Calc(select=[e])
: +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS e])
: +- Exchange(distribution=[hash[d]])
: +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(e) AS count$0])
: +- Calc(select=[CAST(2 AS INTEGER) AS d, e], where=[(d = 2)])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
+- Exchange(distribution=[hash[a]])
+- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(b) AS count$0])
+- Calc(select=[CAST(2 AS INTEGER) AS a, b], where=[(a = 2)])
Expand Down
Loading