From d21d37b9c9b9ef75fed3a374629c3f2416554f10 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 14:11:43 -0400 Subject: [PATCH 1/6] Add testing compile endpoint (v2 clean implementation) Single compile endpoint that analyzes a pipeline in an isolated session: - Loads statements via configureSession (tables then views) - DESCRIBE for exact mock schemas with watermark info - EXPLAIN for window function and temporal join detection - Computes needed views between mock boundaries and target - Returns compile plan with mock specs, session statements, query SQL No custom parsers, no regex patterns, no type inference. Uses Flink's own catalog, parser, and planner APIs. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../table/gateway/api/SqlGatewayService.java | 17 + .../api/testing/TestCompileRequest.java | 66 +++ .../api/testing/TestCompileResponse.java | 81 +++ .../gateway/api/testing/TestMockSpec.java | 100 ++++ .../gateway/api/testing/TestSchemaColumn.java | 63 +++ .../table/gateway/api/testing/TestingApi.java | 28 + .../api/utils/MockedSqlGatewayService.java | 8 + .../gateway/rest/SqlGatewayRestEndpoint.java | 18 + .../handler/testing/CompileTestHandler.java | 113 ++++ .../testing/GetTestingInfoHandler.java | 57 ++ .../header/testing/CompileTestHeaders.java | 79 +++ .../header/testing/GetTestingInfoHeaders.java | 79 +++ .../testing/CompileTestRequestBody.java | 75 +++ .../testing/CompileTestResponseBody.java | 181 ++++++ .../testing/TestingInfoResponseBody.java | 43 ++ .../service/SqlGatewayServiceImpl.java | 20 + .../service/testing/TestPlanService.java | 535 ++++++++++++++++++ 17 files changed, 1563 insertions(+) create mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileRequest.java create mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java create mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java create mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestSchemaColumn.java create mode 100644 flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestingApi.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/GetTestingInfoHandler.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/CompileTestHeaders.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/GetTestingInfoHeaders.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestRequestBody.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/TestingInfoResponseBody.java create mode 100644 flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java index bb69fdc658db4..3bb287f9cd764 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java @@ -37,6 +37,8 @@ import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.testing.TestCompileRequest; +import org.apache.flink.table.gateway.api.testing.TestCompileResponse; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import javax.annotation.Nullable; @@ -368,4 +370,19 @@ ClusterID deployScript( @Nullable String script, Configuration executionConfig) throws SqlGatewayException; + + // ------------------------------------------------------------------------- + // Testing API + // ------------------------------------------------------------------------- + + /** + * Compile a test plan for a pipeline target. + * + *

Loads the pipeline into an analysis session, resolves mock schemas via DESCRIBE, detects + * window functions and temporal joins via EXPLAIN, and returns the compiled plan. + * + * @param request the compile request + * @return the compiled test plan + */ + TestCompileResponse compileTestPlan(TestCompileRequest request) throws SqlGatewayException; } diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileRequest.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileRequest.java new file mode 100644 index 0000000000000..e71c34cba5a4f --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.testing; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Request to compile a test plan for a pipeline target. + * + *

