From 7055275860a5d7ab718c9d0f3053809a2f751ed7 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sat, 23 May 2026 01:32:09 -0400 Subject: [PATCH 1/2] [FLINK-39740][table-runtime] Do not regress highestSqnAndSizeState on row replace in LinkedMultiSetState MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LinkedMultiSetState.add() unconditionally wrote the computed newSqn back into highestSqnAndSizeState. In the replace branch, newSqn was set to the existing row's SQN — generally less than the current highSqn — so the write regressed the highest-SQN invariant. The next genuinely new row would then compute newSqn = highSqn + 1, collide with an existing node in sqnToNodeState, and silently corrupt the doubly-linked list. --- .../linked/LinkedMultiSetState.java | 2 +- .../SequencedMultiSetStateTest.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java index 840fe007ebf84..801bab4a973ed 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java @@ -185,8 +185,8 @@ public StateChangeInfo add(RowData row, long timestamp) throws Exceptio isNewRowKey ? new Node(row, newSqn, highSqn, null, null, timestamp) : sqnToNodeState.get(oldSqn).withRow(row, timestamp)); - highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); if (isNewRowKey) { + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); if (!isNewContextKey) { sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java index b68638115cfb7..c226bb93da7f4 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -200,6 +200,33 @@ public void testAdd() throws Exception { }); } + @TestTemplate + public void testAddAfterReplacingNonHighestRow() throws Exception { + // Regression test for FLINK-39740: replacing an existing row by key must not regress + // highestSqnAndSizeState.highSqn. Otherwise, the next genuinely new row reuses an SQN + // already taken by an existing row, overwriting its node in sqnToNodeState. + runTest( + state -> { + state.add(row("k1", "v1"), 1L); // SQN 0 + state.add(row("k2", "v2"), 2L); // SQN 1 + state.add(row("k3", "v3"), 3L); // SQN 2, highSqn = 2 + + // Replace a non-highest-SQN row. Pre-fix, this would regress highSqn from 2 + // to k1's SQN (0). + state.add(row("k1", "v1-updated"), 4L); + + // New row must get SQN 3, not collide with k2's SQN 1. + state.add(row("k4", "v4"), 5L); + + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L), + Tuple2.of(row("k4", "v4"), 5L)); + }); + } + @TestTemplate public void testRemove() throws Exception { runTest( From 598ff9e857f113842b68d94e6fb82d07bfcf0e09 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sat, 23 May 2026 23:20:17 -0400 Subject: [PATCH 2/2] [FLINK-39740] Removed the JIRA reference in tests --- .../sequencedmultisetstate/SequencedMultiSetStateTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java index c226bb93da7f4..3679d687775c4 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -202,9 +202,9 @@ public void testAdd() throws Exception { @TestTemplate public void testAddAfterReplacingNonHighestRow() throws Exception { - // Regression test for FLINK-39740: replacing an existing row by key must not regress - // highestSqnAndSizeState.highSqn. Otherwise, the next genuinely new row reuses an SQN - // already taken by an existing row, overwriting its node in sqnToNodeState. + // Replacing an existing row by key must not regress highestSqnAndSizeState.highSqn. + // Otherwise, the next genuinely new row reuses an SQN already taken by an existing row, + // overwriting its node in sqnToNodeState. runTest( state -> { state.add(row("k1", "v1"), 1L); // SQN 0