
[FLINK-39636][table] Add produces_full_deletes parameter in TO_CHANGELOG #28235

Open
raminqaf wants to merge 4 commits into apache:master from raminqaf:FLINK-39636

@raminqaf
Contributor

What is the purpose of the change

Adds the built-in TO_CHANGELOG process table function (PTF) introduced in FLIP-564. The function converts a table (insert-only, retract, or upsert) into a changelog view that surfaces the row-kind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) as an explicit op column.

A new produces_full_deletes boolean parameter controls how DELETE rows are emitted. When false (default), the function emits partial DELETEs: the upsert-key columns of the input table are preserved and the remaining columns are nulled. When true, full DELETE rows are passed through unchanged. Partial deletes match the contract of most upsert sinks and avoid forcing users to retain full pre-image state.
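
The difference between the two DELETE shapes can be illustrated with a small, dependency-free sketch. It is not the actual ToChangelogFunction runtime: the class name, the `toDeleteRow` helper, and the `Object[]` row representation are hypothetical and chosen only to show which columns survive on DELETE. It also assumes, as the commit messages in this PR describe, that an input without an upsert key is passed through unchanged.

```java
import java.util.Arrays;

/** Illustrative sketch only; names and row representation are hypothetical. */
final class PartialDeleteSketch {

    /**
     * Returns the row emitted for a DELETE.
     *
     * @param input the incoming DELETE row
     * @param isUpsertKey true at each position that belongs to the upsert key
     * @param producesFullDeletes true if the caller asked for full DELETE pass-through
     */
    static Object[] toDeleteRow(Object[] input, boolean[] isUpsertKey, boolean producesFullDeletes) {
        boolean hasKey = false;
        for (boolean k : isUpsertKey) {
            hasKey |= k;
        }
        // Full deletes, or no key to identify the row: pass the input through unchanged.
        if (producesFullDeletes || !hasKey) {
            return input.clone();
        }
        // Partial delete: keep key columns, null everything else.
        Object[] out = new Object[input.length];
        for (int i = 0; i < input.length; i++) {
            out[i] = isUpsertKey[i] ? input[i] : null;
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {5, "Alice", 42};
        boolean[] key = {true, false, false};
        System.out.println(Arrays.toString(toDeleteRow(row, key, false))); // [5, null, null]
        System.out.println(Arrays.toString(toDeleteRow(row, key, true)));  // [5, Alice, 42]
    }
}
```

With the key on the first column, the partial variant keeps only `id`, while the full variant leaves the row untouched.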

Brief change log

  • Added BuiltInFunctionDefinitions.TO_CHANGELOG and the corresponding ToChangelogFunction runtime in flink-table-api-java.
  • Added StreamPhysicalToChangelog / StreamExecToChangelog exec nodes wired through FlinkRelMdChangelogMode and FlinkRelMdUpsertKeys so the planner picks the correct downstream changelog mode.
  • Computed the input upsert key at planning time via FlinkRelMetadataQuery.getUpsertKeys(input) collapsed to one candidate via UpsertKeyUtil.smallestKey(...), and threaded it through the codegen path so the runtime knows which columns to keep on DELETE.
  • Implemented row-semantics and set-semantics variants; PARTITION BY over a row-semantics input forwards the partitioning column as the upsert key.
  • Added compiled-plan JSON support for StreamExecProcessTableFunction (new inputUpsertKeys field with a per-input empty-array default for back-compat with older plans).
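
The change log mentions collapsing several upsert-key candidates to one. The following sketch shows one way such a collapse could be made deterministic. It is not the real `UpsertKeyUtil.smallestKey(...)`: the class name, the `List<int[]>` representation of candidates, and the tie-breaking rule (fewest columns first, then lexicographic order of column indices) are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative sketch only; not Flink's UpsertKeyUtil. */
final class SmallestKeySketch {

    /** Orders keys by column count, then by column indices, so the choice is deterministic. */
    private static final Comparator<int[]> BY_SIZE_THEN_COLUMNS = (a, b) -> {
        int bySize = Integer.compare(a.length, b.length);
        return bySize != 0 ? bySize : Arrays.compare(a, b);
    };

    /** Returns the smallest candidate key, or an empty array if there is no candidate. */
    static int[] smallestKey(List<int[]> candidates) {
        return candidates.stream()
                .min(BY_SIZE_THEN_COLUMNS)
                .map(k -> k.clone())
                .orElse(new int[0]);
    }

    public static void main(String[] args) {
        List<int[]> candidates = List.of(new int[] {0, 1}, new int[] {2}, new int[] {1});
        // Two candidates have length 1; the lexicographic tie-break picks column 1.
        System.out.println(Arrays.toString(smallestKey(candidates))); // [1]
    }
}
```

An explicit tie-break of this kind is one possible answer to the question of which key is exposed when several candidates have the same length.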

Verifying this change

This change added tests and can be verified as follows:

  • New unit tests for the planner pieces in ToChangelogTest.
  • New end-to-end semantic tests in ToChangelogSemanticTests covering: insert-only inputs, retract inputs with a derivable upsert key (partial DELETE), retract inputs with produces_full_deletes=true (full DELETE pass-through), PARTITION BY over a non-leading column, single-column upsert key from the input PK, and row semantics where the upsert key comes from the PK constraint rather than PARTITION BY.
  • New restore test in ToChangelogRestoreTest validates the compiled-plan + savepoint round trip for a retract source.
  • Validation tests for misuse: missing input argument, wrong argument type, conflicting PARTITION BY and upsert key.
  • Run with: mvn -pl flink-table/flink-table-planner test -Dtest='ToChangelogTest,ToChangelogSemanticTests,ToChangelogRestoreTest'. Result: 50 pass, 1 skip (existing savepoint stub).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (new BuiltInFunctionDefinitions.TO_CHANGELOG; new compiled-plan JSON field on StreamExecProcessTableFunction with a back-compat default)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (the ToChangelogFunction operator runs per record, but it only writes the row kind plus a projected key on DELETE; no extra state)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs + JavaDocs (built-in function docs page entry plus BuiltInFunctionDefinitions.TO_CHANGELOG JavaDoc)

Was generative AI tooling used to co-author this PR?
  • Yes
    Generated-by: Claude Opus 4.7

@flinkbot
Collaborator

flinkbot commented May 22, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • `@flinkbot run azure`: re-run the last Azure build

@raminqaf force-pushed the FLINK-39636 branch 8 times, most recently from 14e0413 to a8b3dbc, on May 26, 2026 10:21
Adds an optional `produces_full_deletes` boolean parameter to the TO_CHANGELOG built-in PTF. When the caller asserts the input emits full DELETE rows, the function passes them through unchanged and the planner can skip ChangelogNormalize via the REQUIRE_FULL_DELETE conditional trait. Otherwise the function emits partial DELETE rows that preserve identifying columns (partition keys via the framework, upsert keys via the function) and null the rest.

Rejects produces_full_deletes=true when the input changelog never emits DELETE rows (insert-only mode or op_mapping strips DELETE).
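
The rejection rule can be sketched as a plain precondition check. This is not the validation code from the PR: the class, the local `RowKind` enum, and the two `Set` parameters are hypothetical stand-ins for the input changelog mode and for the row kinds that `op_mapping` keeps, and a real implementation would presumably raise Flink's own validation exception rather than `IllegalArgumentException`.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative sketch only; types and names are hypothetical. */
final class FullDeletesValidationSketch {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Rejects produces_full_deletes=true when no DELETE row can reach the output.
     *
     * @param inputKinds row kinds the input changelog can emit
     * @param mappedKinds row kinds that op_mapping keeps (assumed representation)
     */
    static void validate(boolean producesFullDeletes, Set<RowKind> inputKinds, Set<RowKind> mappedKinds) {
        boolean deleteReachesOutput =
                inputKinds.contains(RowKind.DELETE) && mappedKinds.contains(RowKind.DELETE);
        if (producesFullDeletes && !deleteReachesOutput) {
            throw new IllegalArgumentException(
                    "produces_full_deletes=true requires an input that emits DELETE rows");
        }
    }

    public static void main(String[] args) {
        Set<RowKind> all = EnumSet.allOf(RowKind.class);
        validate(true, all, all); // accepted: DELETE rows can reach the output
        try {
            validate(true, EnumSet.of(RowKind.INSERT), all); // insert-only input
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```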
raminqaf added 2 commits May 26, 2026 12:41
…angelogTypeStrategy

Aligns the structure of ToChangelogTypeStrategy with its sibling FromChangelogTypeStrategy: introduces positional argument-index constants, switches the input type strategy to ValidationOnlyInputTypeStrategy so signature/argument-count come from the StaticArguments in the function definition, splits the monolithic validateInputs into per-argument helpers, and reuses ChangelogTypeStrategyUtils.resolveOpColumnName instead of the local duplicate.
Adds upsertKeyColumns() to TableSemantics, populated by the planner via FlinkRelMetadataQuery.getUpsertKeys. ProcessTableFunctions can now read the upsert key of each input table at specialization time without having to re-derive it from a RelNode or require the caller to repeat the key via PARTITION BY.

Plumbs the value end-to-end: StreamPhysicalProcessTableFunction queries the metadata, StreamExecProcessTableFunction persists it as @JsonProperty inputUpsertKeys (one entry per table input) so compiled plans round-trip, and OperatorBindingCallContext / BridgingSqlFunction / ProcessTableRunnerGenerator thread it to the specialized function constructor.
@raminqaf force-pushed the FLINK-39636 branch 5 times, most recently from bed8261 to 7078ccf, on May 26, 2026 11:24
@raminqaf marked this pull request as ready for review on May 26, 2026 11:25
…ert key

Consumes the new TableSemantics.upsertKeyColumns() in ToChangelogFunction to emit partial DELETE rows in row semantics when the input declares an upsert key. Identifying columns are preserved on the DELETE row, non-key columns are emitted as null. Without an upsert key the function still passes the input through unchanged.

[hotfix][table] Use generic Map<String, String> for ToChangelogTypeStrategy op_mapping

Drops raw Map types and the 'rawtypes' suppressions in validateOpMappingKeys and mapsDelete by passing the resolved op_mapping argument as Map<String, String>. Also extracts the literal 'DELETE' string into a constant derived from RowKind.DELETE.name().
* we cannot null anything safely and fall back to passing the input through unchanged, which is
* equivalent to a full delete.
*/
private static boolean resolveProducesFullDelete(
Contributor

Does this resolve to producing partial deletes by default?

  • I think we should probably go with full deletes as the default
  • If the user specifies partial deletes, we try to accommodate that. If we can't, we throw an error

private final int[] outputIndices;
private final RowType inputRowType;
private final boolean producesFullDelete;
private final boolean[] preserveOnDelete;
Contributor

Something like this probably makes more sense

Suggested change
- private final boolean[] preserveOnDelete;
+ private final boolean[] upsertKeyIndex;

Or upsertKeysAtIndex? Or let me know if you have a better idea.

validateOpMap(this.rawOpMap, tableSemantics);
}
final boolean producesFullDeletesArg =
callContext.getArgumentValue(3, Boolean.class).orElse(false);
Contributor

Didn't we add constants for these indexes?

Comment on lines +428 to +434
When the input has no derivable upsert key (e.g. a pure append-only source, or an upstream operator that erased the key), there is no identifying column to preserve. The function then passes the input through unchanged.

```sql
-- Source emits -D[id:5] (key-only, no declared key).
-- Output: +I[op:'DELETE', id:5, name:null]
SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
```
Contributor

The example here does not match what you want to show. You say the input passes through unchanged, but the example shows name:null.


@Override
public int[] upsertKeyColumns() {
return new int[0];
Contributor

Couldn't we run into an index-out-of-bounds exception here? Can we guarantee that int[0] is the shortest key?

What happens if, for example, the user has two primary keys defined for a table and both are of length 1? Do we pick one at random? I first thought we would expose all upsert keys. If we only expose one, we have to make that clear in the documentation and explain which one we expose to the user.

// sources that emit key-only deletes.
TraitCondition.and(
TraitCondition.argIsEqualTo(
"produces_full_deletes", Boolean.TRUE),
Contributor

Let's go with produces_full_deletes = true as the default (an adjustment from the FLIP). This aligns with what we added in the first versions, and I think it's a better default than the opposite.

Contributor

@gustavodemorais left a comment

Thanks for working on this, @raminqaf! Added the first set of comments, take a look