The CLI pre-splits the rendered pipeline SQL into individual statements and sends them in + * pipeline order (tables before views). The gateway loads them into an analysis session to resolve + * schemas and dependencies. + */ +public final class TestCompileRequest { + + private final List statements; + private final String target; + private final String mode; + private final List mockTargets; + + public TestCompileRequest( + List statements, String target, String mode, List mockTargets) { + this.statements = Collections.unmodifiableList(Objects.requireNonNull(statements)); + this.target = Objects.requireNonNull(target); + this.mode = Objects.requireNonNull(mode); + this.mockTargets = Collections.unmodifiableList(Objects.requireNonNull(mockTargets)); + } + + /** Pre-split pipeline SQL statements in dependency order. */ + public List getStatements() { + return statements; + } + + /** The top-level view or table to test. */ + public String getTarget() { + return target; + } + + /** Execution mode: {@code "batch"} or {@code "changelog"}. */ + public String getMode() { + return mode; + } + + /** Names of pipeline objects to be replaced with mock data. */ + public List getMockTargets() { + return mockTargets; + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java new file mode 100644 index 0000000000000..0500d0ce27cc7 --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.testing; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Response from compiling a test plan. + * + *

Contains everything the CLI needs to set up an execution session: mock specifications with + * exact schemas, the ordered DDL statements to register between mocks and the target, and the query + * to execute. + */ +public final class TestCompileResponse { + + private final String contractVersion; + private final List sessionStatements; + private final String querySql; + private final List warnings; + private final List mocks; + + public TestCompileResponse( + String contractVersion, + List sessionStatements, + String querySql, + List warnings, + List mocks) { + this.contractVersion = Objects.requireNonNull(contractVersion); + this.sessionStatements = + Collections.unmodifiableList(Objects.requireNonNull(sessionStatements)); + this.querySql = Objects.requireNonNull(querySql); + this.warnings = Collections.unmodifiableList(Objects.requireNonNull(warnings)); + this.mocks = Collections.unmodifiableList(Objects.requireNonNull(mocks)); + } + + /** The testing API contract version. */ + public String getContractVersion() { + return contractVersion; + } + + /** + * Ordered DDL statements the CLI should register after creating mocks. Includes view + * definitions between mock boundaries and the target, plus any config/function statements. + */ + public List getSessionStatements() { + return sessionStatements; + } + + /** The final query to execute against the target. */ + public String getQuerySql() { + return querySql; + } + + /** Non-fatal warnings (e.g., temporal join detection). */ + public List getWarnings() { + return warnings; + } + + /** Mock specifications with exact resolved schemas and materialization strategies. */ + public List getMocks() { + return mocks; + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java new file mode 100644 index 0000000000000..7038d46c9ab4b --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.testing; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Specification for a required mock in a compiled test plan. + * + *

The mock name matches the original pipeline object name so it can be created as a direct + * replacement in the execution session. The schema contains the exact resolved column types from the + * Flink catalog. + */ +public final class TestMockSpec { + + /** Create mock as a VALUES-based temporary view. Suitable for batch and non-windowed cases. */ + public static final String MATERIALIZATION_VIEW = "view"; + + /** + * Create mock as a filesystem-backed temporary table with watermarks. Required for windowed + * streaming/changelog tests where watermark advancement is needed. + */ + public static final String MATERIALIZATION_FILESYSTEM = "filesystem"; + + private final String requestedName; + private final String sessionObject; + private final String materialization; + private final List schema; + @Nullable private final String watermarkColumn; + private final boolean includeSentinel; + + public TestMockSpec( + String requestedName, + String sessionObject, + String materialization, + List schema, + @Nullable String watermarkColumn, + boolean includeSentinel) { + this.requestedName = Objects.requireNonNull(requestedName, "requestedName"); + this.sessionObject = Objects.requireNonNull(sessionObject, "sessionObject"); + this.materialization = Objects.requireNonNull(materialization, "materialization"); + this.schema = Collections.unmodifiableList(Objects.requireNonNull(schema, "schema")); + this.watermarkColumn = watermarkColumn; + this.includeSentinel = includeSentinel; + } + + /** The canonical name of the mock target as specified in the test suite. */ + public String getRequestedName() { + return requestedName; + } + + /** + * The SQL identifier the toolkit should use when creating the mock object. Matches the original + * pipeline object name so downstream views can reference it. + */ + public String getSessionObject() { + return sessionObject; + } + + /** The materialization strategy: {@code "view"} or {@code "filesystem"}. */ + public String getMaterialization() { + return materialization; + } + + /** The exact resolved schema columns with SQL type strings. */ + public List getSchema() { + return schema; + } + + /** The watermark column name, if filesystem materialization requires watermarks. */ + @Nullable + public String getWatermarkColumn() { + return watermarkColumn; + } + + /** Whether a sentinel row should be appended to advance watermarks past all windows. */ + public boolean isIncludeSentinel() { + return includeSentinel; + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestSchemaColumn.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestSchemaColumn.java new file mode 100644 index 0000000000000..eec9845ef6d7c --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestSchemaColumn.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.testing; + +import java.util.Objects; + +/** A column name and SQL type pair in a resolved mock schema. */ +public final class TestSchemaColumn { + + private final String name; + private final String type; + + public TestSchemaColumn(String name, String type) { + this.name = Objects.requireNonNull(name, "name"); + this.type = Objects.requireNonNull(type, "type"); + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TestSchemaColumn)) { + return false; + } + TestSchemaColumn that = (TestSchemaColumn) o; + return name.equals(that.name) && type.equals(that.type); + } + + @Override + public int hashCode() { + return Objects.hash(name, type); + } + + @Override + public String toString() { + return name + " " + type; + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestingApi.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestingApi.java new file mode 100644 index 0000000000000..0119c5a05ba3a --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestingApi.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.testing; + +/** Constants for the testing API surface. */ +public final class TestingApi { + + /** Contract version advertised by the testing endpoints. */ + public static final String CONTRACT_VERSION = "v2"; + + private TestingApi() {} +} diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java index d7b4577962926..d095de931ebba 100644 --- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java @@ -36,6 +36,8 @@ import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.testing.TestCompileRequest; +import org.apache.flink.table.gateway.api.testing.TestCompileResponse; import javax.annotation.Nullable; @@ -218,4 +220,10 @@ public ResolvedCatalogBaseTable getTable( throws SqlGatewayException { throw new UnsupportedOperationException(); } + + @Override + public TestCompileResponse compileTestPlan(TestCompileRequest request) + throws SqlGatewayException { + throw new UnsupportedOperationException(); + } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java index e48df09a64d9e..e169d61fd85dc 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java @@ -42,6 +42,8 @@ import org.apache.flink.table.gateway.rest.handler.statement.CompleteStatementHandler; import org.apache.flink.table.gateway.rest.handler.statement.ExecuteStatementHandler; import org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler; +import org.apache.flink.table.gateway.rest.handler.testing.CompileTestHandler; +import org.apache.flink.table.gateway.rest.handler.testing.GetTestingInfoHandler; import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler; import org.apache.flink.table.gateway.rest.handler.util.GetInfoHandler; import org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders; @@ -61,6 +63,8 @@ import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders; import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; +import org.apache.flink.table.gateway.rest.header.testing.CompileTestHeaders; +import org.apache.flink.table.gateway.rest.header.testing.GetTestingInfoHeaders; import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders; import org.apache.flink.table.gateway.rest.header.util.GetInfoHeaders; import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler; @@ -102,6 +106,7 @@ protected List> initiali addEmbeddedSchedulerRelatedHandlers(handlers); addMaterializedTableRelatedHandlers(handlers); addDeployScriptRelatedHandlers(handlers); + addTestingRelatedHandlers(handlers); return handlers; } @@ -268,6 +273,19 @@ private void addDeployScriptRelatedHandlers( handlers.add(Tuple2.of(DeployScriptHeaders.getInstance(), handler)); } + private void addTestingRelatedHandlers( + List> handlers) { + GetTestingInfoHandler testingInfoHandler = + new GetTestingInfoHandler( + service, responseHeaders, GetTestingInfoHeaders.getInstance()); + handlers.add(Tuple2.of(GetTestingInfoHeaders.getInstance(), testingInfoHandler)); + + CompileTestHandler compileTestHandler = + new CompileTestHandler( + service, responseHeaders, CompileTestHeaders.getInstance()); + handlers.add(Tuple2.of(CompileTestHeaders.getInstance(), compileTestHandler)); + } + @Override protected void startInternal() { quartzScheduler.start(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java new file mode 100644 index 0000000000000..73c285b206e07 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.handler.testing; + +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.testing.TestCompileRequest; +import org.apache.flink.table.gateway.api.testing.TestCompileResponse; +import org.apache.flink.table.gateway.api.testing.TestMockSpec; +import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestRequestBody; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestResponseBody; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestResponseBody.MockSpecBody; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestResponseBody.SchemaColumnBody; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** Handler for {@code POST /v1/testing/compile}. */ +public class CompileTestHandler + extends AbstractSqlGatewayRestHandler< + CompileTestRequestBody, CompileTestResponseBody, EmptyMessageParameters> { + + public CompileTestHandler( + SqlGatewayService service, + Map responseHeaders, + MessageHeaders< + CompileTestRequestBody, + CompileTestResponseBody, + EmptyMessageParameters> + messageHeaders) { + super(service, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture handleRequest( + @Nullable SqlGatewayRestAPIVersion version, + @Nonnull HandlerRequest request) + throws RestHandlerException { + + CompileTestRequestBody body = request.getRequestBody(); + TestCompileRequest apiRequest = + new TestCompileRequest( + body.getStatements() != null + ? body.getStatements() + : Collections.emptyList(), + body.getTarget(), + body.getMode(), + body.getMockTargets() != null + ? body.getMockTargets() + : Collections.emptyList()); + + try { + TestCompileResponse apiResponse = service.compileTestPlan(apiRequest); + return CompletableFuture.completedFuture(toResponseBody(apiResponse)); + } catch (IllegalArgumentException e) { + throw new RestHandlerException(e.getMessage(), HttpResponseStatus.BAD_REQUEST, e); + } catch (Exception e) { + throw new RestHandlerException( + e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e); + } + } + + private static CompileTestResponseBody toResponseBody(TestCompileResponse response) { + return new CompileTestResponseBody( + response.getContractVersion(), + response.getSessionStatements(), + response.getQuerySql(), + response.getWarnings(), + response.getMocks().stream() + .map(CompileTestHandler::toMockSpecBody) + .collect(Collectors.toList())); + } + + private static MockSpecBody toMockSpecBody(TestMockSpec spec) { + return new MockSpecBody( + spec.getRequestedName(), + spec.getSessionObject(), + spec.getMaterialization(), + spec.getSchema().stream() + .map(col -> new SchemaColumnBody(col.getName(), col.getType())) + .collect(Collectors.toList()), + spec.getWatermarkColumn(), + spec.isIncludeSentinel() ? true : null); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/GetTestingInfoHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/GetTestingInfoHandler.java new file mode 100644 index 0000000000000..c5dd0d262d0e9 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/GetTestingInfoHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.handler.testing; + +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.testing.TestingApi; +import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; +import org.apache.flink.table.gateway.rest.message.testing.TestingInfoResponseBody; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler for {@code GET /v1/testing/info}. */ +public class GetTestingInfoHandler + extends AbstractSqlGatewayRestHandler< + EmptyRequestBody, TestingInfoResponseBody, EmptyMessageParameters> { + + public GetTestingInfoHandler( + SqlGatewayService service, + Map responseHeaders, + MessageHeaders + messageHeaders) { + super(service, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture handleRequest( + @Nullable SqlGatewayRestAPIVersion version, + @Nonnull HandlerRequest request) { + return CompletableFuture.completedFuture( + new TestingInfoResponseBody(TestingApi.CONTRACT_VERSION)); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/CompileTestHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/CompileTestHeaders.java new file mode 100644 index 0000000000000..71c527710c045 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/CompileTestHeaders.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.header.testing; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestRequestBody; +import org.apache.flink.table.gateway.rest.message.testing.CompileTestResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Message headers for {@code POST /v1/testing/compile}. */ +public class CompileTestHeaders + implements SqlGatewayMessageHeaders< + CompileTestRequestBody, CompileTestResponseBody, EmptyMessageParameters> { + + private static final CompileTestHeaders INSTANCE = new CompileTestHeaders(); + + @Override + public Class getResponseClass() { + return CompileTestResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Compile a test plan for a pipeline target."; + } + + @Override + public Class getRequestClass() { + return CompileTestRequestBody.class; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return "/testing/compile"; + } + + public static CompileTestHeaders getInstance() { + return INSTANCE; + } + + @Override + public String operationId() { + return "compileTestPlan"; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/GetTestingInfoHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/GetTestingInfoHeaders.java new file mode 100644 index 0000000000000..f5f7868fe186f --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/testing/GetTestingInfoHeaders.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.header.testing; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.table.gateway.rest.message.testing.TestingInfoResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Message headers for {@code GET /v1/testing/info}. */ +public class GetTestingInfoHeaders + implements SqlGatewayMessageHeaders< + EmptyRequestBody, TestingInfoResponseBody, EmptyMessageParameters> { + + private static final GetTestingInfoHeaders INSTANCE = new GetTestingInfoHeaders(); + + @Override + public Class getResponseClass() { + return TestingInfoResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Get testing API contract version."; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/testing/info"; + } + + public static GetTestingInfoHeaders getInstance() { + return INSTANCE; + } + + @Override + public String operationId() { + return "getTestingInfo"; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestRequestBody.java new file mode 100644 index 0000000000000..326b10bd80b1c --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestRequestBody.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.testing; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Request body for {@code POST /v1/testing/compile}. */ +public class CompileTestRequestBody implements RequestBody { + + private static final String FIELD_STATEMENTS = "statements"; + private static final String FIELD_TARGET = "target"; + private static final String FIELD_MODE = "mode"; + private static final String FIELD_MOCK_TARGETS = "mock_targets"; + + @JsonProperty(FIELD_STATEMENTS) + private final List statements; + + @JsonProperty(FIELD_TARGET) + private final String target; + + @JsonProperty(FIELD_MODE) + private final String mode; + + @JsonProperty(FIELD_MOCK_TARGETS) + private final List mockTargets; + + @JsonCreator + public CompileTestRequestBody( + @JsonProperty(FIELD_STATEMENTS) List statements, + @JsonProperty(FIELD_TARGET) String target, + @JsonProperty(FIELD_MODE) String mode, + @JsonProperty(FIELD_MOCK_TARGETS) List mockTargets) { + this.statements = statements; + this.target = target; + this.mode = mode; + this.mockTargets = mockTargets; + } + + public List getStatements() { + return statements; + } + + public String getTarget() { + return target; + } + + public String getMode() { + return mode; + } + + public List getMockTargets() { + return mockTargets; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java new file mode 100644 index 0000000000000..720eae9d03ad6 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.testing; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.List; + +/** Response body for {@code POST /v1/testing/compile}. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class CompileTestResponseBody implements ResponseBody { + + private static final String FIELD_CONTRACT_VERSION = "contract_version"; + private static final String FIELD_SESSION_STATEMENTS = "session_statements"; + private static final String FIELD_QUERY_SQL = "query_sql"; + private static final String FIELD_WARNINGS = "warnings"; + private static final String FIELD_MOCKS = "mocks"; + + @JsonProperty(FIELD_CONTRACT_VERSION) + private final String contractVersion; + + @JsonProperty(FIELD_SESSION_STATEMENTS) + private final List sessionStatements; + + @JsonProperty(FIELD_QUERY_SQL) + private final String querySql; + + @JsonProperty(FIELD_WARNINGS) + private final List warnings; + + @JsonProperty(FIELD_MOCKS) + private final List mocks; + + @JsonCreator + public CompileTestResponseBody( + @JsonProperty(FIELD_CONTRACT_VERSION) String contractVersion, + @JsonProperty(FIELD_SESSION_STATEMENTS) List sessionStatements, + @JsonProperty(FIELD_QUERY_SQL) String querySql, + @JsonProperty(FIELD_WARNINGS) List warnings, + @JsonProperty(FIELD_MOCKS) List mocks) { + this.contractVersion = contractVersion; + this.sessionStatements = sessionStatements; + this.querySql = querySql; + this.warnings = warnings; + this.mocks = mocks; + } + + public String getContractVersion() { + return contractVersion; + } + + public List getSessionStatements() { + return sessionStatements; + } + + public String getQuerySql() { + return querySql; + } + + public List getWarnings() { + return warnings; + } + + public List getMocks() { + return mocks; + } + + /** JSON representation of a mock specification. */ + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class MockSpecBody { + + @JsonProperty("requested_name") + private final String requestedName; + + @JsonProperty("session_object") + private final String sessionObject; + + @JsonProperty("materialization") + private final String materialization; + + @JsonProperty("schema") + private final List schema; + + @JsonProperty("watermark_column") + @Nullable + private final String watermarkColumn; + + @JsonProperty("include_sentinel") + @Nullable + private final Boolean includeSentinel; + + @JsonCreator + public MockSpecBody( + @JsonProperty("requested_name") String requestedName, + @JsonProperty("session_object") String sessionObject, + @JsonProperty("materialization") String materialization, + @JsonProperty("schema") List schema, + @Nullable @JsonProperty("watermark_column") String watermarkColumn, + @Nullable @JsonProperty("include_sentinel") Boolean includeSentinel) { + this.requestedName = requestedName; + this.sessionObject = sessionObject; + this.materialization = materialization; + this.schema = schema; + this.watermarkColumn = watermarkColumn; + this.includeSentinel = includeSentinel; + } + + public String getRequestedName() { + return requestedName; + } + + public String getSessionObject() { + return sessionObject; + } + + public String getMaterialization() { + return materialization; + } + + public List getSchema() { + return schema; + } + + @Nullable + public String getWatermarkColumn() { + return watermarkColumn; + } + + @Nullable + public Boolean getIncludeSentinel() { + return includeSentinel; + } + } + + /** JSON representation of a schema column. */ + public static class SchemaColumnBody { + + @JsonProperty("name") + private final String name; + + @JsonProperty("type") + private final String type; + + @JsonCreator + public SchemaColumnBody( + @JsonProperty("name") String name, @JsonProperty("type") String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/TestingInfoResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/TestingInfoResponseBody.java new file mode 100644 index 0000000000000..f0bde54f27d38 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/TestingInfoResponseBody.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.testing; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response body for {@code GET /v1/testing/info}. */ +public class TestingInfoResponseBody implements ResponseBody { + + private static final String FIELD_CONTRACT_VERSION = "contract_version"; + + @JsonProperty(FIELD_CONTRACT_VERSION) + private final String contractVersion; + + @JsonCreator + public TestingInfoResponseBody( + @JsonProperty(FIELD_CONTRACT_VERSION) String contractVersion) { + this.contractVersion = contractVersion; + } + + public String getContractVersion() { + return contractVersion; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java index 371d8aa899343..dc3d9a9c26bf9 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java @@ -41,10 +41,13 @@ import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.testing.TestCompileRequest; +import org.apache.flink.table.gateway.api.testing.TestCompileResponse; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.gateway.service.session.Session; import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.gateway.service.testing.TestPlanService; import org.apache.flink.table.runtime.application.SqlDriver; import org.apache.flink.util.StringUtils; @@ -445,6 +448,23 @@ public List completeStatement( } } + // -------------------------------------------------------------------------------------------- + // Testing API + // -------------------------------------------------------------------------------------------- + + @Override + public TestCompileResponse compileTestPlan(TestCompileRequest request) + throws SqlGatewayException { + try { + return new TestPlanService(sessionManager).compile(request); + } catch (IllegalArgumentException e) { + throw e; + } catch (Throwable t) { + LOG.error("Failed to compile test plan.", t); + throw new SqlGatewayException("Failed to compile test plan.", t); + } + } + // -------------------------------------------------------------------------------------------- @VisibleForTesting diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java new file mode 100644 index 0000000000000..a57bb93a78cf2 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.testing; + +import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.testing.TestCompileRequest; +import org.apache.flink.table.gateway.api.testing.TestCompileResponse; +import org.apache.flink.table.gateway.api.testing.TestMockSpec; +import org.apache.flink.table.gateway.api.testing.TestSchemaColumn; +import org.apache.flink.table.gateway.api.testing.TestingApi; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.apache.flink.table.gateway.service.session.Session; +import org.apache.flink.table.gateway.service.session.SessionManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Compiles a test plan by analyzing a pipeline in an isolated session. + * + *

The service loads all pipeline statements into an analysis session, uses {@code DESCRIBE} to + * resolve mock schemas, {@code EXPLAIN} to detect window functions and temporal joins, and computes + * the minimal set of views needed between mock boundaries and the test target. + * + *

The compiled plan is returned to the CLI, which creates mock objects and registers views in a + * separate execution session using the standard SQL Gateway v3 API. + */ +public class TestPlanService { + + private static final Logger LOG = LoggerFactory.getLogger(TestPlanService.class); + + private static final String MODE_BATCH = "batch"; + private static final String MODE_CHANGELOG = "changelog"; + + private final SessionManager sessionManager; + + public TestPlanService(SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + /** Compile a test plan for the given request. */ + public TestCompileResponse compile(TestCompileRequest request) { + validateRequest(request); + String mode = normalizeMode(request.getMode()); + Set mockTargetSet = + request.getMockTargets().stream() + .map(TestPlanService::normalizeId) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + // Open an isolated analysis session. + Session session = + sessionManager.openSession( + SessionEnvironment.newBuilder() + .setSessionName("test-compile-analysis") + .setSessionEndpointVersion(new EndpointVersion() {}) + .build()); + SessionHandle sessionHandle = session.getSessionHandle(); + try { + // Phase 1: Load all pipeline statements in order. + PipelineInfo pipeline = loadPipeline(session, request.getStatements()); + + // Phase 2: Validate the target exists. + String normalizedTarget = normalizeId(request.getTarget()); + if (!pipeline.allObjects.containsKey(normalizedTarget)) { + throw new IllegalArgumentException( + "Target '" + + request.getTarget() + + "' not found in pipeline. Available: " + + String.join(", ", pipeline.allObjects.keySet())); + } + + // Phase 3: DESCRIBE each mock target to get exact schemas. + Map resolvedMocks = new LinkedHashMap<>(); + for (String mockName : mockTargetSet) { + if (!pipeline.allObjects.containsKey(mockName)) { + throw new IllegalArgumentException( + "Mock target '" + + mockName + + "' not found in pipeline. Available: " + + String.join(", ", pipeline.allObjects.keySet())); + } + resolvedMocks.put(mockName, describeMock(session, mockName)); + } + + // Phase 4: EXPLAIN the target to detect windows and temporal joins. + String explainPlan = explain(session, normalizedTarget); + boolean isWindowed = containsWindowFunction(explainPlan); + boolean hasTemporal = containsTemporalJoin(explainPlan); + + // Phase 5: Determine needed views (between mocks and target). + List neededStatements = + computeNeededStatements(normalizedTarget, mockTargetSet, pipeline); + + // Phase 6: Build mock specs. + List mocks = new ArrayList<>(); + for (Map.Entry entry : resolvedMocks.entrySet()) { + String mockName = entry.getKey(); + MockInfo info = entry.getValue(); + String materialization; + String watermarkColumn = null; + boolean includeSentinel = false; + + if (MODE_CHANGELOG.equals(mode) && isWindowed && info.watermarkColumn != null) { + materialization = TestMockSpec.MATERIALIZATION_FILESYSTEM; + watermarkColumn = info.watermarkColumn; + includeSentinel = true; + } else { + materialization = TestMockSpec.MATERIALIZATION_VIEW; + } + + mocks.add( + new TestMockSpec( + mockName, + mockName, // session object = original name + materialization, + info.columns, + watermarkColumn, + includeSentinel)); + } + + // Phase 7: Build warnings. + List warnings = new ArrayList<>(); + if (hasTemporal) { + warnings.add( + "Temporal joins detected in the target's execution plan. " + + "Temporal join semantics are unsupported/unstable in v1 test " + + "mocking; prefer an integration test or mock downstream of " + + "the temporal join."); + } + + // Build response. + return new TestCompileResponse( + TestingApi.CONTRACT_VERSION, + neededStatements, + "SELECT * FROM " + quoteId(normalizedTarget), + warnings, + mocks); + } finally { + try { + sessionManager.closeSession(sessionHandle); + } catch (Exception e) { + LOG.warn("Failed to close analysis session.", e); + } + } + } + + // ------------------------------------------------------------------------- + // Pipeline loading + // ------------------------------------------------------------------------- + + /** + * Loads all pipeline statements into the session via configureSession. Records which objects are + * tables vs views and stores the original DDL for each. + */ + private PipelineInfo loadPipeline(Session session, List statements) { + Map allObjects = new LinkedHashMap<>(); // name → original DDL + Map tables = new LinkedHashMap<>(); + Map views = new LinkedHashMap<>(); + List configStatements = new ArrayList<>(); + + for (String sql : statements) { + String trimmed = sql.trim(); + if (trimmed.isEmpty()) { + continue; + } + + // Execute the statement to load it into the catalog. + configureSession(session, trimmed); + + // Classify based on SQL text (simple prefix matching after execution). + String upper = trimmed.toUpperCase(); + if (upper.startsWith("CREATE") && upper.contains("TABLE")) { + String name = extractObjectName(trimmed); + if (name != null) { + tables.put(normalizeId(name), trimmed); + allObjects.put(normalizeId(name), trimmed); + } + } else if (upper.startsWith("CREATE") && upper.contains("VIEW")) { + String name = extractObjectName(trimmed); + if (name != null) { + views.put(normalizeId(name), trimmed); + allObjects.put(normalizeId(name), trimmed); + } + } else { + // SET, CREATE FUNCTION, USE, etc. + configStatements.add(trimmed); + } + } + + return new PipelineInfo(allObjects, tables, views, configStatements); + } + + // ------------------------------------------------------------------------- + // Schema resolution via DESCRIBE + // ------------------------------------------------------------------------- + + /** DESCRIBE a pipeline object to get its exact schema and watermark info. */ + private MockInfo describeMock(Session session, String objectName) { + List columns = new ArrayList<>(); + String watermarkColumn = null; + + ResultSet result = executeAndFetch(session, "DESCRIBE " + quoteId(objectName)); + for (org.apache.flink.table.data.RowData row : result.getData()) { + String colName = row.getString(0).toString(); + String colType = row.getString(1).toString(); + String watermark = row.isNullAt(5) ? null : row.getString(5).toString(); + + columns.add(new TestSchemaColumn(colName, colType)); + if (watermark != null && !watermark.isEmpty()) { + watermarkColumn = colName; + } + } + + return new MockInfo(columns, watermarkColumn); + } + + // ------------------------------------------------------------------------- + // Window and temporal join detection via EXPLAIN + // ------------------------------------------------------------------------- + + /** EXPLAIN the target query and return the plan text. */ + private String explain(Session session, String target) { + ResultSet result = + executeAndFetch(session, "EXPLAIN SELECT * FROM " + quoteId(target)); + if (result.getData().isEmpty()) { + return ""; + } + return result.getData().get(0).getString(0).toString(); + } + + /** Check if the EXPLAIN plan contains window table function calls. */ + private static boolean containsWindowFunction(String plan) { + String upper = plan.toUpperCase(); + return upper.contains("TUMBLE(") || upper.contains("HOP(") || upper.contains("CUMULATE(") + || upper.contains("SESSION("); + } + + /** Check if the EXPLAIN plan contains temporal join patterns. */ + private static boolean containsTemporalJoin(String plan) { + return plan.toUpperCase().contains("FOR SYSTEM_TIME AS OF") + || plan.contains("TemporalJoin") + || plan.contains("LogicalSnapshot"); + } + + // ------------------------------------------------------------------------- + // Needed statement computation + // ------------------------------------------------------------------------- + + /** + * Computes the ordered list of DDL statements the CLI needs to register after creating mocks. + * This includes config statements and view DDL for views between mock boundaries and the target. + */ + private List computeNeededStatements( + String target, Set mockTargets, PipelineInfo pipeline) { + // Collect views needed between mocks and target. + Set neededViews = new LinkedHashSet<>(); + collectNeededViews(target, mockTargets, pipeline, neededViews, new LinkedHashSet<>()); + + // Build ordered list: config statements first, then needed view DDL in pipeline order. + List result = new ArrayList<>(pipeline.configStatements); + for (Map.Entry entry : pipeline.views.entrySet()) { + if (neededViews.contains(entry.getKey())) { + result.add(entry.getValue()); + } + } + return result; + } + + /** + * Recursively collects views that are needed between mock boundaries and the target. Stops at + * mock boundaries and at tables (which are either mocked or real sources). + */ + private void collectNeededViews( + String viewName, + Set mockTargets, + PipelineInfo pipeline, + Set neededViews, + Set visited) { + if (visited.contains(viewName) || mockTargets.contains(viewName)) { + return; + } + visited.add(viewName); + + if (!pipeline.views.containsKey(viewName)) { + return; // It's a table or unknown — not a view to register. + } + + neededViews.add(viewName); + + // Find what this view references by checking which known objects appear in its DDL. + String ddl = pipeline.views.get(viewName); + for (String candidate : pipeline.allObjects.keySet()) { + if (candidate.equals(viewName)) { + continue; + } + if (sqlReferencesObject(ddl, candidate)) { + collectNeededViews(candidate, mockTargets, pipeline, neededViews, visited); + } + } + } + + /** + * Checks if a SQL statement references an object name using word-boundary matching. Skips + * matches inside single-quoted string literals. + */ + static boolean sqlReferencesObject(String sql, String objectName) { + String lower = sql.toLowerCase(); + String target = objectName.toLowerCase(); + int idx = 0; + while ((idx = lower.indexOf(target, idx)) >= 0) { + int end = idx + target.length(); + boolean leftOk = + idx == 0 + || (!Character.isLetterOrDigit(lower.charAt(idx - 1)) + && lower.charAt(idx - 1) != '_'); + boolean rightOk = + end >= lower.length() + || (!Character.isLetterOrDigit(lower.charAt(end)) + && lower.charAt(end) != '_'); + if (leftOk && rightOk && !insideStringLiteral(lower, idx)) { + return true; + } + idx = end; + } + return false; + } + + /** Check if position is inside a single-quoted string literal. */ + private static boolean insideStringLiteral(String sql, int position) { + int quotes = 0; + for (int i = 0; i < position; i++) { + if (sql.charAt(i) == '\'' && (i == 0 || sql.charAt(i - 1) != '\\')) { + quotes++; + } + } + return quotes % 2 != 0; + } + + // ------------------------------------------------------------------------- + // Session helpers + // ------------------------------------------------------------------------- + + /** Execute a statement via configureSession (synchronous). */ + private void configureSession(Session session, String statement) { + try { + OperationManager opManager = session.getOperationManager(); + org.apache.flink.table.gateway.api.operation.OperationHandle handle = + opManager.submitOperation( + h -> session.createExecutor().configureSession(h, statement)); + opManager.awaitOperationTermination(handle); + opManager.closeOperation(handle); + } catch (Exception e) { + throw new RuntimeException("Failed to configure session: " + statement, e); + } + } + + /** Execute a statement and fetch all results synchronously. */ + private ResultSet executeAndFetch(Session session, String statement) { + try { + OperationManager opManager = session.getOperationManager(); + org.apache.flink.table.gateway.api.operation.OperationHandle handle = + opManager.submitOperation( + h -> session.createExecutor().executeStatement(h, statement)); + opManager.awaitOperationTermination(handle); + ResultSet result = opManager.fetchResults(handle, 0L, Integer.MAX_VALUE); + opManager.closeOperation(handle); + return result; + } catch (Exception e) { + throw new RuntimeException("Failed to execute: " + statement, e); + } + } + + // ------------------------------------------------------------------------- + // Validation and normalization + // ------------------------------------------------------------------------- + + private static void validateRequest(TestCompileRequest request) { + if (request.getStatements() == null || request.getStatements().isEmpty()) { + throw new IllegalArgumentException("statements must not be empty"); + } + if (request.getTarget() == null || request.getTarget().trim().isEmpty()) { + throw new IllegalArgumentException("target must not be empty"); + } + if (request.getMode() == null || request.getMode().trim().isEmpty()) { + throw new IllegalArgumentException("mode must not be empty"); + } + } + + private static String normalizeMode(String mode) { + String lower = mode.trim().toLowerCase(); + if (MODE_BATCH.equals(lower) || MODE_CHANGELOG.equals(lower)) { + return lower; + } + throw new IllegalArgumentException( + "Invalid mode '" + mode + "'. Must be 'batch' or 'changelog'."); + } + + /** Normalize an identifier to lowercase, removing backtick/double-quote wrapping. */ + static String normalizeId(String name) { + if (name == null) { + return ""; + } + String trimmed = name.trim(); + if ((trimmed.startsWith("`") && trimmed.endsWith("`")) + || (trimmed.startsWith("\"") && trimmed.endsWith("\""))) { + trimmed = trimmed.substring(1, trimmed.length() - 1); + } + return trimmed.toLowerCase(); + } + + private static String quoteId(String name) { + return "`" + name.replace("`", "``") + "`"; + } + + /** + * Extract the object name from a CREATE TABLE/VIEW statement. Handles backtick-quoted and + * plain identifiers, including fully-qualified names (extracts the last part). + */ + static String extractObjectName(String ddl) { + String upper = ddl.toUpperCase().trim(); + // Find position after TABLE or VIEW keyword. + int pos = -1; + for (String keyword : new String[] {"TABLE", "VIEW"}) { + int idx = upper.indexOf(keyword); + if (idx >= 0) { + pos = idx + keyword.length(); + break; + } + } + if (pos < 0) { + return null; + } + + // Skip whitespace and optional IF NOT EXISTS / IF EXISTS. + String rest = ddl.substring(pos).trim(); + if (rest.toUpperCase().startsWith("IF NOT EXISTS")) { + rest = rest.substring("IF NOT EXISTS".length()).trim(); + } else if (rest.toUpperCase().startsWith("IF EXISTS")) { + rest = rest.substring("IF EXISTS".length()).trim(); + } + + // Extract the identifier (possibly dot-separated, possibly quoted). + StringBuilder name = new StringBuilder(); + int i = 0; + while (i < rest.length()) { + char c = rest.charAt(i); + if (c == '`') { + // Backtick-quoted identifier. + i++; + while (i < rest.length() && rest.charAt(i) != '`') { + name.append(rest.charAt(i)); + i++; + } + i++; // skip closing backtick + } else if (c == '"') { + i++; + while (i < rest.length() && rest.charAt(i) != '"') { + name.append(rest.charAt(i)); + i++; + } + i++; + } else if (c == '.') { + // Dot separator in qualified name — reset to take the last part. + name.setLength(0); + i++; + } else if (Character.isLetterOrDigit(c) || c == '_' || c == '-') { + name.append(c); + i++; + } else { + break; // Hit space, paren, or other delimiter. + } + } + return name.length() > 0 ? name.toString() : null; + } + + // ------------------------------------------------------------------------- + // Internal data classes + // ------------------------------------------------------------------------- + + private static class PipelineInfo { + final Map allObjects; // normalized name → DDL + final Map tables; + final Map views; + final List configStatements; + + PipelineInfo( + Map allObjects, + Map tables, + Map views, + List configStatements) { + this.allObjects = allObjects; + this.tables = tables; + this.views = views; + this.configStatements = configStatements; + } + } + + private static class MockInfo { + final List columns; + final String watermarkColumn; + + MockInfo(List columns, String watermarkColumn) { + this.columns = columns; + this.watermarkColumn = watermarkColumn; + } + } +} From 90a77acf99900e0088d565f358225e92ad48323f Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 14:23:47 -0400 Subject: [PATCH 2/6] Clean type strings from DESCRIBE output Strip *ROWTIME*, NOT NULL, and METADATA FROM annotations from type strings so they're usable in CAST expressions. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service/testing/TestPlanService.java | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java index a57bb93a78cf2..9d60900fc33d3 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java @@ -230,7 +230,7 @@ private MockInfo describeMock(Session session, String objectName) { ResultSet result = executeAndFetch(session, "DESCRIBE " + quoteId(objectName)); for (org.apache.flink.table.data.RowData row : result.getData()) { String colName = row.getString(0).toString(); - String colType = row.getString(1).toString(); + String colType = cleanType(row.getString(1).toString()); String watermark = row.isNullAt(5) ? null : row.getString(5).toString(); columns.add(new TestSchemaColumn(colName, colType)); @@ -440,6 +440,27 @@ private static String quoteId(String name) { return "`" + name.replace("`", "``") + "`"; } + /** + * Clean a Flink type string from DESCRIBE output for use in CAST expressions. Removes + * annotations like {@code *ROWTIME*}, {@code NOT NULL}, and {@code METADATA FROM}. + */ + static String cleanType(String type) { + String cleaned = type; + // Remove *ROWTIME* watermark marker. + cleaned = cleaned.replace("*ROWTIME*", "").trim(); + // Remove METADATA FROM clause. + int metaIdx = cleaned.toUpperCase().indexOf(" METADATA"); + if (metaIdx > 0) { + cleaned = cleaned.substring(0, metaIdx).trim(); + } + // Remove NOT NULL constraint (mocks allow nulls). + int notNullIdx = cleaned.toUpperCase().indexOf(" NOT NULL"); + if (notNullIdx > 0) { + cleaned = cleaned.substring(0, notNullIdx).trim(); + } + return cleaned; + } + /** * Extract the object name from a CREATE TABLE/VIEW statement. Handles backtick-quoted and * plain identifiers, including fully-qualified names (extracts the last part). From 3895f34108f00ab5d2c49eab0baaa8ff449193af Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 15:07:10 -0400 Subject: [PATCH 3/6] Fix object classification and name extraction - Use SHOW TABLES / SHOW VIEWS catalog introspection for classification instead of fragile SQL text prefix matching - Fix extractObjectName to search for full CREATE patterns (CREATE TEMPORARY VIEW, CREATE TEMPORARY TABLE, etc.) to avoid false matches on TABLE keyword in query body - Clean type strings (remove *ROWTIME*, NOT NULL, METADATA) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service/testing/TestPlanService.java | 84 ++++++++++++------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java index 9d60900fc33d3..ea8f8f8e5557e 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java @@ -129,11 +129,11 @@ public TestCompileResponse compile(TestCompileRequest request) { boolean includeSentinel = false; if (MODE_CHANGELOG.equals(mode) && isWindowed && info.watermarkColumn != null) { - materialization = TestMockSpec.MATERIALIZATION_FILESYSTEM; + materialization = "filesystem"; watermarkColumn = info.watermarkColumn; includeSentinel = true; } else { - materialization = TestMockSpec.MATERIALIZATION_VIEW; + materialization = "view"; } mocks.add( @@ -177,47 +177,64 @@ public TestCompileResponse compile(TestCompileRequest request) { // ------------------------------------------------------------------------- /** - * Loads all pipeline statements into the session via configureSession. Records which objects are - * tables vs views and stores the original DDL for each. + * Loads all pipeline statements into the session and classifies them using Flink's own catalog + * introspection. No SQL text parsing is needed for classification — SHOW TABLES and SHOW VIEWS + * tell us exactly what Flink created. */ private PipelineInfo loadPipeline(Session session, List statements) { - Map allObjects = new LinkedHashMap<>(); // name → original DDL - Map tables = new LinkedHashMap<>(); - Map views = new LinkedHashMap<>(); - List configStatements = new ArrayList<>(); + List allStatements = new ArrayList<>(); + // Execute all statements in pipeline order. for (String sql : statements) { String trimmed = sql.trim(); if (trimmed.isEmpty()) { continue; } - - // Execute the statement to load it into the catalog. configureSession(session, trimmed); + allStatements.add(trimmed); + } - // Classify based on SQL text (simple prefix matching after execution). - String upper = trimmed.toUpperCase(); - if (upper.startsWith("CREATE") && upper.contains("TABLE")) { - String name = extractObjectName(trimmed); - if (name != null) { - tables.put(normalizeId(name), trimmed); - allObjects.put(normalizeId(name), trimmed); - } - } else if (upper.startsWith("CREATE") && upper.contains("VIEW")) { - String name = extractObjectName(trimmed); - if (name != null) { - views.put(normalizeId(name), trimmed); - allObjects.put(normalizeId(name), trimmed); + // Query the catalog to see what was created. + Set catalogTables = fetchNames(session, "SHOW TABLES"); + Set catalogViews = fetchNames(session, "SHOW VIEWS"); + + // Match each statement to a catalog object by extracting its name. + Map allObjects = new LinkedHashMap<>(); + Map tables = new LinkedHashMap<>(); + Map views = new LinkedHashMap<>(); + List configStatements = new ArrayList<>(); + + for (String stmt : allStatements) { + String name = extractObjectName(stmt); + if (name != null) { + String normalized = normalizeId(name); + if (catalogViews.contains(normalized)) { + views.put(normalized, stmt); + allObjects.put(normalized, stmt); + } else if (catalogTables.contains(normalized)) { + tables.put(normalized, stmt); + allObjects.put(normalized, stmt); + } else { + configStatements.add(stmt); } } else { - // SET, CREATE FUNCTION, USE, etc. - configStatements.add(trimmed); + configStatements.add(stmt); } } return new PipelineInfo(allObjects, tables, views, configStatements); } + /** Execute a SHOW command and return the names as a set. */ + private Set fetchNames(Session session, String showCommand) { + Set names = new LinkedHashSet<>(); + ResultSet result = executeAndFetch(session, showCommand); + for (org.apache.flink.table.data.RowData row : result.getData()) { + names.add(normalizeId(row.getString(0).toString())); + } + return names; + } + // ------------------------------------------------------------------------- // Schema resolution via DESCRIBE // ------------------------------------------------------------------------- @@ -464,15 +481,24 @@ static String cleanType(String type) { /** * Extract the object name from a CREATE TABLE/VIEW statement. Handles backtick-quoted and * plain identifiers, including fully-qualified names (extracts the last part). + * + *

Uses specific patterns to find the keyword position, avoiding false matches on TABLE/VIEW + * keywords that appear in the query body (e.g., FROM TABLE(TUMBLE(...))). */ static String extractObjectName(String ddl) { String upper = ddl.toUpperCase().trim(); - // Find position after TABLE or VIEW keyword. + // Search for CREATE [TEMPORARY] TABLE or CREATE [TEMPORARY] VIEW. int pos = -1; - for (String keyword : new String[] {"TABLE", "VIEW"}) { - int idx = upper.indexOf(keyword); + for (String pattern : + new String[] { + "CREATE TEMPORARY VIEW ", + "CREATE TEMPORARY TABLE ", + "CREATE VIEW ", + "CREATE TABLE " + }) { + int idx = upper.indexOf(pattern); if (idx >= 0) { - pos = idx + keyword.length(); + pos = idx + pattern.length(); break; } } From 37325c3d0f1d33b59d539f0a0fb43213302d8fa3 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 15:59:18 -0400 Subject: [PATCH 4/6] Split response into config_statements + pipeline_statements - config_statements: SET, CREATE FUNCTION, USE (before mocks) - pipeline_statements: view DDL between mocks and target (after mocks) - Per-statement catalog diff for object classification (no extractObjectName) - Remove extractObjectName entirely Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/testing/TestCompileResponse.java | 39 +++-- .../handler/testing/CompileTestHandler.java | 3 +- .../testing/CompileTestResponseBody.java | 24 ++- .../service/testing/TestPlanService.java | 151 +++++------------- 4 files changed, 88 insertions(+), 129 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java index 0500d0ce27cc7..3277569766d5e 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestCompileResponse.java @@ -25,27 +25,36 @@ /** * Response from compiling a test plan. * - *

Contains everything the CLI needs to set up an execution session: mock specifications with - * exact schemas, the ordered DDL statements to register between mocks and the target, and the query - * to execute. + *

Contains everything the CLI needs to set up an execution session. The execution order is: + * + *

    + *
  1. {@code configStatements} — session config, functions, USE statements (before mocks) + *
  2. Mock creation — CLI creates mocks using {@code mocks} specifications + *
  3. {@code pipelineStatements} — view DDL between mock boundaries and the target (after mocks) + *
  4. Execute {@code querySql} + *
*/ public final class TestCompileResponse { private final String contractVersion; - private final List sessionStatements; + private final List configStatements; + private final List pipelineStatements; private final String querySql; private final List warnings; private final List mocks; public TestCompileResponse( String contractVersion, - List sessionStatements, + List configStatements, + List pipelineStatements, String querySql, List warnings, List mocks) { this.contractVersion = Objects.requireNonNull(contractVersion); - this.sessionStatements = - Collections.unmodifiableList(Objects.requireNonNull(sessionStatements)); + this.configStatements = + Collections.unmodifiableList(Objects.requireNonNull(configStatements)); + this.pipelineStatements = + Collections.unmodifiableList(Objects.requireNonNull(pipelineStatements)); this.querySql = Objects.requireNonNull(querySql); this.warnings = Collections.unmodifiableList(Objects.requireNonNull(warnings)); this.mocks = Collections.unmodifiableList(Objects.requireNonNull(mocks)); @@ -57,11 +66,19 @@ public String getContractVersion() { } /** - * Ordered DDL statements the CLI should register after creating mocks. Includes view - * definitions between mock boundaries and the target, plus any config/function statements. + * Config/function statements to execute BEFORE creating mocks. Includes SET, CREATE FUNCTION, + * USE CATALOG, etc. + */ + public List getConfigStatements() { + return configStatements; + } + + /** + * Pipeline view DDL to execute AFTER creating mocks. These are the views between mock + * boundaries and the target, in dependency order. */ - public List getSessionStatements() { - return sessionStatements; + public List getPipelineStatements() { + return pipelineStatements; } /** The final query to execute against the target. */ diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java index 73c285b206e07..863991a7838ad 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/testing/CompileTestHandler.java @@ -91,7 +91,8 @@ protected CompletableFuture handleRequest( private static CompileTestResponseBody toResponseBody(TestCompileResponse response) { return new CompileTestResponseBody( response.getContractVersion(), - response.getSessionStatements(), + response.getConfigStatements(), + response.getPipelineStatements(), response.getQuerySql(), response.getWarnings(), response.getMocks().stream() diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java index 720eae9d03ad6..526d09cef5f22 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/testing/CompileTestResponseBody.java @@ -33,7 +33,8 @@ public class CompileTestResponseBody implements ResponseBody { private static final String FIELD_CONTRACT_VERSION = "contract_version"; - private static final String FIELD_SESSION_STATEMENTS = "session_statements"; + private static final String FIELD_CONFIG_STATEMENTS = "config_statements"; + private static final String FIELD_PIPELINE_STATEMENTS = "pipeline_statements"; private static final String FIELD_QUERY_SQL = "query_sql"; private static final String FIELD_WARNINGS = "warnings"; private static final String FIELD_MOCKS = "mocks"; @@ -41,8 +42,11 @@ public class CompileTestResponseBody implements ResponseBody { @JsonProperty(FIELD_CONTRACT_VERSION) private final String contractVersion; - @JsonProperty(FIELD_SESSION_STATEMENTS) - private final List sessionStatements; + @JsonProperty(FIELD_CONFIG_STATEMENTS) + private final List configStatements; + + @JsonProperty(FIELD_PIPELINE_STATEMENTS) + private final List pipelineStatements; @JsonProperty(FIELD_QUERY_SQL) private final String querySql; @@ -56,12 +60,14 @@ public class CompileTestResponseBody implements ResponseBody { @JsonCreator public CompileTestResponseBody( @JsonProperty(FIELD_CONTRACT_VERSION) String contractVersion, - @JsonProperty(FIELD_SESSION_STATEMENTS) List sessionStatements, + @JsonProperty(FIELD_CONFIG_STATEMENTS) List configStatements, + @JsonProperty(FIELD_PIPELINE_STATEMENTS) List pipelineStatements, @JsonProperty(FIELD_QUERY_SQL) String querySql, @JsonProperty(FIELD_WARNINGS) List warnings, @JsonProperty(FIELD_MOCKS) List mocks) { this.contractVersion = contractVersion; - this.sessionStatements = sessionStatements; + this.configStatements = configStatements; + this.pipelineStatements = pipelineStatements; this.querySql = querySql; this.warnings = warnings; this.mocks = mocks; @@ -71,8 +77,12 @@ public String getContractVersion() { return contractVersion; } - public List getSessionStatements() { - return sessionStatements; + public List getConfigStatements() { + return configStatements; + } + + public List getPipelineStatements() { + return pipelineStatements; } public String getQuerySql() { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java index ea8f8f8e5557e..4f6c116ad4991 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java @@ -116,8 +116,8 @@ public TestCompileResponse compile(TestCompileRequest request) { boolean hasTemporal = containsTemporalJoin(explainPlan); // Phase 5: Determine needed views (between mocks and target). - List neededStatements = - computeNeededStatements(normalizedTarget, mockTargetSet, pipeline); + List pipelineStatements = + computeNeededViewStatements(normalizedTarget, mockTargetSet, pipeline); // Phase 6: Build mock specs. List mocks = new ArrayList<>(); @@ -159,7 +159,8 @@ public TestCompileResponse compile(TestCompileRequest request) { // Build response. return new TestCompileResponse( TestingApi.CONTRACT_VERSION, - neededStatements, + pipeline.configStatements, + pipelineStatements, "SELECT * FROM " + quoteId(normalizedTarget), warnings, mocks); @@ -177,55 +178,57 @@ public TestCompileResponse compile(TestCompileRequest request) { // ------------------------------------------------------------------------- /** - * Loads all pipeline statements into the session and classifies them using Flink's own catalog - * introspection. No SQL text parsing is needed for classification — SHOW TABLES and SHOW VIEWS - * tell us exactly what Flink created. + * Loads all pipeline statements into the session and classifies them by diffing the catalog + * before and after each statement. This uses only Flink's catalog APIs — no SQL text parsing + * needed for classification. */ private PipelineInfo loadPipeline(Session session, List statements) { - List allStatements = new ArrayList<>(); + Map allObjects = new LinkedHashMap<>(); + Map tables = new LinkedHashMap<>(); + Map views = new LinkedHashMap<>(); + List configStatements = new ArrayList<>(); - // Execute all statements in pipeline order. for (String sql : statements) { String trimmed = sql.trim(); if (trimmed.isEmpty()) { continue; } - configureSession(session, trimmed); - allStatements.add(trimmed); - } - // Query the catalog to see what was created. - Set catalogTables = fetchNames(session, "SHOW TABLES"); - Set catalogViews = fetchNames(session, "SHOW VIEWS"); + // Snapshot catalog state before this statement. + Set tablesBefore = fetchNames(session, "SHOW TABLES"); + Set viewsBefore = fetchNames(session, "SHOW VIEWS"); - // Match each statement to a catalog object by extracting its name. - Map allObjects = new LinkedHashMap<>(); - Map tables = new LinkedHashMap<>(); - Map views = new LinkedHashMap<>(); - List configStatements = new ArrayList<>(); + configureSession(session, trimmed); - for (String stmt : allStatements) { - String name = extractObjectName(stmt); - if (name != null) { - String normalized = normalizeId(name); - if (catalogViews.contains(normalized)) { - views.put(normalized, stmt); - allObjects.put(normalized, stmt); - } else if (catalogTables.contains(normalized)) { - tables.put(normalized, stmt); - allObjects.put(normalized, stmt); - } else { - configStatements.add(stmt); - } + // Diff catalog state to classify the statement. + Set tablesAfter = fetchNames(session, "SHOW TABLES"); + Set viewsAfter = fetchNames(session, "SHOW VIEWS"); + + Set newViews = new LinkedHashSet<>(viewsAfter); + newViews.removeAll(viewsBefore); + + Set newTables = new LinkedHashSet<>(tablesAfter); + newTables.removeAll(tablesBefore); + newTables.removeAll(newViews); // SHOW TABLES includes views; remove them. + + if (!newViews.isEmpty()) { + String name = newViews.iterator().next(); + views.put(name, trimmed); + allObjects.put(name, trimmed); + } else if (!newTables.isEmpty()) { + String name = newTables.iterator().next(); + tables.put(name, trimmed); + allObjects.put(name, trimmed); } else { - configStatements.add(stmt); + // No new catalog objects — this is a config statement (SET, CREATE FUNCTION, etc.) + configStatements.add(trimmed); } } return new PipelineInfo(allObjects, tables, views, configStatements); } - /** Execute a SHOW command and return the names as a set. */ + /** Execute a SHOW command and return the names as a normalized set. */ private Set fetchNames(Session session, String showCommand) { Set names = new LinkedHashSet<>(); ResultSet result = executeAndFetch(session, showCommand); @@ -292,17 +295,15 @@ private static boolean containsTemporalJoin(String plan) { // ------------------------------------------------------------------------- /** - * Computes the ordered list of DDL statements the CLI needs to register after creating mocks. - * This includes config statements and view DDL for views between mock boundaries and the target. + * Computes the ordered list of view DDL statements the CLI needs to register after creating + * mocks. Only includes views between mock boundaries and the target, in pipeline order. */ - private List computeNeededStatements( + private List computeNeededViewStatements( String target, Set mockTargets, PipelineInfo pipeline) { - // Collect views needed between mocks and target. Set neededViews = new LinkedHashSet<>(); collectNeededViews(target, mockTargets, pipeline, neededViews, new LinkedHashSet<>()); - // Build ordered list: config statements first, then needed view DDL in pipeline order. - List result = new ArrayList<>(pipeline.configStatements); + List result = new ArrayList<>(); for (Map.Entry entry : pipeline.views.entrySet()) { if (neededViews.contains(entry.getKey())) { result.add(entry.getValue()); @@ -478,76 +479,6 @@ static String cleanType(String type) { return cleaned; } - /** - * Extract the object name from a CREATE TABLE/VIEW statement. Handles backtick-quoted and - * plain identifiers, including fully-qualified names (extracts the last part). - * - *

Uses specific patterns to find the keyword position, avoiding false matches on TABLE/VIEW - * keywords that appear in the query body (e.g., FROM TABLE(TUMBLE(...))). - */ - static String extractObjectName(String ddl) { - String upper = ddl.toUpperCase().trim(); - // Search for CREATE [TEMPORARY] TABLE or CREATE [TEMPORARY] VIEW. - int pos = -1; - for (String pattern : - new String[] { - "CREATE TEMPORARY VIEW ", - "CREATE TEMPORARY TABLE ", - "CREATE VIEW ", - "CREATE TABLE " - }) { - int idx = upper.indexOf(pattern); - if (idx >= 0) { - pos = idx + pattern.length(); - break; - } - } - if (pos < 0) { - return null; - } - - // Skip whitespace and optional IF NOT EXISTS / IF EXISTS. - String rest = ddl.substring(pos).trim(); - if (rest.toUpperCase().startsWith("IF NOT EXISTS")) { - rest = rest.substring("IF NOT EXISTS".length()).trim(); - } else if (rest.toUpperCase().startsWith("IF EXISTS")) { - rest = rest.substring("IF EXISTS".length()).trim(); - } - - // Extract the identifier (possibly dot-separated, possibly quoted). - StringBuilder name = new StringBuilder(); - int i = 0; - while (i < rest.length()) { - char c = rest.charAt(i); - if (c == '`') { - // Backtick-quoted identifier. - i++; - while (i < rest.length() && rest.charAt(i) != '`') { - name.append(rest.charAt(i)); - i++; - } - i++; // skip closing backtick - } else if (c == '"') { - i++; - while (i < rest.length() && rest.charAt(i) != '"') { - name.append(rest.charAt(i)); - i++; - } - i++; - } else if (c == '.') { - // Dot separator in qualified name — reset to take the last part. - name.setLength(0); - i++; - } else if (Character.isLetterOrDigit(c) || c == '_' || c == '-') { - name.append(c); - i++; - } else { - break; // Hit space, paren, or other delimiter. - } - } - return name.length() > 0 ? name.toString() : null; - } - // ------------------------------------------------------------------------- // Internal data classes // ------------------------------------------------------------------------- From e8e58b16b55ed211e7691fa130ffc16b43c010fa Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 16:46:23 -0400 Subject: [PATCH 5/6] Harden gateway: SQL escaping, DESCRIBE bounds, unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix insideStringLiteral → insideStringOrComment: handle SQL doubled- quote escaping (''), skip -- line comments and /* */ block comments - sqlReferencesObject now skips matches in comments too - Add bounds checking in describeMock for DESCRIBE result columns - Add 23 unit tests for sqlReferencesObject, insideStringOrComment, cleanType, and normalizeId Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service/testing/TestPlanService.java | 63 +++++- .../service/testing/TestPlanServiceTest.java | 182 ++++++++++++++++++ 2 files changed, 236 insertions(+), 9 deletions(-) create mode 100644 flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/testing/TestPlanServiceTest.java diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java index 4f6c116ad4991..b176c77fd4971 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/testing/TestPlanService.java @@ -249,9 +249,19 @@ private MockInfo describeMock(Session session, String objectName) { ResultSet result = executeAndFetch(session, "DESCRIBE " + quoteId(objectName)); for (org.apache.flink.table.data.RowData row : result.getData()) { + int arity = row.getArity(); + if (arity < 2) { + LOG.warn("DESCRIBE {} returned row with {} columns, expected at least 2", objectName, arity); + continue; + } String colName = row.getString(0).toString(); String colType = cleanType(row.getString(1).toString()); - String watermark = row.isNullAt(5) ? null : row.getString(5).toString(); + + // Watermark info is in column 5 (standard Flink DESCRIBE output). + String watermark = null; + if (arity > 5 && !row.isNullAt(5)) { + watermark = row.getString(5).toString(); + } columns.add(new TestSchemaColumn(colName, colType)); if (watermark != null && !watermark.isEmpty()) { @@ -347,7 +357,8 @@ private void collectNeededViews( /** * Checks if a SQL statement references an object name using word-boundary matching. Skips - * matches inside single-quoted string literals. + * matches inside single-quoted string literals, line comments ({@code --}), and block comments + * ({@code /* * /}). */ static boolean sqlReferencesObject(String sql, String objectName) { String lower = sql.toLowerCase(); @@ -363,7 +374,7 @@ static boolean sqlReferencesObject(String sql, String objectName) { end >= lower.length() || (!Character.isLetterOrDigit(lower.charAt(end)) && lower.charAt(end) != '_'); - if (leftOk && rightOk && !insideStringLiteral(lower, idx)) { + if (leftOk && rightOk && !insideStringOrComment(lower, idx)) { return true; } idx = end; @@ -371,15 +382,49 @@ static boolean sqlReferencesObject(String sql, String objectName) { return false; } - /** Check if position is inside a single-quoted string literal. */ - private static boolean insideStringLiteral(String sql, int position) { - int quotes = 0; + /** + * Check if a position in SQL text is inside a string literal, line comment, or block comment. + */ + static boolean insideStringOrComment(String sql, int position) { + boolean inString = false; + boolean inLineComment = false; + boolean inBlockComment = false; for (int i = 0; i < position; i++) { - if (sql.charAt(i) == '\'' && (i == 0 || sql.charAt(i - 1) != '\\')) { - quotes++; + char c = sql.charAt(i); + char next = (i + 1 < sql.length()) ? sql.charAt(i + 1) : 0; + + if (inLineComment) { + if (c == '\n') { + inLineComment = false; + } + continue; + } + if (inBlockComment) { + if (c == '*' && next == '/') { + inBlockComment = false; + i++; + } + continue; + } + if (!inString && c == '-' && next == '-') { + inLineComment = true; + i++; + continue; + } + if (!inString && c == '/' && next == '*') { + inBlockComment = true; + i++; + continue; + } + if (c == '\'') { + if (inString && next == '\'') { + i++; // skip SQL doubled-quote escape + } else { + inString = !inString; + } } } - return quotes % 2 != 0; + return inString || inLineComment || inBlockComment; } // ------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/testing/TestPlanServiceTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/testing/TestPlanServiceTest.java new file mode 100644 index 0000000000000..2e664f3f783df --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/testing/TestPlanServiceTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.testing; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link TestPlanService} utility methods. */ +class TestPlanServiceTest { + + // ------------------------------------------------------------------------- + // sqlReferencesObject + // ------------------------------------------------------------------------- + + @Test + void sqlReferencesObjectFindsSimpleReference() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM payment_events", "payment_events")).isTrue(); + } + + @Test + void sqlReferencesObjectRespectsWordBoundaries() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM payment_events_enriched", "payment_events")).isFalse(); + } + + @Test + void sqlReferencesObjectSkipsStringLiterals() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM t WHERE name = 'payment_events'", "payment_events")).isFalse(); + } + + @Test + void sqlReferencesObjectHandlesDoubledQuoteEscape() { + // SQL escapes quotes by doubling: 'O''Brien' + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM t WHERE name = 'it''s payment_events here'", "payment_events")) + .isFalse(); + } + + @Test + void sqlReferencesObjectSkipsLineComments() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM t -- references payment_events\nWHERE 1=1", "payment_events")) + .isFalse(); + } + + @Test + void sqlReferencesObjectSkipsBlockComments() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM t /* payment_events */ WHERE 1=1", "payment_events")).isFalse(); + } + + @Test + void sqlReferencesObjectMatchesAfterComment() { + assertThat(TestPlanService.sqlReferencesObject( + "-- comment\nSELECT * FROM payment_events", "payment_events")).isTrue(); + } + + @Test + void sqlReferencesObjectCaseInsensitive() { + assertThat(TestPlanService.sqlReferencesObject( + "SELECT * FROM PAYMENT_EVENTS", "payment_events")).isTrue(); + } + + // ------------------------------------------------------------------------- + // insideStringOrComment + // ------------------------------------------------------------------------- + + @Test + void insideStringOrCommentDetectsSimpleString() { + String sql = "SELECT 'hello world' FROM t"; + // Position of 'w' in 'world' — inside string + int pos = sql.indexOf("world"); + assertThat(TestPlanService.insideStringOrComment(sql, pos)).isTrue(); + } + + @Test + void insideStringOrCommentOutsideString() { + String sql = "SELECT 'hello' FROM t"; + int pos = sql.indexOf("FROM"); + assertThat(TestPlanService.insideStringOrComment(sql, pos)).isFalse(); + } + + @Test + void insideStringOrCommentHandlesDoubledQuotes() { + // 'O''Brien' — the second quote is escaped, not a boundary + String sql = "SELECT * FROM t WHERE name = 'O''Brien' AND id = 1"; + int pos = sql.indexOf("AND"); + assertThat(TestPlanService.insideStringOrComment(sql, pos)).isFalse(); + } + + @Test + void insideStringOrCommentSkipsLineComment() { + String sql = "SELECT * -- comment with 'quote\nFROM t"; + int pos = sql.indexOf("FROM"); + assertThat(TestPlanService.insideStringOrComment(sql, pos)).isFalse(); + } + + @Test + void insideStringOrCommentSkipsBlockComment() { + String sql = "SELECT * /* comment with 'quote */ FROM t"; + int pos = sql.indexOf("FROM"); + assertThat(TestPlanService.insideStringOrComment(sql, pos)).isFalse(); + } + + // ------------------------------------------------------------------------- + // cleanType + // ------------------------------------------------------------------------- + + @Test + void cleanTypeRemovesRowtime() { + assertThat(TestPlanService.cleanType("TIMESTAMP(3) *ROWTIME*")).isEqualTo("TIMESTAMP(3)"); + } + + @Test + void cleanTypeRemovesNotNull() { + assertThat(TestPlanService.cleanType("INT NOT NULL")).isEqualTo("INT"); + } + + @Test + void cleanTypeRemovesMetadata() { + assertThat(TestPlanService.cleanType("BIGINT METADATA FROM 'offset'")).isEqualTo("BIGINT"); + } + + @Test + void cleanTypeHandlesCombination() { + assertThat(TestPlanService.cleanType("TIMESTAMP(3) *ROWTIME* NOT NULL")) + .isEqualTo("TIMESTAMP(3)"); + } + + @Test + void cleanTypeLeavesSimpleTypeAlone() { + assertThat(TestPlanService.cleanType("STRING")).isEqualTo("STRING"); + } + + @Test + void cleanTypeHandlesDecimal() { + assertThat(TestPlanService.cleanType("DECIMAL(10, 2)")).isEqualTo("DECIMAL(10, 2)"); + } + + // ------------------------------------------------------------------------- + // normalizeId + // ------------------------------------------------------------------------- + + @Test + void normalizeIdLowercases() { + assertThat(TestPlanService.normalizeId("Payment_Events")).isEqualTo("payment_events"); + } + + @Test + void normalizeIdStripsBackticks() { + assertThat(TestPlanService.normalizeId("`payment_events`")).isEqualTo("payment_events"); + } + + @Test + void normalizeIdStripsDoubleQuotes() { + assertThat(TestPlanService.normalizeId("\"payment_events\"")).isEqualTo("payment_events"); + } + + @Test + void normalizeIdTrims() { + assertThat(TestPlanService.normalizeId(" payment_events ")).isEqualTo("payment_events"); + } +} From 901864701344ec4659568298c465d51ba105200c Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Tue, 10 Mar 2026 17:35:33 -0400 Subject: [PATCH 6/6] Document sessionObject for v2 CTE targeting Co-Authored-By: Claude Opus 4.6 (1M context) --- .../flink/table/gateway/api/testing/TestMockSpec.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java index 7038d46c9ab4b..419fab00e1084 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/testing/TestMockSpec.java @@ -70,8 +70,10 @@ public String getRequestedName() { } /** - * The SQL identifier the toolkit should use when creating the mock object. Matches the original - * pipeline object name so downstream views can reference it. + * The SQL identifier the toolkit should use when creating the mock object. For v1 (top-level + * targets only), this matches {@code requestedName}. For v2 with CTE targeting, this may differ + * — e.g., a CTE mock like {@code charges_view.base_deduped} would need a dot-free session + * identifier. */ public String getSessionObject() { return sessionObject;