diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 78f61e1ce107e..c91107b2df245 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -296,17 +296,19 @@ This is useful when you need to materialize changelog events into a downstream s SELECT * FROM TO_CHANGELOG( input => TABLE source_table [PARTITION BY key_col], [op => DESCRIPTOR(op_column_name),] - [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]] + [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],] + [produces_full_deletes => BOOLEAN] ) ``` ### Parameters -| Parameter | Required | Description | -|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | -| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | -| `op_mapping` | No | A `MAP` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | +| Parameter | Required | Description | +|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | +| `op_mapping` | No | A `MAP` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | +| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, the full image. When `false`, only the identifying key columns are preserved and the rest are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. | #### Default op_mapping @@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG( -- UPDATE_BEFORE is dropped (not in the mapping) ``` +#### Upsert key + +An **upsert key** is a column or set of columns that uniquely identifies a row across its lifecycle in a changelog. It is what downstream operators and sinks use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to. + +The planner derives the upsert key from the input table: + +* A declared `PRIMARY KEY` on the source table when reading directly. +* The grouping columns of an upstream `GROUP BY `. +* The keys propagated by operators that preserve them (e.g. lookup joins, calc-projections that keep the key columns). + +When no upsert key can be derived (e.g. a plain append-only source with no key constraint and no grouping upstream), the input has no row identity and downstream operators must treat it as append-only or fall back to retract semantics. + +`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve when emitting partial DELETE rows. See [Full vs partial deletes](#full-vs-partial-deletes) below. + +#### Full vs partial deletes + +The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics). When `false`, the function relies on the input table's [upsert key](#upsert-key) to decide which columns to preserve. + +##### `produces_full_deletes => true` (default) + +The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE. + +**Row semantics** (no `PARTITION BY`): + +```sql +-- Upsert source: -D[id:5] (key-only). +-- ChangelogNormalize materializes the full pre-image from state. +-- Output: +I[op:'DELETE', id:5, name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source) + +-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- No ChangelogNormalize inserted; the input row is passed through unchanged. +-- Output: +I[op:'DELETE', id:5, name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source) +``` + +**Set semantics** (`PARTITION BY`): + +```sql +-- Upsert source: -D[id:5] (key-only). +-- ChangelogNormalize materializes the full pre-image from state. +-- Output: +I[id:5, op:'DELETE', name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id) + +-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- No ChangelogNormalize inserted; the input row is passed through unchanged. +-- Output: +I[id:5, op:'DELETE', name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id) +``` + +##### `produces_full_deletes => false` + +The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error. + +**Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY` when directly reading from a source or the key provided in a `GROUP BY `. + +```sql +-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only). +-- Output: +I[op:'DELETE', id:5, name:null] +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false) + +-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full pre-image). +-- Output: +I[op:'DELETE', id:5, name:null] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false) +``` + +**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks and Kafka compacted topics. + +```sql +-- Upsert source: -D[id:5] (key-only). +-- Output: +I[id:5, op:'DELETE', name:null] +SELECT * FROM TO_CHANGELOG( + input => TABLE upsert_source PARTITION BY id, + produces_full_deletes => false +) + +-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- Output: +I[id:5, op:'DELETE', name:null] +SELECT * FROM TO_CHANGELOG( + input => TABLE retract_source PARTITION BY id, + produces_full_deletes => false +) +``` + #### Partitioning by a key ```sql @@ -434,6 +520,13 @@ Table result = myTable.toChangelog( map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") ); +// Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full +// pre-image. When `false`, only the identifying key columns are preserved and the rest +// are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. +Table result = myTable.toChangelog( + lit(false).asArgument("produces_full_deletes") +); + // Set semantics: co-locate rows with the same key in the same parallel operator instance. // Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns. Table result = myTable.partitionBy($("id")).toChangelog(); diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index 583167fa47838..e7898e8752d1f 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -1196,10 +1196,19 @@ def to_changelog(self, *arguments: Expression) -> 'Table': INSERT-only row with a string ``op`` column indicating the original operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). + The optional ``produces_full_deletes`` boolean controls how DELETE rows are + emitted. When ``True`` (default), the planner inserts a ``ChangelogNormalize`` + operator for upsert sources that emit key-only deletes so the function emits + fully populated DELETE rows downstream. When ``False``, the function emits + partial DELETE rows: row semantics preserves the planner-derived upsert key + columns and nulls the rest, set semantics (``PARTITION BY``) preserves the + partition key and nulls the rest. Requires an upsert key or ``PARTITION BY``; + otherwise the call is rejected. + Example: :: - >>> from pyflink.table.expressions import descriptor, map_ + >>> from pyflink.table.expressions import descriptor, map_, lit, col >>> # Default: adds 'op' column with standard change operation names >>> result = table.to_changelog() >>> # Custom op column name and mapping @@ -1213,8 +1222,14 @@ def to_changelog(self, *arguments: Expression) -> 'Table': ... map_("INSERT, UPDATE_AFTER", "false", ... "DELETE", "true").as_argument("op_mapping") ... ) + >>> # Opt out of full-delete semantics to emit partial DELETE rows. + >>> # Requires an upsert key or PARTITION BY; otherwise rejected. + >>> result = table.to_changelog( + ... lit(False).as_argument("produces_full_deletes") + ... ) - :param arguments: Optional named arguments for ``op`` and ``op_mapping``. + :param arguments: Optional named arguments for ``op``, ``op_mapping``, and + ``produces_full_deletes``. :return: An append-only :class:`~pyflink.table.Table` with an ``op`` column prepended to the input columns. """ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 51086f1edfe22..401317d125d4c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -204,9 +204,19 @@ public interface PartitionedTable { * descriptor("deleted").asArgument("op"), * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); + * + * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full + * // pre-image. When `false`, only the identifying key columns are preserved and the rest + * // are nulled. See [Full vs partial deletes]( + * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes) + * // for more details. + * Table result = table + * .partitionBy($("id")) + * .toChangelog(lit(false).asArgument("produces_full_deletes")); * } * - * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code + * produces_full_deletes} * @return an append-only {@link Table} with output schema {@code [partition_keys, op, * non_partition_input_columns]} * @see Table#toChangelog(Expression...) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 0ffb90c5fcc02..ab4a617a2df6a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1459,9 +1459,19 @@ default TableResult executeInsert( * descriptor("deleted").asArgument("op"), * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); + * + * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full + * // pre-image. When `false`, only the identifying key columns are preserved and the rest + * // are nulled. See [Full vs partial deletes]( + * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes) + * // for more details. + * Table result = table.toChangelog( + * lit(false).asArgument("produces_full_deletes") + * ); * } * - * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code + * produces_full_deletes} * @return an append-only {@link Table} with an {@code op} column prepended to the input columns */ Table toChangelog(Expression... arguments); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index be8e578deea4d..5e4a02cb9fb67 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -781,6 +781,11 @@ public Optional changelogMode() { return Optional.empty(); } + @Override + public List upsertKeyColumns() { + return Collections.emptyList(); + } + private PartitionQueryOperation findPartitionOperation(QueryOperation op) { if (op instanceof PartitionQueryOperation) { return (PartitionQueryOperation) op; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index e3d30a5c8da6b..6cfd89110186f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -840,25 +840,41 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "UPDATE_BEFORE")))) .withConditionalTrait( StaticArgumentTrait.REQUIRE_FULL_DELETE, - TraitCondition.or( - // op_mapping omitted: default mapping includes - // DELETE. - TraitCondition.not( - TraitCondition.argIsPresent( - "op_mapping")), - TraitCondition.argMatches( - "op_mapping", - Map.class, - mapping -> - opMappingContainsKey( - (Map) - mapping, - "DELETE")))), + // Require full deletes by default. The user can opt + // out via produces_full_deletes=FALSE. + // REQUIRE_FULL_DELETE + // still gates on the active op_mapping mapping DELETE; + // otherwise no DELETE rows reach the function and there + // is no point inserting ChangelogNormalize upstream. + TraitCondition.and( + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "produces_full_deletes")), + TraitCondition.argIsEqualTo( + "produces_full_deletes", + Boolean.TRUE)), + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map< + String, + String>) + mapping, + "DELETE"))))), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), - true)) + true), + StaticArgument.scalar( + "produces_full_deletes", DataTypes.BOOLEAN(), true)) .inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY) .outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY) .runtimeClass( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java index f63566befce7d..3d17401c5efab 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java @@ -24,6 +24,7 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.types.DataType; +import java.util.List; import java.util.Optional; /** @@ -128,6 +129,29 @@ public interface TableSemantics { */ Optional changelogMode(); + /** + * Upsert key candidates derived from the passed table's metadata. + * + *

