[FLINK-39636][table] Add produces_full_deletes parameter in TO_CHANGELOG #28235
base: master
Commits: e4ca3b3, e08cdd2, 0dfb78c, ed6fd23, 06dbddc
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog events into a downstream s | |||||
| SELECT * FROM TO_CHANGELOG( | ||||||
| input => TABLE source_table [PARTITION BY key_col], | ||||||
| [op => DESCRIPTOR(op_column_name),] | ||||||
| [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]] | ||||||
| [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],] | ||||||
| [produces_full_deletes => BOOLEAN] | ||||||
| ) | ||||||
| ``` | ||||||
|
|
||||||
| ### Parameters | ||||||
|
|
||||||
| | Parameter | Required | Description | | ||||||
| |:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | ||||||
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | | ||||||
| | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | | ||||||
| | `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | | ||||||
| | Parameter | Required | Description | | ||||||
| |:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | ||||||
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | | ||||||
| | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | | ||||||
| | `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | | ||||||
| | `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), the function requires fully-populated DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator for upsert sources that emit key-only deletes, so downstream operators see the full pre-image on DELETE. When `false`, the function instead emits partial DELETE rows: in row semantics it preserves the planner-derived upsert key columns of the input and nulls the rest; in set semantics (`PARTITION BY`) it preserves the partition key and nulls the rest. Setting `false` requires that the input declares an upsert key or that the call uses `PARTITION BY`; otherwise the function has no identifying columns to preserve and the call is rejected. | ||||||
|
|
||||||
| #### Default op_mapping | ||||||
|
|
||||||
|
|
@@ -397,6 +399,76 @@ SELECT * FROM TO_CHANGELOG( | |||||
| -- UPDATE_BEFORE is dropped (not in the mapping) | ||||||
| ``` | ||||||
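To make the `op_mapping` rules concrete, here is a minimal, self-contained Java sketch of how such a mapping could be resolved. It is a conceptual model only, not Flink's implementation: the class `OpMappingSketch` and its methods `expand` and `codeFor` are hypothetical names, and the sketch does not validate that operation names are real change operations.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Conceptual model of op_mapping resolution; hypothetical, not Flink's implementation. */
final class OpMappingSketch {

    /** Expands keys such as "INSERT, UPDATE_AFTER" into one entry per change operation. */
    static Map<String, String> expand(Map<String, String> opMapping) {
        Map<String, String> perOperation = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            for (String name : entry.getKey().split(",")) {
                String operation = name.trim();
                // Each change operation may appear at most once across all entries.
                if (perOperation.putIfAbsent(operation, entry.getValue()) != null) {
                    throw new IllegalArgumentException(
                            "Duplicate change operation: " + operation);
                }
            }
        }
        return perOperation;
    }

    /** Returns the output code for an operation, or null if the event is dropped. */
    static String codeFor(Map<String, String> perOperation, String operation) {
        return perOperation.get(operation);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("INSERT, UPDATE_AFTER", "false");
        mapping.put("DELETE", "true");
        Map<String, String> perOperation = expand(mapping);
        System.out.println(codeFor(perOperation, "UPDATE_AFTER"));  // prints false
        System.out.println(codeFor(perOperation, "UPDATE_BEFORE")); // prints null (dropped)
    }
}
```

The `null` result stands in for "unmapped events are dropped": an operation that appears in no key has no output code.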
|
|
||||||
| #### Delete handling | ||||||
|
Contributor
Nice documentation!
|
||||||
|
|
||||||
| The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The sections below show each setting with `PARTITION BY` (set semantics) and without it (row semantics). | ||||||
|
|
||||||
| ##### `produces_full_deletes => true` (default) | ||||||
|
|
||||||
| The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE. | ||||||
|
|
||||||
| **Row semantics** (no `PARTITION BY`): | ||||||
|
|
||||||
| ```sql | ||||||
| -- Upsert source: -D[id:5] (key-only). | ||||||
| -- ChangelogNormalize materializes the full pre-image from state. | ||||||
| -- Output: +I[op:'DELETE', id:5, name:'Alice'] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source) | ||||||
|
|
||||||
| -- Retract source: -D[id:5, name:'Alice'] (full pre-image). | ||||||
| -- No ChangelogNormalize inserted; the input row is passed through unchanged. | ||||||
| -- Output: +I[op:'DELETE', id:5, name:'Alice'] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE retract_source) | ||||||
| ``` | ||||||
|
|
||||||
| **Set semantics** (`PARTITION BY`): | ||||||
|
|
||||||
| ```sql | ||||||
| -- Upsert source: -D[id:5] (key-only). | ||||||
| -- ChangelogNormalize materializes the full pre-image from state. | ||||||
| -- Output: +I[id:5, op:'DELETE', name:'Alice'] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id) | ||||||
|
|
||||||
| -- Retract source: -D[id:5, name:'Alice'] (full pre-image). | ||||||
| -- No ChangelogNormalize inserted; the input row is passed through unchanged. | ||||||
| -- Output: +I[id:5, op:'DELETE', name:'Alice'] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id) | ||||||
| ``` | ||||||
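The idea of materializing the full pre-image from state can be illustrated with a small, self-contained Java model. This is only a sketch of the concept, not how `ChangelogNormalize` is implemented: the class `PreImageSketch` and its methods are hypothetical, and a plain in-memory map stands in for keyed operator state.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Conceptual model of restoring a full pre-image for key-only deletes; not Flink's operator. */
final class PreImageSketch {
    // Last known row per key, standing in for keyed operator state.
    private final Map<Object, Map<String, Object>> lastRowByKey = new HashMap<>();
    private final String keyColumn;

    PreImageSketch(String keyColumn) {
        this.keyColumn = keyColumn;
    }

    /** Remembers the latest row for its key (an insert or an update). */
    void upsert(Map<String, Object> row) {
        lastRowByKey.put(row.get(keyColumn), new LinkedHashMap<>(row));
    }

    /** Turns a key-only delete into the stored full row; returns null if the key is unknown. */
    Map<String, Object> delete(Object key) {
        return lastRowByKey.remove(key);
    }

    public static void main(String[] args) {
        PreImageSketch state = new PreImageSketch("id");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 5);
        row.put("name", "Alice");
        state.upsert(row);
        // The key-only delete -D[id:5] is expanded to the full pre-image.
        System.out.println(state.delete(5)); // prints {id=5, name=Alice}
    }
}
```

The per-key map is the cost that `produces_full_deletes => true` implies for upsert sources: every live key needs its last row kept in state so that a later key-only delete can be expanded.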
|
|
||||||
| ##### `produces_full_deletes => false` | ||||||
|
|
||||||
| The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. This setting requires an upsert key (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error. | ||||||
|
Contributor
I think we don't have a good explanation of the upsert key in the docs. It would be good to add one and link to it wherever we mention the upsert key.
||||||
|
|
||||||
| **Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY`. | ||||||
|
|
||||||
|
|
||||||
| ```sql | ||||||
| -- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only). | ||||||
| -- Output: +I[op:'DELETE', id:5, name:null] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false) | ||||||
|
|
||||||
| -- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full pre-image). | ||||||
| -- Output: +I[op:'DELETE', id:5, name:null] | ||||||
| SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false) | ||||||
| ``` | ||||||
|
|
||||||
| **Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. This matches the shape expected by upsert sinks and Kafka compacted topics. | ||||||
|
|
||||||
|
|
||||||
| ```sql | ||||||
| -- Upsert source: -D[id:5] (key-only). | ||||||
| -- Output: +I[id:5, op:'DELETE', name:null] | ||||||
| SELECT * FROM TO_CHANGELOG( | ||||||
| input => TABLE upsert_source PARTITION BY id, | ||||||
| produces_full_deletes => false | ||||||
| ) | ||||||
|
|
||||||
| -- Retract source: -D[id:5, name:'Alice'] (full pre-image). | ||||||
| -- Output: +I[id:5, op:'DELETE', name:null] | ||||||
| SELECT * FROM TO_CHANGELOG( | ||||||
| input => TABLE retract_source PARTITION BY id, | ||||||
| produces_full_deletes => false | ||||||
| ) | ||||||
| ``` | ||||||
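The shape of a partial DELETE row, together with the validation rule for `produces_full_deletes => false`, can be sketched in a few lines of self-contained Java. This is an illustrative model under stated assumptions, not Flink's implementation: `PartialDeleteSketch` and `partialDelete` are hypothetical names, rows are modeled as ordered maps, and the `op` column is left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Conceptual model of partial DELETE rows; hypothetical, not Flink's implementation. */
final class PartialDeleteSketch {

    /**
     * Keeps the identifying columns (upsert key or partition key) and nulls every other column.
     * Rejects the call when there are no identifying columns to preserve.
     */
    static Map<String, Object> partialDelete(
            Map<String, Object> row, Set<String> identifyingColumns) {
        if (identifyingColumns.isEmpty()) {
            throw new IllegalArgumentException(
                    "An upsert key or PARTITION BY is required for partial deletes");
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> column : row.entrySet()) {
            boolean keep = identifyingColumns.contains(column.getKey());
            result.put(column.getKey(), keep ? column.getValue() : null);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> fullDelete = new LinkedHashMap<>();
        fullDelete.put("id", 5);
        fullDelete.put("name", "Alice");
        // A full pre-image from a retract source is reduced to the key-only shape.
        System.out.println(partialDelete(fullDelete, Set.of("id"))); // prints {id=5, name=null}
    }
}
```

Both the upsert and the retract examples above end in the same output shape, which is why the sketch takes the identifying columns as its only extra input.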
|
|
||||||
| #### Partitioning by a key | ||||||
|
|
||||||
| ```sql | ||||||
|
|
@@ -434,6 +506,15 @@ Table result = myTable.toChangelog( | |||||
| map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") | ||||||
| ); | ||||||
|
|
||||||
| // Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for | ||||||
| // upsert sources). This is the default. When false, the function emits partial DELETE rows: | ||||||
| // in row semantics it preserves the input's upsert key columns and nulls the rest (the call | ||||||
| // is rejected when no upsert key is derivable); in set semantics it preserves the partition | ||||||
| // key and nulls the rest. | ||||||
| Table result = myTable.toChangelog( | ||||||
| lit(true).asArgument("produces_full_deletes") | ||||||
| ); | ||||||
|
|
||||||
| // Set semantics: co-locate rows with the same key in the same parallel operator instance. | ||||||
| // Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns. | ||||||
| Table result = myTable.partitionBy($("id")).toChangelog(); | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -204,9 +204,18 @@ public interface PartitionedTable { | |||||||||||||||||||||||||||
| * descriptor("deleted").asArgument("op"), | ||||||||||||||||||||||||||||
| * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") | ||||||||||||||||||||||||||||
| * ); | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * // Opt out of full-delete semantics. The default (true) requires fully-populated DELETE | ||||||||||||||||||||||||||||
| * // rows and inserts a ChangelogNormalize for upsert sources. When false, the function | ||||||||||||||||||||||||||||
| * // preserves the partition key on DELETE rows and nulls the rest, which avoids the | ||||||||||||||||||||||||||||
| * // stateful normalization operator upstream. | ||||||||||||||||||||||||||||
| * Table result = table | ||||||||||||||||||||||||||||
| * .partitionBy($("id")) | ||||||||||||||||||||||||||||
| * .toChangelog(lit(false).asArgument("produces_full_deletes")); | ||||||||||||||||||||||||||||
|
Comment on lines +208 to +214
Contributor
Let's keep the API usage simple, with only the information the user needs. More details can be found in the documentation.
||||||||||||||||||||||||||||
| * }</pre> | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * @param arguments optional named arguments for {@code op} and {@code op_mapping} | ||||||||||||||||||||||||||||
| * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code | ||||||||||||||||||||||||||||
| * produces_full_deletes} | ||||||||||||||||||||||||||||
| * @return an append-only {@link Table} with output schema {@code [partition_keys, op, | ||||||||||||||||||||||||||||
| * non_partition_input_columns]} | ||||||||||||||||||||||||||||
| * @see Table#toChangelog(Expression...) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1459,9 +1459,19 @@ default TableResult executeInsert( | |||||||||||||||||||||
| * descriptor("deleted").asArgument("op"), | ||||||||||||||||||||||
| * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") | ||||||||||||||||||||||
| * ); | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * // Opt out of full-delete semantics. The default (true) requires fully-populated DELETE | ||||||||||||||||||||||
| * // rows and inserts a ChangelogNormalize for upsert sources. When false, the function | ||||||||||||||||||||||
| * // emits partial DELETE rows: row semantics preserves the planner-derived upsert key | ||||||||||||||||||||||
| * // columns and nulls the rest; set semantics preserves the partition key. Requires an | ||||||||||||||||||||||
| * // upsert key or PARTITION BY; otherwise the call is rejected. | ||||||||||||||||||||||
|
Comment on lines +1462 to +1467
Contributor
|
||||||||||||||||||||||
| * Table result = table.toChangelog( | ||||||||||||||||||||||
| * lit(false).asArgument("produces_full_deletes") | ||||||||||||||||||||||
| * ); | ||||||||||||||||||||||
| * }</pre> | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * @param arguments optional named arguments for {@code op} and {@code op_mapping} | ||||||||||||||||||||||
| * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code | ||||||||||||||||||||||
| * produces_full_deletes} | ||||||||||||||||||||||
| * @return an append-only {@link Table} with an {@code op} column prepended to the input columns | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
| Table toChangelog(Expression... arguments); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
Let's have a short description here, with only the information the user needs to use it. Detailed information is in the section below.