From 7f6b79beb6ab200bb5ce05e0f4475ce38ebd3db3 Mon Sep 17 00:00:00 2001 From: syalla Date: Tue, 16 Jun 2026 19:36:56 +0000 Subject: [PATCH 1/5] Fix OCC conflict resolution failure during Flink recommit on restart During Flink job restart, StreamWriteOperatorCoordinator.restoreEvents() calls recommitInstant() without initializing lastCompletedTxnAndMetadata. This causes OCC conflict resolution to use INIT_INSTANT_TS as baseline, checking against ALL completed instants on the timeline instead of only those that completed concurrently. Since streaming upserts repeatedly write to the same file groups, this always finds overlapping file IDs and throws HoodieWriteConflictException. The fix initializes the transaction state before recommitting by finding the last completed instant whose requested time and completion time are both before the inflight instant's requested time. This ensures conflict resolution only checks against genuinely concurrent commits, while still detecting real conflicts from other writers during downtime. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../hudi/client/utils/TransactionUtils.java | 26 +++++++++++++++++++ .../hudi/client/HoodieFlinkWriteClient.java | 14 ++++++++++ .../sink/StreamWriteOperatorCoordinator.java | 5 ++++ 3 files changed, 45 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 6b5ac8c575aa4..745f2dbbcf922 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -39,11 +39,15 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.Comparator; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; + import static org.apache.hudi.config.HoodieWriteConfig.ENABLE_SCHEMA_CONFLICT_RESOLUTION; @Slf4j @@ -141,6 +145,28 @@ public static Option>> getLastCompletedT return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); } + /** + * Get the last completed transaction hoodie instant before the given instant time. + * The returned instant has both requested time and completion time less than the given instant time, + * ensuring it was fully completed before the given instant was created. 
+ * + * @param metaClient table meta client + * @param currentInstantTime the requested time of the current inflight instant + * @return the last completed instant before the given instant, with its extra metadata + */ + public static Option>> getLastCompletedTxnInstantAndMetadata( + HoodieTableMetaClient metaClient, String currentInstantTime) { + Option hoodieInstantOption = Option.fromJavaOptional( + metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants() + .findInstantsBefore(currentInstantTime) + .getInstantsAsStream() + .filter(instant -> instant.getCompletionTime() != null + && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime)) + .max(Comparator.comparing(HoodieInstant::getCompletionTime))); + return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); + } + private static Option>> getHoodieInstantAndMetaDataPair(HoodieTableMetaClient metaClient, Option hoodieInstantOption) { try { if (hoodieInstantOption.isPresent()) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 228209cf1df6f..66d7a71f0396d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -397,6 +397,20 @@ public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaC tableServiceClient.startAsyncArchiveService(this); } + /** + * Initializes the transaction state for recommitting an inflight instant during recovery. + * Sets lastCompletedTxnAndMetadata to the last completed instant whose requested time + * and completion time are both before the given instant's requested time, ensuring + * OCC conflict resolution only checks against genuinely concurrent commits. 
+ */ + public void preTxnForRecommit(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstantTime) { + if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) { + this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, currentInstantTime); + this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); + this.pendingInflightAndRequestedInstants.remove(currentInstantTime); + } + } + /** * Initialized the metadata table on start up, should only be called once on driver. */ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 7b9095258903e..ccc21373af3e9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -547,6 +547,11 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } + // Initialize the transaction state so that OCC conflict resolution uses the correct + // baseline: the last completed instant before this inflight instant was created. + // Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks + // against all completed instants on the timeline, causing false conflicts. + writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant); return commitInstant(checkpointId, instant, bootstrapBuffer); } else { // clean the corresponding event buffer if the instant is already committed. 
From 7ed75d53e98d064501a143544e9e61cd5c6cd537 Mon Sep 17 00:00:00 2001 From: syalla Date: Tue, 16 Jun 2026 20:29:02 +0000 Subject: [PATCH 2/5] Fix checkstyle: remove extra blank line between static import groups Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/java/org/apache/hudi/client/utils/TransactionUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 745f2dbbcf922..7906cd8ecc0ae 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -47,7 +47,6 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; - import static org.apache.hudi.config.HoodieWriteConfig.ENABLE_SCHEMA_CONFLICT_RESOLUTION; @Slf4j From 25dfa28fba3129d00a6bee93a66a32bd35f4bce3 Mon Sep 17 00:00:00 2001 From: syalla Date: Wed, 17 Jun 2026 23:12:52 +0000 Subject: [PATCH 3/5] Address review comments --- .../hudi/client/utils/TransactionUtils.java | 2 +- .../client/utils/TestTransactionUtils.java | 81 +++++++++++++++++++ .../sink/StreamWriteOperatorCoordinator.java | 21 ++++- .../hudi/sink/TestWriteMergeOnRead.java | 18 +++++ 4 files changed, 117 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 7906cd8ecc0ae..16ec26d5d8366 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -162,7 +162,7 @@ public static Option>> getLastCompletedT .getInstantsAsStream() .filter(instant -> instant.getCompletionTime() != null && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime)) - .max(Comparator.comparing(HoodieInstant::getCompletionTime))); + .max(Comparator.comparing(HoodieInstant::requestedTime))); return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java index c243a49d599ac..a4437806c4dca 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; @@ -41,6 +42,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Properties; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit; @@ -49,7 +51,10 @@ import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL; import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -132,4 +137,80 @@ void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception { // since we bypass entire conflict resolution verify(spyMetaClient, times(0)).reloadActiveTimeline(); } + + @Test + void getLastCompletedTxnInstantAndMetadataSelectsMaxRequestedTime() throws Exception { + // Simulate interleaved completions: + // Commit A: requested=T1, completed=T5 (slow commit) + // Commit B: requested=T2, completed=T3 (fast commit) + // currentInstantTime=T6 + // The method should return B (max requestedTime=T2), not A (max completionTime=T5). + String t1 = "20240101010101000"; + String t2 = "20240101010102000"; + String t3 = "20240101010103000"; + String t5 = "20240101010105000"; + String t6 = "20240101010106000"; + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); + HoodieCommitMetadata metadataB = createCommitMetadata(t2, "file-2"); + testTable.addCommit(t1, Option.of(t5), Option.of(metadataA)); + testTable.addCommit(t2, Option.of(t3), Option.of(metadataB)); + + metaClient.reloadActiveTimeline(); + Option>> result = + TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t6); + + assertTrue(result.isPresent()); + assertEquals(t2, result.get().getLeft().requestedTime(), + "Should select the instant with the latest requestedTime (T2), not the latest completionTime (T5)"); + } + + @Test + void getLastCompletedTxnInstantAndMetadataExcludesInstantsCompletedAfterCurrent() throws Exception { + // Commit A: requested=T1, completed=T3 (completed before current) + // Commit B: requested=T2, completed=T5 (completed after current) + // currentInstantTime=T4 + // Should return A 
only, since B's completionTime >= currentInstantTime. + String t1 = "20240101010101000"; + String t2 = "20240101010102000"; + String t3 = "20240101010103000"; + String t4 = "20240101010104000"; + String t5 = "20240101010105000"; + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); + HoodieCommitMetadata metadataB = createCommitMetadata(t2, "file-2"); + testTable.addCommit(t1, Option.of(t3), Option.of(metadataA)); + testTable.addCommit(t2, Option.of(t5), Option.of(metadataB)); + + metaClient.reloadActiveTimeline(); + Option>> result = + TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t4); + + assertTrue(result.isPresent()); + assertEquals(t1, result.get().getLeft().requestedTime(), + "Should only include instants whose completionTime < currentInstantTime"); + } + + @Test + void getLastCompletedTxnInstantAndMetadataReturnsEmptyWhenNoInstantsQualify() throws Exception { + // Commit A: requested=T1, completed=T3 + // currentInstantTime=T2 (before A's completion) + // Should return empty. 
+ String t1 = "20240101010101000"; + String t2 = "20240101010102000"; + String t3 = "20240101010103000"; + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); + testTable.addCommit(t1, Option.of(t3), Option.of(metadataA)); + + metaClient.reloadActiveTimeline(); + Option>> result = + TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t2); + + assertFalse(result.isPresent(), + "Should return empty when no instants have completionTime < currentInstantTime"); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index ccc21373af3e9..3cc358f96f5cb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -435,10 +435,12 @@ private CompletableFuture handleInFlightInstantsRequest(Co private void restoreEvents() { if (this.eventBuffers.nonEmpty()) { - final HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); this.eventBuffers.getEventBufferStream() - .forEach(entry -> recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())); - this.metaClient.reloadActiveTimeline(); + .forEach(entry -> { + this.metaClient.reloadActiveTimeline(); + final HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); + recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight()); + }); } } @@ -608,8 +610,19 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { */ private boolean commitInstants(long checkpointId) { // use < instead of <= because the write metadata event 
sends the last known checkpoint id which is smaller than the current one. + boolean[] isFirstInstant = {true}; List result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId) - .map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList()); + .map(entry -> { + if (isFirstInstant[0]) { + isFirstInstant[0] = false; + } else { + // Refresh the baseline for subsequent instants so that OCC conflict resolution + // sees the just-committed instant as completed, not as a concurrent conflict. + this.metaClient.reloadActiveTimeline(); + this.writeClient.preTxn(tableState.operationType, this.metaClient); + } + return commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight()); + }).collect(Collectors.toList()); return result.stream().anyMatch(i -> i); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 5e4deee54c966..2046e0a5d4fa2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -25,10 +25,12 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; @@ -379,6 +381,22 @@ 
public void testInsertDuplicateRecordsWithCDCMode() throws Exception { assertEquals(firstInstant.requestedTime(), secondWriteStats.get(0).getPrevCommit()); } + @Test + public void testCommittingMultipleInstantsWithOCC() throws Exception { + conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT) + .checkpoint(1) + .assertNextEvent(4, "par1,par2,par3,par4") + .consume(TestData.DATA_SET_UPDATE_INSERT) + .checkpoint(2) + .assertNextEvent(4, "par1,par2,par3,par4") + .checkpointComplete(2) + .checkWrittenData(EXPECTED2) + .end(); + } + @Override protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED1; From 03d48107943172bfb5d1bcb48bef89b9114982bd Mon Sep 17 00:00:00 2001 From: syalla Date: Thu, 18 Jun 2026 00:29:37 +0000 Subject: [PATCH 4/5] Address review comments --- .../sink/StreamWriteOperatorCoordinator.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 3cc358f96f5cb..1a602dc2db9c1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -610,20 +610,23 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { */ private boolean commitInstants(long checkpointId) { // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one. 
- boolean[] isFirstInstant = {true}; - List result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId) - .map(entry -> { - if (isFirstInstant[0]) { - isFirstInstant[0] = false; - } else { - // Refresh the baseline for subsequent instants so that OCC conflict resolution - // sees the just-committed instant as completed, not as a concurrent conflict. - this.metaClient.reloadActiveTimeline(); - this.writeClient.preTxn(tableState.operationType, this.metaClient); - } - return commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight()); - }).collect(Collectors.toList()); - return result.stream().anyMatch(i -> i); + List>> entries = this.eventBuffers.getEventBufferStream() + .filter(entry -> entry.getKey() < checkpointId) + .collect(Collectors.toList()); + boolean anyCommitted = false; + for (int i = 0; i < entries.size(); i++) { + if (i > 0) { + // Refresh the baseline for subsequent instants so that OCC conflict resolution + // sees the just-committed instant as completed, not as a concurrent conflict. 
+ this.metaClient.reloadActiveTimeline(); + this.writeClient.preTxn(tableState.operationType, this.metaClient); + } + Map.Entry> entry = entries.get(i); + if (commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())) { + anyCommitted = true; + } + } + return anyCommitted; } /** From d25fd02a64e68180c66c520b463a8a8b48db51c7 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Thu, 18 Jun 2026 20:50:51 +0800 Subject: [PATCH 5/5] a potential fix --- .../apache/hudi/client/BaseHoodieClient.java | 7 +- .../hudi/client/utils/TransactionUtils.java | 49 +++-------- .../commit/BaseCommitActionExecutor.java | 2 +- .../client/utils/TestTransactionUtils.java | 85 +------------------ .../FlinkStreamingMetadataWriteHandler.java | 2 +- .../hudi/client/HoodieFlinkWriteClient.java | 50 ++++++++--- .../org/apache/hudi/util/TxnStateMemo.java | 70 +++++++++++++++ .../sink/StreamWriteOperatorCoordinator.java | 73 +++++++--------- .../apache/hudi/sink/utils/EventBuffers.java | 4 + 9 files changed, 163 insertions(+), 179 deletions(-) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 30aa9f770b5f0..44a8c5bd7c2fb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -229,7 +229,8 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx(); try { TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), - Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true, pendingInflightAndRequestedInstants); + 
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true, + pendingInflightAndRequestedInstants, getConflictResolutionExclusionInstants()); metrics.emitConflictResolutionSuccessful(); } catch (HoodieWriteConflictException e) { metrics.emitConflictResolutionFailed(); @@ -242,6 +243,10 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta } } + protected Set getConflictResolutionExclusionInstants() { + return Collections.emptySet(); + } + /** * Finalize Write operation. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 16ec26d5d8366..4ef0b6dc33f5a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -39,14 +39,11 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; -import java.util.Comparator; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; -import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.config.HoodieWriteConfig.ENABLE_SCHEMA_CONFLICT_RESOLUTION; @Slf4j @@ -55,14 +52,16 @@ public class TransactionUtils { /** * Resolve any write conflicts when committing data. 
* - * @param table - * @param currentTxnOwnerInstant - * @param thisCommitMetadata - * @param config - * @param lastCompletedTxnOwnerInstant - * @param pendingInstants - * @return - * @throws HoodieWriteConflictException + * @param table hoodie table instance to resolve conflicts against + * @param currentTxnOwnerInstant current transaction owner instant + * @param thisCommitMetadata commit metadata for the current transaction + * @param config write config + * @param lastCompletedTxnOwnerInstant last completed transaction observed before this write + * @param timelineRefreshedWithinTransaction whether the table timeline has already been refreshed within this transaction + * @param pendingInstants instants that were inflight or requested before the current write started + * @param conflictResolutionExclusionInstants instant requested times to exclude from conflict resolution candidates + * @return metadata for the resolved commit when conflict resolution succeeds + * @throws HoodieWriteConflictException when a write conflict cannot be resolved */ public static Option resolveWriteConflictIfAny( final HoodieTable table, @@ -71,7 +70,8 @@ public static Option resolveWriteConflictIfAny( final HoodieWriteConfig config, Option lastCompletedTxnOwnerInstant, boolean timelineRefreshedWithinTransaction, - Set pendingInstants) throws HoodieWriteConflictException { + Set pendingInstants, + Set conflictResolutionExclusionInstants) throws HoodieWriteConflictException { WriteOperationType operationType = thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null); if (config.needResolveWriteConflict(operationType, table.isMetadataTable(), config, table.getMetaClient().getTableConfig())) { // deal with pendingInstants @@ -85,7 +85,8 @@ public static Option resolveWriteConflictIfAny( Stream instantStream = Stream.concat(resolutionStrategy.getCandidateInstants( table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant, Option.of(config)), - 
completedInstantsDuringCurrentWriteOperation); + completedInstantsDuringCurrentWriteOperation) + .filter(instant -> !conflictResolutionExclusionInstants.contains(instant.requestedTime())); final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new)); instantStream.forEach(instant -> { @@ -144,28 +145,6 @@ public static Option>> getLastCompletedT return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); } - /** - * Get the last completed transaction hoodie instant before the given instant time. - * The returned instant has both requested time and completion time less than the given instant time, - * ensuring it was fully completed before the given instant was created. - * - * @param metaClient table meta client - * @param currentInstantTime the requested time of the current inflight instant - * @return the last completed instant before the given instant, with its extra metadata - */ - public static Option>> getLastCompletedTxnInstantAndMetadata( - HoodieTableMetaClient metaClient, String currentInstantTime) { - Option hoodieInstantOption = Option.fromJavaOptional( - metaClient.getActiveTimeline().getCommitsTimeline() - .filterCompletedInstants() - .findInstantsBefore(currentInstantTime) - .getInstantsAsStream() - .filter(instant -> instant.getCompletionTime() != null - && compareTimestamps(instant.getCompletionTime(), LESSER_THAN, currentInstantTime)) - .max(Comparator.comparing(HoodieInstant::requestedTime))); - return getHoodieInstantAndMetaDataPair(metaClient, hoodieInstantOption); - } - private static Option>> getHoodieInstantAndMetaDataPair(HoodieTableMetaClient metaClient, Option hoodieInstantOption) { try { if (hoodieInstantOption.isPresent()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index ff9fcb8666926..4268a18edabaa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -203,7 +203,7 @@ protected void autoCommit(HoodieWriteMetadata result) { setCommitMetadata(result); // table instance is created outside the transaction boundary so setting `timelineRefreshedWithinTransaction` to false below TransactionUtils.resolveWriteConflictIfAny(table, txnManager.getCurrentTransactionOwner(), - result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants); + result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants, Collections.emptySet()); commit(result); } finally { txnManager.endStateChange(inflightInstant); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java index a4437806c4dca..db196037722ad 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java @@ -30,7 +30,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; @@ -42,7 +41,6 @@ import java.io.IOException; import java.util.Collections; -import 
java.util.Map; import java.util.Properties; import static org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit; @@ -51,10 +49,7 @@ import static org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL; import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -94,7 +89,7 @@ void resolveWriteConflictIfAnyThrowsExceptionIfConflict(boolean timelineRefreshe when(table.getMetaClient()).thenReturn(spyMetaClient); assertThrows(HoodieWriteConflictException.class, () -> TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig, - lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime))); + lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime), Collections.emptySet())); verify(spyMetaClient, times(timelineRefreshedWithinTransaction ? 
0 : 1)).reloadActiveTimeline(); } @@ -133,84 +128,8 @@ void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception { HoodieTableMetaClient spyMetaClient = spy(metaClient); when(table.getMetaClient()).thenReturn(spyMetaClient); Option actualResult = TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig, - lastSuccessfulInstant, false, Collections.singleton(newInstantTime)); + lastSuccessfulInstant, false, Collections.singleton(newInstantTime), Collections.emptySet()); // since we bypass entire conflict resolution verify(spyMetaClient, times(0)).reloadActiveTimeline(); } - - @Test - void getLastCompletedTxnInstantAndMetadataSelectsMaxRequestedTime() throws Exception { - // Simulate interleaved completions: - // Commit A: requested=T1, completed=T5 (slow commit) - // Commit B: requested=T2, completed=T3 (fast commit) - // currentInstantTime=T6 - // The method should return B (max requestedTime=T2), not A (max completionTime=T5). 
- String t1 = "20240101010101000"; - String t2 = "20240101010102000"; - String t3 = "20240101010103000"; - String t5 = "20240101010105000"; - String t6 = "20240101010106000"; - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); - HoodieCommitMetadata metadataB = createCommitMetadata(t2, "file-2"); - testTable.addCommit(t1, Option.of(t5), Option.of(metadataA)); - testTable.addCommit(t2, Option.of(t3), Option.of(metadataB)); - - metaClient.reloadActiveTimeline(); - Option>> result = - TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t6); - - assertTrue(result.isPresent()); - assertEquals(t2, result.get().getLeft().requestedTime(), - "Should select the instant with the latest requestedTime (T2), not the latest completionTime (T5)"); - } - - @Test - void getLastCompletedTxnInstantAndMetadataExcludesInstantsCompletedAfterCurrent() throws Exception { - // Commit A: requested=T1, completed=T3 (completed before current) - // Commit B: requested=T2, completed=T5 (completed after current) - // currentInstantTime=T4 - // Should return A only, since B's completionTime >= currentInstantTime. 
- String t1 = "20240101010101000"; - String t2 = "20240101010102000"; - String t3 = "20240101010103000"; - String t4 = "20240101010104000"; - String t5 = "20240101010105000"; - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); - HoodieCommitMetadata metadataB = createCommitMetadata(t2, "file-2"); - testTable.addCommit(t1, Option.of(t3), Option.of(metadataA)); - testTable.addCommit(t2, Option.of(t5), Option.of(metadataB)); - - metaClient.reloadActiveTimeline(); - Option>> result = - TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t4); - - assertTrue(result.isPresent()); - assertEquals(t1, result.get().getLeft().requestedTime(), - "Should only include instants whose completionTime < currentInstantTime"); - } - - @Test - void getLastCompletedTxnInstantAndMetadataReturnsEmptyWhenNoInstantsQualify() throws Exception { - // Commit A: requested=T1, completed=T3 - // currentInstantTime=T2 (before A's completion) - // Should return empty. 
- String t1 = "20240101010101000"; - String t2 = "20240101010102000"; - String t3 = "20240101010103000"; - - HoodieTestTable testTable = HoodieTestTable.of(metaClient); - HoodieCommitMetadata metadataA = createCommitMetadata(t1, "file-1"); - testTable.addCommit(t1, Option.of(t3), Option.of(metadataA)); - - metaClient.reloadActiveTimeline(); - Option>> result = - TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, t2); - - assertFalse(result.isPresent(), - "Should return empty when no instants have completionTime < currentInstantTime"); - } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java index f500f99e0635b..220e1a8ac72fe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java @@ -93,7 +93,7 @@ public void startCommit(String instantTime, HoodieTable table) { public void cleanResources(String instantTime) { Option metadataWriterOpt = this.metadataWriterMap.remove(instantTime); if (metadataWriterOpt == null || metadataWriterOpt.isEmpty()) { - log.warn("Metadata writer for {} has not been initialized, no need to stop heartbeat.", instantTime); + log.debug("Metadata writer for {} has not been initialized or been closed already, skip the close.", instantTime); return; } try { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 66d7a71f0396d..98c311be3951a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -34,7 +34,9 @@ import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; @@ -49,6 +51,7 @@ import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; +import org.apache.hudi.util.TxnStateMemo; import com.codahale.metrics.Timer; import lombok.AccessLevel; @@ -60,6 +63,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -97,6 +101,12 @@ public class HoodieFlinkWriteClient */ private final boolean isStreamingWriteMetadataTable; + /** + * Transaction state snapshots keyed by instant. + */ + private final TxnStateMemo txnStateMemo = new TxnStateMemo(); + private Set conflictResolutionExclusionInstants = Collections.emptySet(); + public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { this(context, writeConfig, false); } @@ -136,6 +146,7 @@ public void cleanResources(String instantTime) { if (isStreamingWriteMetadataTable) { this.streamingMetadataWriteHandler.cleanResources(instantTime); } + this.txnStateMemo.slip(instantTime); } /** @@ -389,28 +400,39 @@ private BucketInfo createBucketInfo(HoodieRecord record) { * Refresh the last transaction metadata, * should be called before the Driver starts a new transaction with a reloaded metaclient. 
*/ - public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient) { + public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstant, Collection sameWriterInstants) { if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) { - this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); - this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); + Option>> lastCompletedTxnAndMetadata = + TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); + Set pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); + Set conflictResolutionExclusionInstants = getConflictResolutionExclusionInstants(currentInstant, sameWriterInstants); + this.txnStateMemo.memo(currentInstant, lastCompletedTxnAndMetadata, conflictResolutionExclusionInstants, pendingInflightAndRequestedInstants); } tableServiceClient.startAsyncArchiveService(this); } - /** - * Initializes the transaction state for recommitting an inflight instant during recovery. - * Sets lastCompletedTxnAndMetadata to the last completed instant whose requested time - * and completion time are both before the given instant's requested time, ensuring - * OCC conflict resolution only checks against genuinely concurrent commits. 
- */ - public void preTxnForRecommit(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstantTime) { - if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) { - this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient, currentInstantTime); - this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); - this.pendingInflightAndRequestedInstants.remove(currentInstantTime); + private Set getConflictResolutionExclusionInstants(String currentInstant, Collection sameWriterInstants) { + Set exclusionInstants = new HashSet<>(sameWriterInstants); + exclusionInstants.add(currentInstant); + return exclusionInstants; + } + + public void loadTxn(String instantTime) { + Option txnState = this.txnStateMemo.get(instantTime); + if (txnState.isPresent()) { + this.lastCompletedTxnAndMetadata = txnState.get().getLastCompletedTxnAndMetadata(); + this.conflictResolutionExclusionInstants = txnState.get().getConflictResolutionExclusionInstants(); + this.pendingInflightAndRequestedInstants = txnState.get().getPendingInflightAndRequestedInstants(); + tableServiceClient.setLastCompletedTxnAndMetadata(this.lastCompletedTxnAndMetadata); + tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants); } } + @Override + protected Set getConflictResolutionExclusionInstants() { + return this.conflictResolutionExclusionInstants; + } + /** * Initialized the metadata table on start up, should only be called once on driver. 
*/ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java new file mode 100644 index 0000000000000..d262201eaa6bd --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Memorizes transaction states by instant for Flink streaming writes. 
+ */ +public class TxnStateMemo { + + private final Map memo = new HashMap<>(); + + public void memo(String instant, + Option>> lastCompletedTxnAndMetadata, + Set conflictResolutionExclusionInstants, + Set pendingInflightAndRequestedInstants) { + memo.put(instant, new TxnState( + lastCompletedTxnAndMetadata, + new HashSet<>(conflictResolutionExclusionInstants), + new HashSet<>(pendingInflightAndRequestedInstants))); + } + + public Option get(String instant) { + return Option.ofNullable(memo.get(instant)); + } + + public void slip(String instant) { + memo.remove(instant); + } + + public boolean contains(String currentInstant) { + return memo.containsKey(currentInstant); + } + + @Getter + @AllArgsConstructor(access = AccessLevel.PRIVATE) + public static class TxnState { + private final Option>> lastCompletedTxnAndMetadata; + private final Set conflictResolutionExclusionInstants; + private final Set pendingInflightAndRequestedInstants; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 1a602dc2db9c1..73e57a84db842 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -435,12 +435,9 @@ private CompletableFuture handleInFlightInstantsRequest(Co private void restoreEvents() { if (this.eventBuffers.nonEmpty()) { + final HoodieTimeline completedTimeline = this.metaClient.reloadActiveTimeline().filterCompletedInstants(); this.eventBuffers.getEventBufferStream() - .forEach(entry -> { - this.metaClient.reloadActiveTimeline(); - final HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); - recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), 
entry.getValue().getRight()); - }); + .forEach(entry -> recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())); } } @@ -512,8 +509,6 @@ private void initEventBufferIfNecessary() { private String startInstant() { // refresh the meta client which is reused metaClient.reloadActiveTimeline(); - // refresh the last txn metadata - this.writeClient.preTxn(tableState.operationType, this.metaClient); // put the assignment in front of metadata generation, // because the instant request from write task is asynchronous. this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient); @@ -525,6 +520,8 @@ private String startInstant() { this.writeClient.setWriteTimer(tableState.commitAction); log.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.get(FlinkOptions.TABLE_NAME), conf.get(FlinkOptions.TABLE_TYPE)); + // refresh the last txn metadata + this.writeClient.preTxn(tableState.operationType, this.metaClient, this.instant, this.eventBuffers.getAllInstants()); return this.instant; } @@ -549,15 +546,14 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } - // Initialize the transaction state so that OCC conflict resolution uses the correct - // baseline: the last completed instant before this inflight instant was created. - // Without this, lastCompletedTxnAndMetadata is empty and conflict resolution checks - // against all completed instants on the timeline, causing false conflicts. - writeClient.preTxnForRecommit(tableState.operationType, this.metaClient, instant); + // Initialize the transaction state so same-writer instants can be excluded + // during OCC conflict resolution. 
+ writeClient.preTxn(tableState.operationType, this.metaClient, instant, this.eventBuffers.getAllInstants()); return commitInstant(checkpointId, instant, bootstrapBuffer); } else { // clean the corresponding event buffer if the instant is already committed. eventBuffers.reset(checkpointId); + writeClient.cleanResources(instant); return false; } } @@ -610,23 +606,9 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { */ private boolean commitInstants(long checkpointId) { // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one. - List>> entries = this.eventBuffers.getEventBufferStream() - .filter(entry -> entry.getKey() < checkpointId) - .collect(Collectors.toList()); - boolean anyCommitted = false; - for (int i = 0; i < entries.size(); i++) { - if (i > 0) { - // Refresh the baseline for subsequent instants so that OCC conflict resolution - // sees the just-committed instant as completed, not as a concurrent conflict. - this.metaClient.reloadActiveTimeline(); - this.writeClient.preTxn(tableState.operationType, this.metaClient); - } - Map.Entry> entry = entries.get(i); - if (commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())) { - anyCommitted = true; - } - } - return anyCommitted; + List result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId) + .map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList()); + return result.stream().anyMatch(i -> i); } /** @@ -635,24 +617,26 @@ private boolean commitInstants(long checkpointId) { * @return true if the write statuses are committed successfully. */ private boolean commitInstant(long checkpointId, String instant, EventBuffer eventBuffer) { - if (eventBuffer.isEmptyDataWriteBuffer()) { - // all the data write tasks are reset by failover, reset the while buffer and returns early. 
- this.eventBuffers.reset(checkpointId); - // stop the heart beat for lazy cleaning - writeClient.cleanResources(instant); - return false; - } + try { + if (eventBuffer.isEmptyDataWriteBuffer()) { + // all the data write tasks are reset by failover, reset the whole buffer and return early. + this.eventBuffers.reset(checkpointId); + return false; + } - List dataWriteResults = eventBuffer.collectDataWriteStatuses(); - if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) { - // No data has written, reset the buffer and returns early - this.eventBuffers.reset(checkpointId); - // stop the heart beat for lazy cleaning + List dataWriteResults = eventBuffer.collectDataWriteStatuses(); + if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) { + // No data has been written, reset the buffer and return early + this.eventBuffers.reset(checkpointId); + return false; + } + doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses()); + return true; + } finally { + // Stop the heartbeat and remove the memoized transaction state regardless of + // whether commit succeeds or fails before the coordinator restarts. 
writeClient.cleanResources(instant); - return false; } - doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses()); - return true; } /** @@ -673,6 +657,7 @@ private void doCommit(long checkpointId, String instant, List dataW FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus, checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient)); + this.writeClient.loadTxn(instant); boolean success = writeClient.commit(instant, allWriteStatus, Option.of(checkpointCommitMetadata), tableState.commitAction, partitionToReplacedFileIds); if (success) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java index 87e1010e2227a..52c6cc8587597 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java @@ -133,6 +133,10 @@ public HashMap getAllCheckpointIdAndInstants() { return result; } + public Collection getAllInstants() { + return this.eventBuffers.values().stream().map(Pair::getLeft).collect(Collectors.toList()); + } + public void initNewEventBuffer(long checkpointId, String instantTime) { this.eventBuffers.put(checkpointId, Pair.of(instantTime, new EventBuffer(dataWriteParallelism, indexWriteParallelism))); }