From 4016851c9e183ba383b0bf06d9ff960bf515d4bf Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sat, 23 May 2026 02:52:58 -0400 Subject: [PATCH 1/5] [FLINK-39735][table] Expose input upsert key on TableSemantics for PTFs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `TableSemantics#upsertKeyColumns()` so `ProcessTableFunction`s can see the planner-derived upsert key of each table-typed input without forcing callers to repeat it via `PARTITION BY`. The planner already derives this via `FlinkRelMetadataQuery.getUpsertKeys(input)` (metadata handler chain in `FlinkRelMdUpsertKeys`); this PR plumbs the result end-to-end so it becomes visible to PTF authors at specialization and runtime — collapsed to one candidate via `UpsertKeyUtil.smallestKey(...)`. --- .../flink/table/functions/TableSemantics.java | 21 ++++++++++++ .../inference/utils/TableSemanticsMock.java | 19 ++++++++++- .../bridging/BridgingSqlFunction.java | 12 ++++--- .../inference/OperatorBindingCallContext.java | 30 +++++++++++++---- .../StreamExecProcessTableFunction.java | 32 ++++++++++++++++--- .../StreamPhysicalProcessTableFunction.java | 24 +++++++++++++- .../codegen/ProcessTableRunnerGenerator.scala | 10 ++++-- .../process/RuntimeTableSemantics.java | 10 +++++- ...tTableOperatorInterruptibleTimersTest.java | 3 +- .../functions/TestHarnessTableSemantics.java | 12 +++++++ 10 files changed, 151 insertions(+), 22 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java index f63566befce7d..7382a9a9b848a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java @@ -86,6 +86,27 @@ public interface TableSemantics { */ int[] partitionByColumns(); + /** + * Returns the upsert key of the passed table as derived by the planner from primary key + * constraints and the rewritten relational plan. The upsert key uniquely identifies a row + * within the input changelog and survives planner transformations that preserve key semantics + * (e.g. filters, projections that retain key columns). Applies to both table arguments with row + * and set semantics. + * + *

This complements {@link #partitionByColumns()}: a caller is not required to repeat the + * primary key via {@code PARTITION BY} just so a PTF can identify rows - the planner already + * knows the key from the input table's declaration. + * + * @return An array of indexes (0-based) that specify the upsert key columns. Returns an empty + * array if the planner could not derive an upsert key for the input (e.g., append-only + * sources without a declared primary key, or operations that destroyed the key). Returns an + * empty array during the type inference phase as the upsert key is still unknown at that + * point. + */ + default int[] upsertKeyColumns() { + return new int[0]; + } + /** * Returns information about how the passed table is ordered. Applies only to table arguments * with set semantics. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java index fe881f8fd1fc2..ad8cad8379ab9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java @@ -35,9 +35,10 @@ public class TableSemanticsMock implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final ChangelogMode changelogMode; + private final int[] upsertKeyColumns; public TableSemanticsMock(DataType dataType) { - this(dataType, new int[0], new int[0], -1, null); + this(dataType, new int[0], new int[0], -1, null, new int[0]); } public TableSemanticsMock( @@ -46,6 +47,16 @@ public TableSemanticsMock( int[] orderByColumns, int timeColumn, @Nullable ChangelogMode changelogMode) { + this(dataType, partitionByColumns, orderByColumns, timeColumn, changelogMode, new int[0]); + } + + public TableSemanticsMock( + DataType dataType, + int[] partitionByColumns, + int[] orderByColumns, + int timeColumn, + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; @@ -55,6 +66,7 @@ public TableSemanticsMock( } this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -82,6 +94,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.ofNullable(changelogMode); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 2c393adf875ec..2e279a0508f32 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -335,19 +335,20 @@ public boolean hasScalarArgument(String name) { * scalar arguments through the same coercion path as validation. */ public CallContext toCallContext(RexCall call) { - return toCallContext(call, null, null, null); + return toCallContext(call, null, null, null, null); } /** * Variant of {@link #toCallContext(RexCall)} that additionally exposes the call's input time - * columns and changelog modes - needed by the streaming codegen path so PTFs can specialize - * themselves to the exact call. + * columns, changelog modes, and per-input upsert keys - needed by the streaming codegen path + * so PTFs can specialize themselves to the exact call. */ public CallContext toCallContext( RexCall call, @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, - @Nullable ChangelogMode outputChangelogMode) { + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { return new OperatorBindingCallContext( dataTypeFactory, getDefinition(), @@ -355,7 +356,8 @@ public CallContext toCallContext( call.getType(), inputTimeColumns, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index f31406dad196d..47ae23ed482c8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final @Nullable List inputTimeColumns; private final @Nullable List inputChangelogModes; private final @Nullable ChangelogMode outputChangelogMode; + private final @Nullable List inputUpsertKeys; public OperatorBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlOperatorBinding binding, RelDataType returnRelDataType) { - this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null); + this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null, null); } public OperatorBindingCallContext( @@ -80,7 +81,8 @@ public OperatorBindingCallContext( RelDataType returnRelDataType, @Nullable List inputTimeColumns, @Nullable List inputChangelogModes, - @Nullable ChangelogMode outputChangelogMode) { + @Nullable ChangelogMode outputChangelogMode, + @Nullable List inputUpsertKeys) { super( dataTypeFactory, definition, @@ -109,6 +111,7 @@ public int size() { this.inputTimeColumns = inputTimeColumns; this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys; } @Override @@ -173,13 +176,18 @@ public Optional getTableSemantics(int pos) { Optional.ofNullable(inputChangelogModes) .map(m -> m.get(tableArgCall.getInputIndex())) .orElse(null); + final int[] upsertKeyColumns = + Optional.ofNullable(inputUpsertKeys) + .map(m -> m.get(tableArgCall.getInputIndex())) + .orElse(new int[0]); return Optional.of( OperatorBindingTableSemantics.create( argumentDataTypes.get(pos), staticArg, tableArgCall, timeColumn, - changelogMode)); + changelogMode, + upsertKeyColumns)); } @Override @@ -283,20 +291,23 @@ private static class OperatorBindingTableSemantics implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final @Nullable ChangelogMode changelogMode; + private final int[] upsertKeyColumns; public static OperatorBindingTableSemantics create( DataType tableDataType, StaticArgument staticArg, RexTableArgCall tableArgCall, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { return new OperatorBindingTableSemantics( createDataType(tableDataType, staticArg), tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()), timeColumn, - changelogMode); + changelogMode, + upsertKeyColumns); } private OperatorBindingTableSemantics( @@ -305,13 +316,15 @@ private OperatorBindingTableSemantics( int[] orderByColumns, SortDirection[] orderByDirections, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; this.orderByDirections = orderByDirections; this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } private static DataType createDataType(DataType tableDataType, StaticArgument staticArg) { @@ -349,6 +362,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.ofNullable(changelogMode); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index 3973329af7484..33de7a7bc8464 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -108,6 +108,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes"; public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = "outputChangelogMode"; + public static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys"; @JsonProperty(FIELD_NAME_UID) private final @Nullable String uid; @@ -121,6 +122,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) private final ChangelogMode outputChangelogMode; + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + private final List inputUpsertKeys; + public StreamExecProcessTableFunction( ReadableConfig tableConfig, List inputProperties, @@ -129,7 +133,8 @@ public StreamExecProcessTableFunction( @Nullable String uid, RexCall invocation, List inputChangelogModes, - ChangelogMode outputChangelogMode) { + ChangelogMode outputChangelogMode, + List inputUpsertKeys) { this( ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecProcessTableFunction.class), @@ -141,7 +146,8 @@ public StreamExecProcessTableFunction( uid, invocation, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @JsonCreator @@ -155,7 +161,8 @@ public StreamExecProcessTableFunction( @JsonProperty(FIELD_NAME_UID) @Nullable String uid, @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) List inputChangelogModes, - @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) { + @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) @Nullable List inputUpsertKeys) { super(id, context, persistedConfig, inputProperties, outputType, description); this.uid = uid; // Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path: @@ -164,6 +171,14 @@ public StreamExecProcessTableFunction( this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation); this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + // Older compiled plans (pre-FLINK-39735) did not persist this field. Default to per-input + // empty arrays so the runtime sees the same behavior as before (no derivable upsert key). + this.inputUpsertKeys = + inputUpsertKeys != null + ? inputUpsertKeys + : IntStream.range(0, inputChangelogModes.size()) + .mapToObj(i -> new int[0]) + .collect(Collectors.toList()); } public @Nullable String getUid() { @@ -202,7 +217,12 @@ protected Transformation translateToPlanInternal( final RexCall udfCall = StreamPhysicalProcessTableFunction.toUdfCall(invocation); final GeneratedRunnerResult generated = ProcessTableRunnerGenerator.generate( - ctx, udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode); + ctx, + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys); final GeneratedProcessTableRunner generatedRunner = generated.runner(); final LinkedHashMap stateInfos = generated.stateInfos(); @@ -309,6 +329,7 @@ private RuntimeTableSemantics createRuntimeTableSemantics( } final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); + final int[] upsertKeyColumns = inputUpsertKeys.get(tableArgCall.getInputIndex()); return new RuntimeTableSemantics( tableArg.getName(), @@ -320,7 +341,8 @@ private RuntimeTableSemantics createRuntimeTableSemantics( consumedChangelogMode, tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH), tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE), - timeColumn); + timeColumn, + upsertKeyColumns); } private Transformation createKeyedTransformation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java index 5ccecf18e71be..6f90b38e431d1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -27,11 +27,13 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils; +import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.ShortcutUtils; import org.apache.flink.table.types.inference.StaticArgument; @@ -165,6 +167,7 @@ public ExecNode translateToExecNode() { verifyTimeAttributes(getInputs(), call, inputChangelogModes, outputChangelogMode); final List> providedInputArgs = getProvidedInputArgs(call); verifyPassThroughColumnsForUpdates(providedInputArgs, outputChangelogMode); + final List inputUpsertKeys = deriveInputUpsertKeys(getInputs()); return new StreamExecProcessTableFunction( unwrapTableConfig(this), getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()), @@ -173,7 +176,26 @@ public ExecNode translateToExecNode() { uid, call, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); + } + + /** + * Derives an upsert key (collapsed to one candidate via {@link UpsertKeyUtil#smallestKey}) for + * each input. Returns an empty array entry for inputs without a derivable upsert key + * (append-only sources without a declared primary key, or operations that destroyed the key). + * Surfaces as {@link org.apache.flink.table.functions.TableSemantics#upsertKeyColumns()} so + * PTFs can identify rows without requiring callers to repeat the key via PARTITION BY. + */ + private static List deriveInputUpsertKeys(List inputs) { + final List perInput = new ArrayList<>(inputs.size()); + for (RelNode input : inputs) { + final FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery()); + final Set upsertKeys = fmq.getUpsertKeys(input); + perInput.add(UpsertKeyUtil.smallestKey(upsertKeys).orElse(new int[0])); + } + return perInput; } @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala index 52df803d5c8f8..402157aa0fdb6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala @@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator { udfCall: RexCall, inputTimeColumns: java.util.List[Integer], inputChangelogModes: java.util.List[ChangelogMode], - outputChangelogMode: ChangelogMode): GeneratedRunnerResult = { + outputChangelogMode: ChangelogMode, + inputUpsertKeys: java.util.List[Array[Int]]): GeneratedRunnerResult = { val function: BridgingSqlFunction = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val definition: FunctionDefinition = function.getDefinition val dataTypeFactory = function.getDataTypeFactory @@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator { // Thus, functions can reconfigure themselves for the exact use case. // Including updating their state layout. val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys) // Create the final UDF for runtime val udf = UserDefinedFunctionHelper.createSpecializedFunction( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java index cabab4c613143..6719adda32034 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java @@ -44,6 +44,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { private final boolean passColumnsThrough; private final boolean hasSetSemantics; private final int timeColumn; + private final int[] upsertKeyColumns; private transient ChangelogMode changelogMode; @@ -57,7 +58,8 @@ public RuntimeTableSemantics( RuntimeChangelogMode consumedChangelogMode, boolean passColumnsThrough, boolean hasSetSemantics, - int timeColumn) { + int timeColumn, + int[] upsertKeyColumns) { this.argName = argName; this.inputIndex = inputIndex; this.dataType = dataType; @@ -68,6 +70,7 @@ public RuntimeTableSemantics( this.passColumnsThrough = passColumnsThrough; this.hasSetSemantics = hasSetSemantics; this.timeColumn = timeColumn; + this.upsertKeyColumns = upsertKeyColumns; } public String getArgName() { @@ -118,6 +121,11 @@ public int timeColumn() { return timeColumn; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.of(getChangelogMode()); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java index be390ab5f5557..38a0df4811168 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java @@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() { RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), /* passColumnsThrough */ false, /* hasSetSemantics */ true, - /* timeColumn */ 1); + /* timeColumn */ 1, + /* upsertKeyColumns */ new int[0]); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java index fadf21d7dd942..0ddd8d5c5a82c 100644 --- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java +++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java @@ -30,10 +30,17 @@ class TestHarnessTableSemantics implements TableSemantics { private final DataType dataType; private final int[] partitionByColumns; + private final int[] upsertKeyColumns; TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) { + this(dataType, partitionByColumns, new int[0]); + } + + TestHarnessTableSemantics( + DataType dataType, int[] partitionByColumns, int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -61,6 +68,11 @@ public int timeColumn() { return -1; } + @Override + public int[] upsertKeyColumns() { + return upsertKeyColumns; + } + @Override public Optional changelogMode() { return Optional.empty(); From e8ffe84649e7f813867c24a9926a8ef6cb83acbb Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sat, 23 May 2026 03:04:15 -0400 Subject: [PATCH 2/5] Adding a small test fixture next to the existing ContextFunction for a full integration test --- .../exec/stream/ProcessTableFunctionTestUtils.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index 8d83e51770d59..76187ed679e0b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -456,6 +456,18 @@ public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r, String s) } } + /** + * Testing function for FLINK-39735: surfaces the planner-derived upsert key on {@link + * TableSemantics}. Used by tests to assert that {@code upsertKeyColumns()} reports the + * primary-key columns of the input even when the caller did not write {@code PARTITION BY}. + */ + public static class UpsertKeyContextFunction extends AppendProcessTableFunctionBase { + public void eval(Context ctx, @ArgumentHint(ROW_SEMANTIC_TABLE) Row r) { + final TableSemantics semantics = ctx.tableSemanticsFor("r"); + collectObjects(r, semantics.upsertKeyColumns()); + } + } + /** Testing function. */ public static class PojoStateFunction extends AppendProcessTableFunctionBase { public void eval(@StateHint Score s, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) { From c4a57400a5302a91583c7b6cd1115c8cd26525c3 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sun, 24 May 2026 00:37:44 -0400 Subject: [PATCH 3/5] [FLINK-39735] Fixed dangling so moves up at the end of prev line atching spotless expected wrap --- .../table/planner/functions/bridging/BridgingSqlFunction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 2e279a0508f32..59e67eaea94f8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -340,8 +340,8 @@ public CallContext toCallContext(RexCall call) { /** * Variant of {@link #toCallContext(RexCall)} that additionally exposes the call's input time - * columns, changelog modes, and per-input upsert keys - needed by the streaming codegen path - * so PTFs can specialize themselves to the exact call. + * columns, changelog modes, and per-input upsert keys - needed by the streaming codegen path so + * PTFs can specialize themselves to the exact call. */ public CallContext toCallContext( RexCall call, From f1cdda392925ff138d998b5f352066a917707786 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sun, 24 May 2026 00:42:36 -0400 Subject: [PATCH 4/5] [FLINK-39735] Fixed Scala compile failure in flink-table-planner_2.12 --- .../program/FlinkChangelogModeInferenceProgram.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index fd646a34148b4..47939aa8dffe4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -1676,7 +1676,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val inputTimeColumns = StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall) val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + null) // Expose a simplified context focused on changelog-relevant inputs: changelog modes, // resolved literal arguments, and table semantics (e.g., partition-by columns). From 95ba7c09c61a04b4d780c187b96e246a3912412b Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sun, 24 May 2026 08:33:40 -0400 Subject: [PATCH 5/5] [FLINK-39735] Fix spotless violation in TestHarnessTableSemantics constructor --- .../table/runtime/functions/TestHarnessTableSemantics.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java index 0ddd8d5c5a82c..1f81a36ed767a 100644 --- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java +++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java @@ -36,8 +36,7 @@ class TestHarnessTableSemantics implements TableSemantics { this(dataType, partitionByColumns, new int[0]); } - TestHarnessTableSemantics( - DataType dataType, int[] partitionByColumns, int[] upsertKeyColumns) { + TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns, int[] upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.upsertKeyColumns = upsertKeyColumns;