From deb8fd43d9f6bca728bc8a7aff62254488ceb462 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Thu, 21 May 2026 09:27:56 -0700 Subject: [PATCH 1/2] [FLINK-39726][table] Support USING CONNECTION clause in CREATE TABLE Add an optional 'USING CONNECTION ' clause to the CREATE TABLE grammar (FLIP-529), parsed between PARTITIONED BY and WITH. The parsed identifier is carried on SqlCreateTable and SqlCreateTableLike via a new optional connection field that round-trips through unparse. The clause is intentionally restricted to plain CREATE TABLE and CREATE TABLE ... LIKE; using it with CTAS / RTAS raises a parser error (usingConnectionWithAsUnsupported). --- .../src/main/codegen/includes/parserImpls.ftl | 12 +++ .../sql/parser/ddl/SqlReplaceTableAs.java | 1 + .../sql/parser/ddl/table/SqlCreateTable.java | 18 +++- .../parser/ddl/table/SqlCreateTableAs.java | 1 + .../parser/ddl/table/SqlCreateTableLike.java | 2 + .../sql/parser/utils/ParserResource.java | 4 + .../sql/parser/FlinkSqlParserImplTest.java | 87 +++++++++++++++++++ 7 files changed, 124 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index cf01aee2f7e9a..b3021950f12c9 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1596,6 +1596,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : SqlNodeList propertyList = SqlNodeList.EMPTY; SqlDistribution distribution = null; SqlNodeList partitionColumns = SqlNodeList.EMPTY; + SqlIdentifier connection = null; SqlParserPos pos = startPos; boolean isColumnsIdentifiersOnly = false; } @@ -1629,6 +1630,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : partitionColumns = ParenthesizedSimpleIdentifierList() ] + [ + + connection = CompoundIdentifier() + ] [ propertyList = Properties() @@ -1652,6 +1657,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : partitionColumns, watermark, comment, + connection, tableLike, isTemporary, ifNotExists); @@ -1660,6 +1666,11 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { + if (connection != null) { + throw SqlUtil.newContextException( + pos, + ParserResource.RESOURCE.usingConnectionWithAsUnsupported()); + } if (replace) { return new SqlReplaceTableAs(startPos.plus(getPos()), tableName, @@ -1706,6 +1717,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : partitionColumns, watermark, comment, + connection, isTemporary, ifNotExists); } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java index 943951f322402..db3cf838119ba 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java @@ -107,6 +107,7 @@ public SqlReplaceTableAs( partitionKeyList, watermark, comment, + null, isTemporary, ifNotExists, true); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java index 0da0686b66b7a..a8084acbf207b 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java @@ -66,6 +66,8 @@ public class SqlCreateTable extends SqlCreateObject implements ExtendedSqlNode { private final SqlWatermark watermark; + private final SqlIdentifier connection; + public SqlCreateTable( SqlParserPos pos, SqlIdentifier tableName, @@ -76,6 +78,7 @@ public SqlCreateTable( SqlNodeList partitionKeyList, @Nullable SqlWatermark watermark, @Nullable SqlCharStringLiteral comment, + @Nullable SqlIdentifier connection, boolean isTemporary, boolean ifNotExists) { this( @@ -89,6 +92,7 @@ public SqlCreateTable( partitionKeyList, watermark, comment, + connection, isTemporary, ifNotExists, false); @@ -105,6 +109,7 @@ protected SqlCreateTable( SqlNodeList partitionKeyList, @Nullable SqlWatermark watermark, @Nullable SqlCharStringLiteral comment, + @Nullable SqlIdentifier connection, boolean isTemporary, boolean ifNotExists, boolean replace) { @@ -117,6 +122,7 @@ protected SqlCreateTable( this.partitionKeyList = requireNonNull(partitionKeyList, "partitionKeyList should not be null"); this.watermark = watermark; + this.connection = connection; } @Override @@ -128,7 +134,8 @@ protected SqlCreateTable( properties, partitionKeyList, watermark, - comment); + comment, + connection); } public SqlNodeList getColumnList() { @@ -151,6 +158,10 @@ public Optional getWatermark() { return Optional.ofNullable(watermark); } + public Optional getConnection() { + return Optional.ofNullable(connection); + } + @Override protected String getScope() { return "TABLE"; @@ -214,6 +225,11 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, rightPrec); SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec, rightPrec); SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, leftPrec, rightPrec); + if (connection != null) { + writer.newlineAndIndent(); + writer.keyword("USING CONNECTION"); + connection.unparse(writer, leftPrec, rightPrec); + } SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java index 3f897199a6976..631d37ea71729 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java @@ -99,6 +99,7 @@ public SqlCreateTableAs( partitionKeyList, watermark, comment, + null, isTemporary, ifNotExists, false); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java index b9e99470940c3..b36b5ee983ec6 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java @@ -86,6 +86,7 @@ public SqlCreateTableLike( SqlNodeList partitionKeyList, @Nullable SqlWatermark watermark, @Nullable SqlCharStringLiteral comment, + @Nullable SqlIdentifier connection, SqlTableLike tableLike, boolean isTemporary, boolean ifNotExists) { @@ -100,6 +101,7 @@ public SqlCreateTableLike( partitionKeyList, watermark, comment, + connection, isTemporary, ifNotExists, false); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 5e1672865a498..d459ca28dc8fd 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -83,4 +83,8 @@ public interface ParserResource { @Resources.BaseMessage("DROP TEMPORARY MATERIALIZED TABLE is not supported.") Resources.ExInst dropTemporaryMaterializedTableUnsupported(); + + @Resources.BaseMessage( + "USING CONNECTION clause is not supported with CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements.") + Resources.ExInst usingConnectionWithAsUnsupported(); } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 1932f2f4a90ca..298eccbd17712 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -1050,6 +1050,93 @@ void testCreateTable() { sql(sql).ok(expected); } + @Test + void testCreateTableUsingConnection() { + final String sql = + "CREATE TABLE orders (\n" + + " order_id INT,\n" + + " customer_id INT,\n" + + " amount DECIMAL(10, 2)\n" + + ") USING CONNECTION mycat.mydb.mysql_prod\n" + + "WITH (\n" + + " 'connector' = 'jdbc',\n" + + " 'tables' = 'orders'\n" + + ")"; + final String expected = + "CREATE TABLE `ORDERS` (\n" + + " `ORDER_ID` INTEGER,\n" + + " `CUSTOMER_ID` INTEGER,\n" + + " `AMOUNT` DECIMAL(10, 2)\n" + + ")\n" + + "USING CONNECTION `MYCAT`.`MYDB`.`MYSQL_PROD`\n" + + "WITH (\n" + + " 'connector' = 'jdbc',\n" + + " 'tables' = 'orders'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testCreateTableUsingConnectionWithPartitionAndDistribution() { + final String sql = + "CREATE TABLE tbl1 (\n" + + " a bigint,\n" + + " h varchar,\n" + + " b varchar\n" + + ")\n" + + "DISTRIBUTED BY HASH(a) INTO 3 BUCKETS\n" + + "PARTITIONED BY (a, h)\n" + + "USING CONNECTION cat1.db1.conn1\n" + + "WITH (\n" + + " 'connector' = 'jdbc'\n" + + ")"; + final String expected = + "CREATE TABLE `TBL1` (\n" + + " `A` BIGINT,\n" + + " `H` VARCHAR,\n" + + " `B` VARCHAR\n" + + ")\n" + + "DISTRIBUTED BY HASH(`A`) INTO 3 BUCKETS\n" + + "PARTITIONED BY (`A`, `H`)\n" + + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n" + + "WITH (\n" + + " 'connector' = 'jdbc'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testCreateTableAsWithUsingConnectionFails() { + final String sql = + "^CREATE^ TABLE t1\n" + + "USING CONNECTION cat1.db1.conn1\n" + + "WITH ('connector' = 'jdbc')\n" + + "AS SELECT 1 AS a"; + sql(sql).fails( + "(?s).*USING CONNECTION clause is not supported with CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements.*"); + } + + @Test + void testCreateTableLikeUsingConnection() { + final String sql = + "CREATE TABLE t1 (\n" + + " a INT\n" + + ")\n" + + "USING CONNECTION cat1.db1.conn1\n" + + "WITH ('connector' = 'jdbc')\n" + + "LIKE base_table"; + final String expected = + "CREATE TABLE `T1` (\n" + + " `A` INTEGER\n" + + ")\n" + + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n" + + "WITH (\n" + + " 'connector' = 'jdbc'\n" + + ")\n" + + "LIKE `BASE_TABLE`"; + sql(sql).ok(expected); + } + String buildDistributionInput(final String distributionClause) { return "CREATE TABLE tbl1 (\n" + " a bigint,\n" From 2c188af68198d182c4abb0d0955d8a35802d0bc3 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Fri, 22 May 2026 09:35:43 -0700 Subject: [PATCH 2/2] [FLINK-39726][table] Address PR review on USING CONNECTION unparse and tests Move the USING CONNECTION unparse logic to SqlUnparseUtils.unparseUsingConnection and emit USING / CONNECTION as separate SqlWriter.keyword calls. Add negative tests covering REPLACE TABLE AS and CREATE OR REPLACE TABLE AS, and tighten the failure-message regexes to match the exact resource string / line+column. --- .../flink/sql/parser/SqlUnparseUtils.java | 12 +++++++ .../sql/parser/ddl/table/SqlCreateTable.java | 6 +--- .../sql/parser/FlinkSqlParserImplTest.java | 31 ++++++++++++++++++- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java index 619f7a5e93218..c66e2e4f6e0b5 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode; import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlIntervalLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -100,6 +101,17 @@ public static void unparsePartitionKeyList( writer.endList(partitionedByFrame); } + public static void unparseUsingConnection( + SqlIdentifier connection, SqlWriter writer, int leftPrec, int rightPrec) { + if (connection == null) { + return; + } + writer.newlineAndIndent(); + writer.keyword("USING"); + writer.keyword("CONNECTION"); + connection.unparse(writer, leftPrec, rightPrec); + } + public static void unparseFreshness( SqlIntervalLiteral freshness, boolean withNewLine, diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java index a8084acbf207b..1da49e5132403 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java @@ -225,11 +225,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, rightPrec); SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec, rightPrec); SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, leftPrec, rightPrec); - if (connection != null) { - writer.newlineAndIndent(); - writer.keyword("USING CONNECTION"); - connection.unparse(writer, leftPrec, rightPrec); - } + SqlUnparseUtils.unparseUsingConnection(connection, writer, leftPrec, rightPrec); SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 298eccbd17712..1d1e7d639fa00 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -1107,13 +1107,42 @@ void testCreateTableUsingConnectionWithPartitionAndDistribution() { @Test void testCreateTableAsWithUsingConnectionFails() { + // CTAS goes through SqlCreateTable with replace=false; the parser explicitly rejects + // USING CONNECTION + AS via ParserResource.usingConnectionWithAsUnsupported(). final String sql = "^CREATE^ TABLE t1\n" + "USING CONNECTION cat1.db1.conn1\n" + "WITH ('connector' = 'jdbc')\n" + "AS SELECT 1 AS a"; sql(sql).fails( - "(?s).*USING CONNECTION clause is not supported with CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements.*"); + "(?s).*USING CONNECTION clause is not supported with " + + "CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements\\..*"); + } + + @Test + void testReplaceTableAsWithUsingConnectionFails() { + // REPLACE TABLE always has an AS clause, so USING CONNECTION is never valid; + // it's not even accepted by the SqlReplaceTable production. + final String sql = + "REPLACE TABLE t1\n" + + "^USING^ CONNECTION cat1.db1.conn1\n" + + "WITH ('connector' = 'jdbc')\n" + + "AS SELECT 1 AS a"; + sql(sql).fails("(?s).*Encountered \"USING\" at line 2, column 1.\n.*"); + } + + @Test + void testCreateOrReplaceTableAsWithUsingConnectionFails() { + // CREATE OR REPLACE TABLE AS goes through SqlCreateTable with replace=true and hits + // the same usingConnectionWithAsUnsupported() error path as CTAS. + final String sql = + "^CREATE^ OR REPLACE TABLE t1\n" + + "USING CONNECTION cat1.db1.conn1\n" + + "WITH ('connector' = 'jdbc')\n" + + "AS SELECT 1 AS a"; + sql(sql).fails( + "(?s).*USING CONNECTION clause is not supported with " + + "CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT statements\\..*"); } @Test