Returns a list of 0-based column index arrays that uniquely identify a row for upsert + * semantics. This is distinct from {@link #partitionByColumns()}: partition keys describe + * distribution and co-location, upsert keys describe row identity. Useful for functions that + * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want to have a + * unique identifier to interact with state. + * + *

Returns an empty list when no upsert key is derivable, or when the planner has not yet + * computed metadata (during type inference). + * + *

When the planner derives multiple candidate upsert keys for the same input (e.g., a table + * with several primary key constraints), all of them are returned. Picking which candidate to + * use is the function's responsibility, and the choice must be stable across releases to keep + * PTF state consistent after job restarts and upgrades. The order of the returned list is not + * part of the contract; PTF authors should not depend on it. A typical choice is the smallest + * candidate by cardinality, with ties broken by the column indices in ascending order. + * + * @return Candidate upsert keys of the passed table, or an empty list if none. + */ + List upsertKeyColumns(); + /** The sort direction for ORDER BY columns in table arguments with set semantics. */ @PublicEvolving enum SortDirection { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java index ba87cc0559dd6..4c0149f6533b1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java @@ -40,6 +40,7 @@ enum Kind { ARG_IS_PRESENT, NOT, OR, + AND, ARG_MATCHES } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 14734db11f4b6..8b86b37cc9fb8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -84,6 +84,17 @@ static TraitCondition or(final TraitCondition left, final TraitCondition right) ctx -> left.test(ctx) || right.test(ctx)); } + /** + * True when both the {@code left} and the {@code right} {@link TraitCondition} evaluate to + * true. + */ + static TraitCondition and(final TraitCondition left, final TraitCondition right) { + return new BuiltInCondition( + BuiltInCondition.Kind.AND, + List.of(left, right), + ctx -> left.test(ctx) && right.test(ctx)); + } + /** True when the named scalar argument was provided by the caller. */ static TraitCondition argIsPresent(final String argName) { return new BuiltInCondition( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index d977deab4a93b..4c27aae4723a9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -22,17 +22,13 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.ArgumentCount; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategy; -import org.apache.flink.table.types.inference.Signature; -import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.inference.TypeStrategy; import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; import java.util.ArrayList; import java.util.Arrays; @@ -46,39 +42,30 @@ @Internal public final class ToChangelogTypeStrategy { - private static final String DEFAULT_OP_COLUMN_NAME = "op"; + // Positional argument indexes for TO_CHANGELOG. Must match the order of StaticArguments + // registered in BuiltInFunctionDefinitions#TO_CHANGELOG; changing one without the other + // silently breaks argument resolution. + public static final int ARG_TABLE = 0; + public static final int ARG_OP = 1; + public static final int ARG_OP_MAPPING = 2; + public static final int ARG_PRODUCES_FULL_DELETES = 3; private static final Set VALID_ROW_KIND_NAMES = Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + private static final String DELETE = RowKind.DELETE.name(); + // -------------------------------------------------------------------------------------------- // Input validation // -------------------------------------------------------------------------------------------- public static final InputTypeStrategy INPUT_TYPE_STRATEGY = - new InputTypeStrategy() { - @Override - public ArgumentCount getArgumentCount() { - return ConstantArgumentCount.between(1, 3); - } - + new ValidationOnlyInputTypeStrategy() { @Override public Optional> inferInputTypes( final CallContext callContext, final boolean throwOnFailure) { return validateInputs(callContext, throwOnFailure); } - - @Override - public List getExpectedSignatures(final FunctionDefinition definition) { - return List.of( - Signature.of(Argument.of("input", "TABLE")), - Signature.of( - Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")), - Signature.of( - Argument.of("input", "TABLE"), - Argument.of("op", "DESCRIPTOR"), - Argument.of("op_mapping", "MAP"))); - } }; // -------------------------------------------------------------------------------------------- @@ -89,20 +76,16 @@ public List getExpectedSignatures(final FunctionDefinition definition callContext -> { final TableSemantics semantics = callContext - .getTableSemantics(0) + .getTableSemantics(ARG_TABLE) .orElseThrow( () -> new ValidationException( "First argument must be a table for TO_CHANGELOG.")); - final String opColumnName = resolveOpColumnName(callContext); - final List inputFields = DataType.getFields(semantics.dataType()); - final int[] outputIndices = - ChangelogTypeStrategyUtils.computeOutputIndices(semantics); + final String opColumnName = + ChangelogTypeStrategyUtils.resolveOpColumnName(callContext); - final List outputFields = new ArrayList<>(); - outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); - Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); + final List outputFields = buildOutputFields(semantics, opColumnName); return Optional.of(DataTypes.ROW(outputFields).notNull()); }; @@ -113,38 +96,69 @@ public List getExpectedSignatures(final FunctionDefinition definition private static Optional> validateInputs( final CallContext callContext, final boolean throwOnFailure) { - final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty(); - if (isMissingTableArg) { + Optional> error; + + error = validateTableArg(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpDescriptor(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpMapping(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateProducesFullDeletes(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + return Optional.of(callContext.getArgumentDataTypes()); + } + + private static Optional> validateTableArg( + final CallContext callContext, final boolean throwOnFailure) { + if (callContext.getTableSemantics(ARG_TABLE).isEmpty()) { return callContext.fail( throwOnFailure, "First argument must be a table for TO_CHANGELOG."); } + return Optional.empty(); + } - final Optional opDescriptor = callContext.getArgumentValue(1, ColumnList.class); - final boolean hasInvalidOpDescriptor = - opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1; - if (hasInvalidOpDescriptor) { + private static Optional> validateOpDescriptor( + final CallContext callContext, final boolean throwOnFailure) { + final Optional opDescriptor = + callContext.getArgumentValue(ARG_OP, ColumnList.class); + if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1) { return callContext.fail( throwOnFailure, "The descriptor for argument 'op' must contain exactly one column name."); } + return Optional.empty(); + } - final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); - final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + /** Validates op_mapping is a constant literal and that its keys are well-formed. */ + @SuppressWarnings("rawtypes") + private static Optional> validateOpMapping( + final CallContext callContext, final boolean throwOnFailure) { + final boolean hasMappingArgProvided = !callContext.isArgumentNull(ARG_OP_MAPPING); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(ARG_OP_MAPPING); if (hasMappingArgProvided && !isMappingArgLiteral) { return callContext.fail( throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); } - final Optional opMapping = callContext.getArgumentValue(2, Map.class); + final Optional opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); if (opMapping.isPresent()) { - final Optional> validationError = - validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); - if (validationError.isPresent()) { - return validationError; - } + return validateOpMappingKeys( + callContext, (Map) opMapping.get(), throwOnFailure); } - - return Optional.of(callContext.getArgumentDataTypes()); + return Optional.empty(); } /** @@ -153,16 +167,13 @@ private static Optional> validateInputs( * trimmed. Names are case-sensitive and must match exactly (e.g., {@code INSERT}, not {@code * insert}). Each name must be valid and appear at most once across all entries. */ - @SuppressWarnings("rawtypes") private static Optional> validateOpMappingKeys( - final CallContext callContext, final Map opMapping, final boolean throwOnFailure) { + final CallContext callContext, + final Map opMapping, + final boolean throwOnFailure) { final Set allRowKindsSeen = new HashSet<>(); - for (final Object key : opMapping.keySet()) { - if (!(key instanceof String)) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); - } - final String[] rowKindNames = ((String) key).split(","); + for (final String key : opMapping.keySet()) { + final String[] rowKindNames = key.split(","); for (final String rawName : rowKindNames) { final String rowKindName = rawName.trim(); if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { @@ -170,7 +181,7 @@ private static Optional> validateOpMappingKeys( throwOnFailure, String.format( "Invalid target mapping for argument 'op_mapping'. " - + "Unknown change operation: '%s'. Valid values are: %s.", + + "Unknown change operation: '%s'. Operations are case-sensitive. Valid values are: %s.", rowKindName, VALID_ROW_KIND_NAMES)); } final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); @@ -187,12 +198,63 @@ private static Optional> validateOpMappingKeys( return Optional.empty(); } - private static String resolveOpColumnName(final CallContext callContext) { - return callContext - .getArgumentValue(1, ColumnList.class) - .filter(cl -> !cl.getNames().isEmpty()) - .map(cl -> cl.getNames().get(0)) - .orElse(DEFAULT_OP_COLUMN_NAME); + @SuppressWarnings("rawtypes") + private static Optional> validateProducesFullDeletes( + final CallContext callContext, final boolean throwOnFailure) { + final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); + if (!isExplicit) { + return Optional.empty(); + } + if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) { + return callContext.fail( + throwOnFailure, + "The 'produces_full_deletes' argument must be a constant BOOLEAN literal."); + } + final boolean producesFullDeletes = + callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true); + if (!producesFullDeletes) { + return Optional.empty(); + } + // The check against the input changelog mode lives in the function constructor since + // TableSemantics#changelogMode() returns empty here at type-inference time. The mapping + // check below only needs the literal op_mapping argument, so it lives here. Only runs + // when the user explicitly set produces_full_deletes=true; the default true is not + // validated since it is a safe no-op for any input. + final Optional opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); + if (opMapping.isPresent() && !mapsDelete((Map) opMapping.get())) { + return callContext.fail( + throwOnFailure, + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " + + "does not map DELETE rows, so no DELETE rows are emitted. Remove " + + "the 'produces_full_deletes' argument or add a DELETE entry to " + + "'op_mapping'."); + } + return Optional.empty(); + } + + /** + * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. Keys + * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract. + */ + private static boolean mapsDelete(final Map opMapping) { + for (final String key : opMapping.keySet()) { + for (final String rawName : key.split(",")) { + if (DELETE.equals(rawName.trim())) { + return true; + } + } + } + return false; + } + + private static List buildOutputFields( + final TableSemantics semantics, final String opColumnName) { + final List inputFields = DataType.getFields(semantics.dataType()); + final int[] outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(semantics); + final List outputFields = new ArrayList<>(); + outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); + Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); + return outputFields; } private ToChangelogTypeStrategy() {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java new file mode 100644 index 0000000000000..528a977d037e1 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; + +import java.util.Comparator; +import java.util.List; + +/** Helpers for working with upsert key candidates. */ +@Internal +public final class UpsertKeyUtils { + + /** + * Comparator that orders upsert-key candidates deterministically and stably across releases: + * candidates with fewer columns come first; ties between equal-cardinality candidates are + * broken by the column indices in ascending order, leading column first. + */ + private static final Comparator SMALLEST_FIRST = + Comparator.comparingInt(a -> a.length) + .thenComparing( + (a, b) -> { + for (int i = 0; i < a.length; i++) { + final int cmp = Integer.compare(a[i], b[i]); + if (cmp != 0) { + return cmp; + } + } + return 0; + }); + + /** + * Picks the smallest upsert key from the given candidates using {@link #SMALLEST_FIRST}. + * Returns an empty array when the candidate list is empty. The returned reference is one of the + * input arrays; callers must not mutate it. + */ + public static int[] smallestKey(final List candidates) { + if (candidates.isEmpty()) { + return new int[0]; + } + int[] smallest = candidates.get(0); + for (int i = 1; i < candidates.size(); i++) { + if (SMALLEST_FIRST.compare(candidates.get(i), smallest) < 0) { + smallest = candidates.get(i); + } + } + return smallest; + } + + private UpsertKeyUtils() {} +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java new file mode 100644 index 0000000000000..6b5773949dbd0 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE; + +/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */ +class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + private static final DataType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("score", DataTypes.BIGINT())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + + private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN(); + + @Override + protected Stream testData() { + return Stream.of( + // Valid: produces_full_deletes=true with default op_mapping (includes DELETE) + TestSpec.forStrategy( + "Valid produces_full_deletes=true with default mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, null) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=true with op_mapping that includes DELETE + TestSpec.forStrategy( + "Valid produces_full_deletes=true with explicit DELETE mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT", "I", "DELETE", "D")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=true with comma-separated DELETE key + TestSpec.forStrategy( + "Valid produces_full_deletes=true with comma-separated DELETE", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, DELETE", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=false with op_mapping that omits DELETE + TestSpec.forStrategy( + "Valid produces_full_deletes=false with no DELETE in mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Error: produces_full_deletes=true with op_mapping that strips DELETE + TestSpec.forStrategy( + "produces_full_deletes=true rejected when op_mapping omits DELETE", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectErrorMessage( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active " + + "'op_mapping' does not map DELETE rows"), + + // Error: multi-column descriptor for `op` + TestSpec.forStrategy( + "Descriptor with multiple columns", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("a", "b")) + .expectErrorMessage("must contain exactly one column name"), + + // Error: invalid RowKind in op_mapping key + TestSpec.forStrategy( + "Invalid RowKind in mapping key", TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INVALID_KIND", "X")) + .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), + + // Error: duplicate RowKind across entries + TestSpec.forStrategy( + "Duplicate RowKind in mapping keys", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt( + ARG_OP_MAPPING, Map.of("INSERT, DELETE", "A", "DELETE", "B")) + .expectErrorMessage("Duplicate change operation: 'DELETE'")); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java index fe881f8fd1fc2..9b9830870fd4e 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java @@ -24,6 +24,8 @@ import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; import java.util.Optional; /** Mock implementation of {@link TableSemantics} for testing purposes. */ @@ -35,6 +37,7 @@ public class TableSemanticsMock implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final ChangelogMode changelogMode; + private final List upsertKeyColumns; public TableSemanticsMock(DataType dataType) { this(dataType, new int[0], new int[0], -1, null); @@ -46,6 +49,22 @@ public TableSemanticsMock( int[] orderByColumns, int timeColumn, @Nullable ChangelogMode changelogMode) { + this( + dataType, + partitionByColumns, + orderByColumns, + timeColumn, + changelogMode, + Collections.emptyList()); + } + + public TableSemanticsMock( + DataType dataType, + int[] partitionByColumns, + int[] orderByColumns, + int timeColumn, + @Nullable ChangelogMode changelogMode, + List upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; @@ -55,6 +74,7 @@ public TableSemanticsMock( } this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -86,4 +106,9 @@ public int timeColumn() { public Optional changelogMode() { return Optional.ofNullable(changelogMode); } + + @Override + public List upsertKeyColumns() { + return upsertKeyColumns; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 2c393adf875ec..c38aa3eca85a5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -335,7 +335,7 @@ public boolean hasScalarArgument(String name) { * scalar arguments through the same coercion path as validation. */ public CallContext toCallContext(RexCall call) { - return toCallContext(call, null, null, null); + return toCallContext(call, null, null, null, null); } /** @@ -348,6 +348,20 @@ public CallContext toCallContext( @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, @Nullable ChangelogMode outputChangelogMode) { + return toCallContext( + call, inputTimeColumns, inputChangelogModes, outputChangelogMode, null); + } + + /** + * Variant that additionally exposes the call's input upsert keys. Used by the streaming codegen + * path so PTFs can specialize themselves on the input's row-identity information. + */ + public CallContext toCallContext( + RexCall call, + @Nullable List inputTimeColumns, + @Nullable List inputChangelogModes, + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { return new OperatorBindingCallContext( dataTypeFactory, getDefinition(), @@ -355,7 +369,8 @@ public CallContext toCallContext( call.getType(), inputTimeColumns, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java index 065a30335458f..b86fe9b4175ac 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java @@ -50,6 +50,7 @@ import javax.annotation.Nullable; import java.util.AbstractList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -312,6 +313,11 @@ public int timeColumn() { public Optional changelogMode() { return Optional.empty(); } + + @Override + public List upsertKeyColumns() { + return Collections.emptyList(); + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index f31406dad196d..2127fd47814b9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final @Nullable List inputTimeColumns; private final @Nullable List inputChangelogModes; private final @Nullable ChangelogMode outputChangelogMode; + private final @Nullable List inputUpsertKeys; public OperatorBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlOperatorBinding binding, RelDataType returnRelDataType) { - this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null); + this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null, null); } public OperatorBindingCallContext( @@ -81,6 +82,26 @@ public OperatorBindingCallContext( @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, @Nullable ChangelogMode outputChangelogMode) { + this( + dataTypeFactory, + definition, + binding, + returnRelDataType, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + null); + } + + public OperatorBindingCallContext( + DataTypeFactory dataTypeFactory, + FunctionDefinition definition, + SqlOperatorBinding binding, + RelDataType returnRelDataType, + @Nullable List inputTimeColumns, + @Nullable List inputChangelogModes, + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { super( dataTypeFactory, definition, @@ -109,6 +130,7 @@ public int size() { this.inputTimeColumns = inputTimeColumns; this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys; } @Override @@ -173,13 +195,15 @@ public Optional getTableSemantics(int pos) { Optional.ofNullable(inputChangelogModes) .map(m -> m.get(tableArgCall.getInputIndex())) .orElse(null); + final List upsertKeys = inputUpsertKeys != null ? inputUpsertKeys : List.of(); return Optional.of( OperatorBindingTableSemantics.create( argumentDataTypes.get(pos), staticArg, tableArgCall, timeColumn, - changelogMode)); + changelogMode, + upsertKeys)); } @Override @@ -283,20 +307,23 @@ private static class OperatorBindingTableSemantics implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final @Nullable ChangelogMode changelogMode; + private final List upsertKeyColumns; public static OperatorBindingTableSemantics create( DataType tableDataType, StaticArgument staticArg, RexTableArgCall tableArgCall, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + List upsertKeyColumns) { return new OperatorBindingTableSemantics( createDataType(tableDataType, staticArg), tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()), timeColumn, - changelogMode); + changelogMode, + upsertKeyColumns); } private OperatorBindingTableSemantics( @@ -305,13 +332,15 @@ private OperatorBindingTableSemantics( int[] orderByColumns, SortDirection[] orderByDirections, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + List upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; this.orderByDirections = orderByDirections; this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } private static DataType createDataType(DataType tableDataType, StaticArgument staticArg) { @@ -353,5 +382,10 @@ public int timeColumn() { public Optional changelogMode() { return Optional.ofNullable(changelogMode); } + + @Override + public List upsertKeyColumns() { + return upsertKeyColumns; + } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index 3973329af7484..5c5e9375c35ac 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -69,6 +69,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.calcite.linq4j.Ord; @@ -108,6 +109,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes"; public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = "outputChangelogMode"; + public static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys"; @JsonProperty(FIELD_NAME_UID) private final @Nullable String uid; @@ -121,6 +123,10 @@ public class StreamExecProcessTableFunction extends ExecNodeBase @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) private final ChangelogMode outputChangelogMode; + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private final List inputUpsertKeys; + public StreamExecProcessTableFunction( ReadableConfig tableConfig, List inputProperties, @@ -129,7 +135,8 @@ public StreamExecProcessTableFunction( @Nullable String uid, RexCall invocation, List inputChangelogModes, - ChangelogMode outputChangelogMode) { + ChangelogMode outputChangelogMode, + List inputUpsertKeys) { this( ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecProcessTableFunction.class), @@ -141,7 +148,8 @@ public StreamExecProcessTableFunction( uid, invocation, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @JsonCreator @@ -155,7 +163,8 @@ public StreamExecProcessTableFunction( @JsonProperty(FIELD_NAME_UID) @Nullable String uid, @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) List inputChangelogModes, - @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) { + @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) @Nullable List inputUpsertKeys) { super(id, context, persistedConfig, inputProperties, outputType, description); this.uid = uid; // Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path: @@ -164,6 +173,7 @@ public StreamExecProcessTableFunction( this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation); this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys != null ? inputUpsertKeys : List.of(); } public @Nullable String getUid() { @@ -202,7 +212,12 @@ protected Transformation translateToPlanInternal( final RexCall udfCall = StreamPhysicalProcessTableFunction.toUdfCall(invocation); final GeneratedRunnerResult generated = ProcessTableRunnerGenerator.generate( - ctx, udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode); + ctx, + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys); final GeneratedProcessTableRunner generatedRunner = generated.runner(); final LinkedHashMap stateInfos = generated.stateInfos(); @@ -310,9 +325,12 @@ private RuntimeTableSemantics createRuntimeTableSemantics( final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); + final int inputIndex = tableArgCall.getInputIndex(); + final List upsertKeys = + inputIndex < inputUpsertKeys.size() ? inputUpsertKeys : List.of(); return new RuntimeTableSemantics( tableArg.getName(), - tableArgCall.getInputIndex(), + inputIndex, dataType, tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), @@ -320,7 +338,8 @@ private RuntimeTableSemantics createRuntimeTableSemantics( consumedChangelogMode, tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH), tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE), - timeColumn); + timeColumn, + upsertKeys); } private Transformation createKeyedTransformation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java index 5ccecf18e71be..9de4ab1871d6c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -27,6 +27,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; @@ -62,6 +63,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -165,6 +167,21 @@ public ExecNode translateToExecNode() { verifyTimeAttributes(getInputs(), call, inputChangelogModes, outputChangelogMode); final List> providedInputArgs = getProvidedInputArgs(call); verifyPassThroughColumnsForUpdates(providedInputArgs, outputChangelogMode); + final FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(getCluster().getMetadataQuery()); + final List inputUpsertKeys = + getInputs().stream() + .map( + input -> { + final Set keys = fmq.getUpsertKeys(input); + return keys == null + ? Collections.emptyList() + : keys.stream() + .map(ImmutableBitSet::toArray) + .collect(Collectors.toList()); + }) + .flatMap(List::stream) + .collect(Collectors.toList()); return new StreamExecProcessTableFunction( unwrapTableConfig(this), getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()), @@ -173,7 +190,8 @@ public ExecNode translateToExecNode() { uid, call, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java index cc301118b29f1..af99d92e9ed3b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java @@ -18,13 +18,17 @@ package org.apache.flink.table.planner.plan.utils; +import org.apache.flink.table.utils.UpsertKeyUtils; + import org.apache.calcite.util.ImmutableBitSet; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * Utility for upsertKey which represented as a Set of {@link @@ -55,21 +59,8 @@ public static Optional smallestKey(@Nullable Set upsertK if (null == upsertKeys || upsertKeys.isEmpty()) { return Optional.empty(); } - return upsertKeys.stream() - .map(ImmutableBitSet::toArray) - .reduce( - (k1, k2) -> { - if (k1.length < k2.length) { - return k1; - } - if (k1.length == k2.length) { - for (int index = 0; index < k1.length; index++) { - if (k1[index] < k2[index]) { - return k1; - } - } - } - return k2; - }); + final List asArrays = + upsertKeys.stream().map(ImmutableBitSet::toArray).collect(Collectors.toList()); + return Optional.of(UpsertKeyUtils.smallestKey(asArrays)); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala index 52df803d5c8f8..402157aa0fdb6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala @@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator { udfCall: RexCall, inputTimeColumns: java.util.List[Integer], inputChangelogModes: java.util.List[ChangelogMode], - outputChangelogMode: ChangelogMode): GeneratedRunnerResult = { + outputChangelogMode: ChangelogMode, + inputUpsertKeys: java.util.List[Array[Int]]): GeneratedRunnerResult = { val function: BridgingSqlFunction = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val definition: FunctionDefinition = function.getDefinition val dataTypeFactory = function.getDataTypeFactory @@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator { // Thus, functions can reconfigure themselves for the exact use case. // Including updating their state layout. val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys) // Create the final UDF for runtime val udf = UserDefinedFunctionHelper.createSpecializedFunction( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index bfffa8bd67d04..41c539790679a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -55,6 +55,13 @@ public List programs() { ToChangelogTestPrograms.INVALID_DESCRIPTOR, ToChangelogTestPrograms.INVALID_OP_MAPPING, ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, - ToChangelogTestPrograms.DUPLICATE_ROW_KIND); + ToChangelogTestPrograms.DUPLICATE_ROW_KIND, + ToChangelogTestPrograms.INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY, + ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES, + ToChangelogTestPrograms.UPSERT_PRODUCES_FULL_DELETES, + ToChangelogTestPrograms.UPSERT_PRODUCES_PARTIAL_DELETES, + ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES, + ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES, + ToChangelogTestPrograms.UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 250ad1f1d8fbe..a4ba781a8ef5d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -157,7 +157,7 @@ public class ToChangelogTestPrograms { public static final TableTestProgram UPSERT = TableTestProgram.of( "to-changelog-upsert-input", - "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes") + "upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits partial deletes") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -598,4 +598,192 @@ public class ToChangelogTestPrograms { ValidationException.class, "Duplicate change operation: 'DELETE'") .build(); + + public static final TableTestProgram INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY = + TableTestProgram.of( + "to-changelog-invalid-produces-full-deletes-for-append-only", + "fails when produces_full_deletes=true on an input that never emits DELETE rows") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => true)", + ValidationException.class, + "the input table only produces [INSERT] and never emits DELETE rows") + .build(); + + // -------------------------------------------------------------------------------------------- + // Full vs partial deletes matrix (input kind x PARTITION BY x produces_full_deletes) + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-retract-produces-partial-deletes", + "retract input in row semantics with produces_full_deletes=false: skips ChangelogNormalize and the partial DELETE row from the input passes through unchanged") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[INSERT, Alice, 10]", + "+I[INSERT, Bob, 20]", + "+I[UPDATE_BEFORE, Alice, 10]", + "+I[UPDATE_AFTER, Alice, 30]", + "+I[DELETE, Bob, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => false)") + .build(); + + public static final TableTestProgram UPSERT_PRODUCES_FULL_DELETES = + TableTestProgram.of( + "to-changelog-upsert-produces-full-deletes", + "upsert input in row semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + // Key-only delete: ChangelogNormalize fills the row. + Row.ofKind(RowKind.DELETE, "Alice", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[INSERT, Alice, 10]", "+I[DELETE, Alice, 10]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => true)") + .build(); + + public static final TableTestProgram UPSERT_PRODUCES_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-upsert-produces-partial-deletes", + "upsert input in row semantics with single-column upsert key + " + + "produces_full_deletes=false: DELETE preserves the upsert key " + + "column and nulls the rest without requiring PARTITION BY") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.DELETE, "Alice", 10L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[INSERT, Alice, 10]", "+I[DELETE, Alice, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => false)") + .build(); + + public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-retract-partition-by-produces-partial-deletes", + "retract input in set semantics with produces_full_deletes=false: nulls non-partition-key columns on DELETE even when the input row is fully populated") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", + "+I[Bob, INSERT, 20]", + "+I[Alice, UPDATE_AFTER, 30]", + "+I[Bob, DELETE, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name," + + "produces_full_deletes => false)") + .build(); + + public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES = + TableTestProgram.of( + "to-changelog-retract-partition-by-produces-full-deletes", + "retract input in set semantics with produces_full_deletes=true (default): the input row passes through unchanged, full DELETE pre-image reaches the output") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.DELETE, "Alice", 10L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", "+I[Alice, DELETE, 10]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "produces_full_deletes => true)") + .build(); + + public static final TableTestProgram UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES = + TableTestProgram.of( + "to-changelog-upsert-partition-by-produces-full-deletes", + "upsert input in set semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", + "+I[Bob, INSERT, 20]", + "+I[Alice, UPDATE_BEFORE, 10]", + "+I[Alice, UPDATE_AFTER, 30]", + "+I[Bob, DELETE, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "produces_full_deletes => true)") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index 7f6f93849c282..1f7347b027e7f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -144,4 +144,42 @@ void testUpsertPartitionByNoUpdateBeforeAndDelete() { + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])", CHANGELOG_MODE); } + + @Test + void testUpsertSourceProducesFullDeletes() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source, " + + "produces_full_deletes => true)", + CHANGELOG_MODE); + } + + @Test + void testUpsertSourceKeyOnlyDeletes() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source, " + + "produces_full_deletes => false)", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index 7cea1058e5c82..174addbf41890 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -23,14 +23,14 @@ limitations under the License. @@ -42,14 +42,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D @@ -62,14 +62,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAUL @@ -81,14 +81,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D @@ -101,14 +101,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRI @@ -121,14 +121,56 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRI + + + + + TABLE upsert_source, produces_full_deletes => false)]]> + + + + + + + + + + + TABLE upsert_source, produces_full_deletes => true)]]> + + + + + + ", - "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, VARCHAR(2147483647) name, BIGINT score)])", + "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, VARCHAR(2147483647) name, BIGINT score)])", "uid" : null, "functionCall" : { "kind" : "CALL", @@ -59,6 +59,11 @@ "syntax" : "SPECIAL", "internalName" : "$DEFAULT$1", "type" : "MAP" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$DEFAULT$1", + "type" : "BOOLEAN" }, { "kind" : "CALL", "syntax" : "SPECIAL", @@ -73,7 +78,8 @@ "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT NULL" }, "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ] ], - "outputChangelogMode" : [ "INSERT" ] + "outputChangelogMode" : [ "INSERT" ], + "inputUpsertKeys" : [ [ 0 ] ] }, { "id" : 3, "type" : "stream-exec-sink_2", diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 0b6c6ca55b88e..66162e205b9ab 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -33,21 +33,31 @@ import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.UpsertKeyUtils; import org.apache.flink.types.ColumnList; import org.apache.flink.types.RowKind; import javax.annotation.Nullable; import java.util.EnumMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE; + /** * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. * - *

Converts each input row into an INSERT-only output row with an operation code column. The - * output schema is {@code [op_column, ...all_input_columns...]}. + *

Converts each input row into an INSERT-only output row with an operation code column. Output + * schema is {@code [op_column, ...projected_input_columns...]}. Partition columns are prepended by + * the framework outside this function and are not part of the projection. * *

Uses {@link JoinedRowData} to combine the op column with the full input row. */ @@ -65,26 +75,55 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction { private final Map rawOpMap; private final int[] outputIndices; + private final RowType inputRowType; + private final boolean producesFullDelete; + private final boolean[] upsertKeyColumn; private transient Map opMap; private transient GenericRowData opRow; private transient JoinedRowData output; private transient ProjectedRowData projectedOutput; + private transient GenericRowData partialDeletePayload; + private transient RowData.FieldGetter[] preservedFieldGetters; @SuppressWarnings("unchecked") public ToChangelogFunction(final SpecializedContext context) { super(BuiltInFunctionDefinitions.TO_CHANGELOG, context); final CallContext callContext = context.getCallContext(); // Table argument is guaranteed by the type strategy's validation phase. - final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get(); final Map opMapping = - callContext.getArgumentValue(2, Map.class).orElse(null); + callContext.getArgumentValue(ARG_OP_MAPPING, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); if (opMapping != null) { validateOpMap(this.rawOpMap, tableSemantics); } + final boolean producesFullDeletesArg = + callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true); + final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); + validateProducesFullDeletes(producesFullDeletesArg, isExplicit, tableSemantics); + this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); + this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType(); + this.producesFullDelete = producesFullDeletesArg; + this.upsertKeyColumn = + computeUpsertKeyColumn( + this.outputIndices, + UpsertKeyUtils.smallestKey(tableSemantics.upsertKeyColumns())); + } + + private static boolean[] computeUpsertKeyColumn( + final int[] outputIndices, final int[] upsertKey) { + final Set keepInputIndices = new HashSet<>(); + for (final int key : upsertKey) { + keepInputIndices.add(key); + } + final boolean[] mask = new boolean[outputIndices.length]; + for (int i = 0; i < outputIndices.length; i++) { + mask[i] = keepInputIndices.contains(outputIndices[i]); + } + return mask; } @Override @@ -95,6 +134,16 @@ public void open(final FunctionContext context) throws Exception { opRow = new GenericRowData(1); output = new JoinedRowData(); projectedOutput = ProjectedRowData.from(outputIndices); + partialDeletePayload = new GenericRowData(outputIndices.length); + preservedFieldGetters = new RowData.FieldGetter[outputIndices.length]; + final List inputFieldTypes = inputRowType.getChildren(); + for (int i = 0; i < outputIndices.length; i++) { + if (upsertKeyColumn[i]) { + preservedFieldGetters[i] = + RowData.createFieldGetter( + inputFieldTypes.get(outputIndices[i]), outputIndices[i]); + } + } } /** @@ -145,17 +194,86 @@ private static void validateOpMap( } } + /** + * Validates an explicit {@code produces_full_deletes} argument against the input. + * + *

For {@code produces_full_deletes=true}, the input changelog must emit DELETE rows; + * otherwise the parameter is dead. For {@code produces_full_deletes=false}, the input must + * declare an upsert key or the call must use {@code PARTITION BY}; otherwise the function has + * no identifying columns to preserve when nulling the rest. + * + *

No validation runs when the argument is absent, since the default (full deletes) is safe + * for any input. + * + *

Lives here rather than in the input type strategy because {@link + * TableSemantics#changelogMode()} and {@link TableSemantics#upsertKeyColumns()} are only + * populated at specialization time. + */ + private static void validateProducesFullDeletes( + final boolean producesFullDeletesArg, + final boolean isExplicit, + final TableSemantics tableSemantics) { + if (!isExplicit) { + return; + } + if (producesFullDeletesArg) { + final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); + if (inputMode != null && !inputMode.contains(RowKind.DELETE)) { + throw new ValidationException( + String.format( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input " + + "table only produces %s and never emits DELETE rows. " + + "Remove the 'produces_full_deletes' argument.", + inputMode.getContainedKinds())); + } + return; + } + final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0; + final boolean hasUpsertKey = !tableSemantics.upsertKeyColumns().isEmpty(); + if (!hasPartitionBy && !hasUpsertKey) { + throw new ValidationException( + "Invalid 'produces_full_deletes=false' for TO_CHANGELOG: the input has no " + + "upsert key and the call has no PARTITION BY, so the function has " + + "no identifying columns to preserve on DELETE rows. Remove the " + + "argument (the default emits full DELETE rows) or add a " + + "PARTITION BY."); + } + } + public void eval( final Context ctx, final RowData input, @Nullable final ColumnList op, - @Nullable final MapData opMapping) { + @Nullable final MapData opMapping, + @Nullable final Boolean producesFullDeletes) { final StringData opCode = opMap.get(input.getRowKind()); if (opCode == null) { return; } opRow.setField(0, opCode); - collect(output.replace(opRow, projectedOutput.replaceRow(input))); + final RowData payload; + if (input.getRowKind() == RowKind.DELETE && !producesFullDelete) { + payload = buildPartialDeletePayload(input); + } else { + payload = projectedOutput.replaceRow(input); + } + collect(output.replace(opRow, payload)); + } + + /** + * Builds the payload for a partial DELETE row: upsert-key columns are copied from the input, + * all other columns are emitted as {@code null}. Partition-key columns are not included here + * since the framework prepends them outside the function's projected output. + */ + private RowData buildPartialDeletePayload(final RowData input) { + for (int i = 0; i < outputIndices.length; i++) { + if (upsertKeyColumn[i]) { + partialDeletePayload.setField(i, preservedFieldGetters[i].getFieldOrNull(input)); + } else { + partialDeletePayload.setField(i, null); + } + } + return partialDeletePayload; } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java index cabab4c613143..7f0fbf0c3d382 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java @@ -24,6 +24,7 @@ import org.apache.flink.table.types.DataType; import java.io.Serializable; +import java.util.List; import java.util.Optional; /** @@ -44,6 +45,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { private final boolean passColumnsThrough; private final boolean hasSetSemantics; private final int timeColumn; + private final List upsertKeyColumns; private transient ChangelogMode changelogMode; @@ -57,7 +59,8 @@ public RuntimeTableSemantics( RuntimeChangelogMode consumedChangelogMode, boolean passColumnsThrough, boolean hasSetSemantics, - int timeColumn) { + int timeColumn, + List upsertKeyColumns) { this.argName = argName; this.inputIndex = inputIndex; this.dataType = dataType; @@ -68,6 +71,7 @@ public RuntimeTableSemantics( this.passColumnsThrough = passColumnsThrough; this.hasSetSemantics = hasSetSemantics; this.timeColumn = timeColumn; + this.upsertKeyColumns = upsertKeyColumns; } public String getArgName() { @@ -122,4 +126,9 @@ public int timeColumn() { public Optional changelogMode() { return Optional.of(getChangelogMode()); } + + @Override + public List upsertKeyColumns() { + return upsertKeyColumns; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java index be390ab5f5557..38a0df4811168 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java @@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() { RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), /* passColumnsThrough */ false, /* hasSetSemantics */ true, - /* timeColumn */ 1); + /* timeColumn */ 1, + /* upsertKeyColumns */ new int[0]); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java index fadf21d7dd942..91edd9a059d69 100644 --- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java +++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java @@ -23,6 +23,8 @@ import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; +import java.util.Collections; +import java.util.List; import java.util.Optional; /** {@link TableSemantics} implementation for {@link ProcessTableFunctionTestHarness}. */ @@ -30,10 +32,17 @@ class TestHarnessTableSemantics implements TableSemantics { private final DataType dataType; private final int[] partitionByColumns; + private final List upsertKeyColumns; TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) { + this(dataType, partitionByColumns, Collections.emptyList()); + } + + TestHarnessTableSemantics( + DataType dataType, int[] partitionByColumns, List upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -65,4 +74,9 @@ public int timeColumn() { public Optional changelogMode() { return Optional.empty(); } + + @Override + public List upsertKeyColumns() { + return upsertKeyColumns; + } }