diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 30aa9f770b5f0..44a8c5bd7c2fb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -229,7 +229,8 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx(); try { TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), - Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true, pendingInflightAndRequestedInstants); + Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true, + pendingInflightAndRequestedInstants, getConflictResolutionExclusionInstants()); metrics.emitConflictResolutionSuccessful(); } catch (HoodieWriteConflictException e) { metrics.emitConflictResolutionFailed(); @@ -242,6 +243,10 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta } } + protected Set getConflictResolutionExclusionInstants() { + return Collections.emptySet(); + } + /** * Finalize Write operation. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 6b5ac8c575aa4..4ef0b6dc33f5a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -52,14 +52,16 @@ public class TransactionUtils { /** * Resolve any write conflicts when committing data. 
* - * @param table - * @param currentTxnOwnerInstant - * @param thisCommitMetadata - * @param config - * @param lastCompletedTxnOwnerInstant - * @param pendingInstants - * @return - * @throws HoodieWriteConflictException + * @param table hoodie table instance to resolve conflicts against + * @param currentTxnOwnerInstant current transaction owner instant + * @param thisCommitMetadata commit metadata for the current transaction + * @param config write config + * @param lastCompletedTxnOwnerInstant last completed transaction observed before this write + * @param timelineRefreshedWithinTransaction whether the table timeline has already been refreshed within this transaction + * @param pendingInstants instants that were inflight or requested before the current write started + * @param conflictResolutionExclusionInstants instant requested times to exclude from conflict resolution candidates + * @return metadata for the resolved commit when conflict resolution succeeds + * @throws HoodieWriteConflictException when a write conflict cannot be resolved */ public static Option resolveWriteConflictIfAny( final HoodieTable table, @@ -68,7 +70,8 @@ public static Option resolveWriteConflictIfAny( final HoodieWriteConfig config, Option lastCompletedTxnOwnerInstant, boolean timelineRefreshedWithinTransaction, - Set pendingInstants) throws HoodieWriteConflictException { + Set pendingInstants, + Set conflictResolutionExclusionInstants) throws HoodieWriteConflictException { WriteOperationType operationType = thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null); if (config.needResolveWriteConflict(operationType, table.isMetadataTable(), config, table.getMetaClient().getTableConfig())) { // deal with pendingInstants @@ -82,7 +85,8 @@ public static Option resolveWriteConflictIfAny( Stream instantStream = Stream.concat(resolutionStrategy.getCandidateInstants( table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant, Option.of(config)), - 
completedInstantsDuringCurrentWriteOperation); + completedInstantsDuringCurrentWriteOperation) + .filter(instant -> !conflictResolutionExclusionInstants.contains(instant.requestedTime())); final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new)); instantStream.forEach(instant -> { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index ff9fcb8666926..4268a18edabaa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -203,7 +203,7 @@ protected void autoCommit(HoodieWriteMetadata result) { setCommitMetadata(result); // table instance is created outside the transaction boundary so setting `timelineRefreshedWithinTransaction` to false below TransactionUtils.resolveWriteConflictIfAny(table, txnManager.getCurrentTransactionOwner(), - result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants); + result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants, Collections.emptySet()); commit(result); } finally { txnManager.endStateChange(inflightInstant); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java index c243a49d599ac..db196037722ad 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java +++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java @@ -89,7 +89,7 @@ void resolveWriteConflictIfAnyThrowsExceptionIfConflict(boolean timelineRefreshe when(table.getMetaClient()).thenReturn(spyMetaClient); assertThrows(HoodieWriteConflictException.class, () -> TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig, - lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime))); + lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime), Collections.emptySet())); verify(spyMetaClient, times(timelineRefreshedWithinTransaction ? 0 : 1)).reloadActiveTimeline(); } @@ -128,7 +128,7 @@ void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception { HoodieTableMetaClient spyMetaClient = spy(metaClient); when(table.getMetaClient()).thenReturn(spyMetaClient); Option actualResult = TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig, - lastSuccessfulInstant, false, Collections.singleton(newInstantTime)); + lastSuccessfulInstant, false, Collections.singleton(newInstantTime), Collections.emptySet()); // since we bypass entire conflict resolution verify(spyMetaClient, times(0)).reloadActiveTimeline(); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java index f500f99e0635b..220e1a8ac72fe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkStreamingMetadataWriteHandler.java @@ -93,7 +93,7 @@ public void startCommit(String instantTime, HoodieTable table) { public void cleanResources(String instantTime) { Option 
metadataWriterOpt = this.metadataWriterMap.remove(instantTime); if (metadataWriterOpt == null || metadataWriterOpt.isEmpty()) { - log.warn("Metadata writer for {} has not been initialized, no need to stop heartbeat.", instantTime); + log.debug("Metadata writer for {} has not been initialized or been closed already, skip the close.", instantTime); return; } try { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 228209cf1df6f..98c311be3951a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -34,7 +34,9 @@ import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; @@ -49,6 +51,7 @@ import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; +import org.apache.hudi.util.TxnStateMemo; import com.codahale.metrics.Timer; import lombok.AccessLevel; @@ -60,6 +63,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -97,6 +101,12 @@ public class HoodieFlinkWriteClient */ private final boolean isStreamingWriteMetadataTable; + /** + * Transaction state snapshots keyed by instant. 
+ */ + private final TxnStateMemo txnStateMemo = new TxnStateMemo(); + private Set conflictResolutionExclusionInstants = Collections.emptySet(); + public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { this(context, writeConfig, false); } @@ -136,6 +146,7 @@ public void cleanResources(String instantTime) { if (isStreamingWriteMetadataTable) { this.streamingMetadataWriteHandler.cleanResources(instantTime); } + this.txnStateMemo.slip(instantTime); } /** @@ -389,14 +400,39 @@ private BucketInfo createBucketInfo(HoodieRecord record) { * Refresh the last transaction metadata, * should be called before the Driver starts a new transaction with a reloaded metaclient. */ - public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient) { + public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstant, Collection sameWriterInstants) { if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) { - this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); - this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); + Option>> lastCompletedTxnAndMetadata = + TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); + Set pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient); + Set conflictResolutionExclusionInstants = getConflictResolutionExclusionInstants(currentInstant, sameWriterInstants); + this.txnStateMemo.memo(currentInstant, lastCompletedTxnAndMetadata, conflictResolutionExclusionInstants, pendingInflightAndRequestedInstants); } tableServiceClient.startAsyncArchiveService(this); } + private Set getConflictResolutionExclusionInstants(String currentInstant, Collection sameWriterInstants) { + Set exclusionInstants = new 
HashSet<>(sameWriterInstants); + exclusionInstants.add(currentInstant); + return exclusionInstants; + } + + public void loadTxn(String instantTime) { + Option txnState = this.txnStateMemo.get(instantTime); + if (txnState.isPresent()) { + this.lastCompletedTxnAndMetadata = txnState.get().getLastCompletedTxnAndMetadata(); + this.conflictResolutionExclusionInstants = txnState.get().getConflictResolutionExclusionInstants(); + this.pendingInflightAndRequestedInstants = txnState.get().getPendingInflightAndRequestedInstants(); + tableServiceClient.setLastCompletedTxnAndMetadata(this.lastCompletedTxnAndMetadata); + tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants); + } + } + + @Override + protected Set getConflictResolutionExclusionInstants() { + return this.conflictResolutionExclusionInstants; + } + /** * Initialized the metadata table on start up, should only be called once on driver. */ diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java new file mode 100644 index 0000000000000..d262201eaa6bd --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/TxnStateMemo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Memorizes transaction states by instant for Flink streaming writes. + */ +public class TxnStateMemo { + + private final Map memo = new HashMap<>(); + + public void memo(String instant, + Option>> lastCompletedTxnAndMetadata, + Set conflictResolutionExclusionInstants, + Set pendingInflightAndRequestedInstants) { + memo.put(instant, new TxnState( + lastCompletedTxnAndMetadata, + new HashSet<>(conflictResolutionExclusionInstants), + new HashSet<>(pendingInflightAndRequestedInstants))); + } + + public Option get(String instant) { + return Option.ofNullable(memo.get(instant)); + } + + public void slip(String instant) { + memo.remove(instant); + } + + public boolean contains(String currentInstant) { + return memo.containsKey(currentInstant); + } + + @Getter + @AllArgsConstructor(access = AccessLevel.PRIVATE) + public static class TxnState { + private final Option>> lastCompletedTxnAndMetadata; + private final Set conflictResolutionExclusionInstants; + private final Set pendingInflightAndRequestedInstants; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 7b9095258903e..73e57a84db842 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -435,10 +435,9 @@ private CompletableFuture handleInFlightInstantsRequest(Co private void restoreEvents() { if (this.eventBuffers.nonEmpty()) { - final HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants(); + final HoodieTimeline completedTimeline = this.metaClient.reloadActiveTimeline().filterCompletedInstants(); this.eventBuffers.getEventBufferStream() .forEach(entry -> recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())); - this.metaClient.reloadActiveTimeline(); } } @@ -510,8 +509,6 @@ private void initEventBufferIfNecessary() { private String startInstant() { // refresh the meta client which is reused metaClient.reloadActiveTimeline(); - // refresh the last txn metadata - this.writeClient.preTxn(tableState.operationType, this.metaClient); // put the assignment in front of metadata generation, // because the instant request from write task is asynchronous. 
this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient); @@ -523,6 +520,8 @@ private String startInstant() { this.writeClient.setWriteTimer(tableState.commitAction); log.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.get(FlinkOptions.TABLE_NAME), conf.get(FlinkOptions.TABLE_TYPE)); + // refresh the last txn metadata + this.writeClient.preTxn(tableState.operationType, this.metaClient, this.instant, this.eventBuffers.getAllInstants()); return this.instant; } @@ -547,10 +546,14 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) { writeClient.getHeartbeatClient().start(instant); } + // Initialize the transaction state so same-writer instants can be excluded + // during OCC conflict resolution. + writeClient.preTxn(tableState.operationType, this.metaClient, instant, this.eventBuffers.getAllInstants()); return commitInstant(checkpointId, instant, bootstrapBuffer); } else { // clean the corresponding event buffer if the instant is already committed. eventBuffers.reset(checkpointId); + writeClient.cleanResources(instant); return false; } } @@ -614,24 +617,26 @@ private boolean commitInstants(long checkpointId) { * @return true if the write statuses are committed successfully. */ private boolean commitInstant(long checkpointId, String instant, EventBuffer eventBuffer) { - if (eventBuffer.isEmptyDataWriteBuffer()) { - // all the data write tasks are reset by failover, reset the while buffer and returns early. - this.eventBuffers.reset(checkpointId); - // stop the heart beat for lazy cleaning - writeClient.cleanResources(instant); - return false; - } + try { + if (eventBuffer.isEmptyDataWriteBuffer()) { + // all the data write tasks are reset by failover, reset the whole buffer and return early.
+ this.eventBuffers.reset(checkpointId); + return false; + } - List dataWriteResults = eventBuffer.collectDataWriteStatuses(); - if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) { - // No data has written, reset the buffer and returns early - this.eventBuffers.reset(checkpointId); - // stop the heart beat for lazy cleaning + List dataWriteResults = eventBuffer.collectDataWriteStatuses(); + if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) { + // No data has been written, reset the buffer and return early + this.eventBuffers.reset(checkpointId); + return false; + } + doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses()); + return true; + } finally { + // Stop the heartbeat and remove the memoized transaction state regardless of + // whether commit succeeds or fails before the coordinator restarts. writeClient.cleanResources(instant); - return false; } - doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses()); - return true; } /** @@ -652,6 +657,7 @@ private void doCommit(long checkpointId, String instant, List dataW FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus, checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient)); + this.writeClient.loadTxn(instant); boolean success = writeClient.commit(instant, allWriteStatus, Option.of(checkpointCommitMetadata), tableState.commitAction, partitionToReplacedFileIds); if (success) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java index 87e1010e2227a..52c6cc8587597 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java @@ -133,6 +133,10 @@ public HashMap
getAllCheckpointIdAndInstants() { return result; } + public Collection getAllInstants() { + return this.eventBuffers.values().stream().map(Pair::getLeft).collect(Collectors.toList()); + } + public void initNewEventBuffer(long checkpointId, String instantTime) { this.eventBuffers.put(checkpointId, Pair.of(instantTime, new EventBuffer(dataWriteParallelism, indexWriteParallelism))); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 5e4deee54c966..2046e0a5d4fa2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -25,10 +25,12 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; @@ -379,6 +381,22 @@ public void testInsertDuplicateRecordsWithCDCMode() throws Exception { assertEquals(firstInstant.requestedTime(), secondWriteStats.get(0).getPrevCommit()); } + @Test + public void testCommittingMultipleInstantsWithOCC() throws Exception { + conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT) + .checkpoint(1) + .assertNextEvent(4, "par1,par2,par3,par4") + 
.consume(TestData.DATA_SET_UPDATE_INSERT) + .checkpoint(2) + .assertNextEvent(4, "par1,par2,par3,par4") + .checkpointComplete(2) + .checkWrittenData(EXPECTED2) + .end(); + } + @Override protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED1;