diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java index 840fe007ebf84..801bab4a973ed 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java @@ -185,8 +185,8 @@ public StateChangeInfo add(RowData row, long timestamp) throws Exceptio isNewRowKey ? new Node(row, newSqn, highSqn, null, null, timestamp) : sqnToNodeState.get(oldSqn).withRow(row, timestamp)); - highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); if (isNewRowKey) { + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); if (!isNewContextKey) { sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java index b68638115cfb7..3679d687775c4 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -200,6 +200,33 @@ public void testAdd() throws Exception { }); } + @TestTemplate + public void testAddAfterReplacingNonHighestRow() throws Exception { + // Replacing an existing row by key must not regress highestSqnAndSizeState.highSqn. + // Otherwise, the next genuinely new row reuses an SQN already taken by an existing row, + // overwriting its node in sqnToNodeState. + runTest( + state -> { + state.add(row("k1", "v1"), 1L); // SQN 0 + state.add(row("k2", "v2"), 2L); // SQN 1 + state.add(row("k3", "v3"), 3L); // SQN 2, highSqn = 2 + + // Replace a non-highest-SQN row. Pre-fix, this would regress highSqn from 2 + // to k1's SQN (0). + state.add(row("k1", "v1-updated"), 4L); + + // New row must get SQN 3, not collide with k2's SQN 1. + state.add(row("k4", "v4"), 5L); + + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L), + Tuple2.of(row("k4", "v4"), 5L)); + }); + } + @TestTemplate public void testRemove() throws Exception { runTest(