Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta
Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true, pendingInflightAndRequestedInstants);
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), true,
pendingInflightAndRequestedInstants, getConflictResolutionExclusionInstants());
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
Expand All @@ -242,6 +243,10 @@ protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata meta
}
}

/**
* Returns the instant requested times to exclude from the conflict resolution candidates.
*
* <p>This base implementation excludes nothing; the method is {@code protected} so that
* subclasses can override it.
*
* @return an empty set
*/
protected Set<String> getConflictResolutionExclusionInstants() {
return Collections.emptySet();
}

/**
* Finalize Write operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ public class TransactionUtils {
/**
* Resolve any write conflicts when committing data.
*
* @param table
* @param currentTxnOwnerInstant
* @param thisCommitMetadata
* @param config
* @param lastCompletedTxnOwnerInstant
* @param pendingInstants
* @return
* @throws HoodieWriteConflictException
* @param table hoodie table instance to resolve conflicts against
* @param currentTxnOwnerInstant current transaction owner instant
* @param thisCommitMetadata commit metadata for the current transaction
* @param config write config
* @param lastCompletedTxnOwnerInstant last completed transaction observed before this write
* @param timelineRefreshedWithinTransaction whether the table timeline has already been refreshed within this transaction
* @param pendingInstants instants that were inflight or requested before the current write started
* @param conflictResolutionExclusionInstants instant requested times to exclude from conflict resolution candidates
* @return metadata for the resolved commit when conflict resolution succeeds
* @throws HoodieWriteConflictException when a write conflict cannot be resolved
*/
public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
final HoodieTable table,
Expand All @@ -68,7 +70,8 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
final HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
boolean timelineRefreshedWithinTransaction,
Set<String> pendingInstants) throws HoodieWriteConflictException {
Set<String> pendingInstants,
Set<String> conflictResolutionExclusionInstants) throws HoodieWriteConflictException {
WriteOperationType operationType = thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
if (config.needResolveWriteConflict(operationType, table.isMetadataTable(), config, table.getMetaClient().getTableConfig())) {
// deal with pendingInstants
Expand All @@ -82,7 +85,8 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(

Stream<HoodieInstant> instantStream = Stream.concat(resolutionStrategy.getCandidateInstants(
table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant, Option.of(config)),
completedInstantsDuringCurrentWriteOperation);
completedInstantsDuringCurrentWriteOperation)
.filter(instant -> !conflictResolutionExclusionInstants.contains(instant.requestedTime()));

final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElseGet(HoodieCommitMetadata::new));
instantStream.forEach(instant -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ protected void autoCommit(HoodieWriteMetadata<O> result) {
setCommitMetadata(result);
// table instance is created outside the transaction boundary so setting `timelineRefreshedWithinTransaction` to false below
TransactionUtils.resolveWriteConflictIfAny(table, txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants);
result.getCommitMetadata(), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants, Collections.emptySet());
commit(result);
} finally {
txnManager.endStateChange(inflightInstant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void resolveWriteConflictIfAnyThrowsExceptionIfConflict(boolean timelineRefreshe
when(table.getMetaClient()).thenReturn(spyMetaClient);
assertThrows(HoodieWriteConflictException.class,
() -> TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig,
lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime)));
lastSuccessfulInstant, timelineRefreshedWithinTransaction, Collections.singleton(newInstantTime), Collections.emptySet()));
verify(spyMetaClient, times(timelineRefreshedWithinTransaction ? 0 : 1)).reloadActiveTimeline();
}

Expand Down Expand Up @@ -128,7 +128,7 @@ void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception {
HoodieTableMetaClient spyMetaClient = spy(metaClient);
when(table.getMetaClient()).thenReturn(spyMetaClient);
Option<HoodieCommitMetadata> actualResult = TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, Option.of(currentMetadata), writeConfig,
lastSuccessfulInstant, false, Collections.singleton(newInstantTime));
lastSuccessfulInstant, false, Collections.singleton(newInstantTime), Collections.emptySet());
// since we bypass entire conflict resolution
verify(spyMetaClient, times(0)).reloadActiveTimeline();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void startCommit(String instantTime, HoodieTable table) {
public void cleanResources(String instantTime) {
Option<HoodieTableMetadataWriter> metadataWriterOpt = this.metadataWriterMap.remove(instantTime);
if (metadataWriterOpt == null || metadataWriterOpt.isEmpty()) {
log.warn("Metadata writer for {} has not been initialized, no need to stop heartbeat.", instantTime);
log.debug("Metadata writer for {} has not been initialized or been closed already, skip the close.", instantTime);
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
Expand All @@ -49,6 +51,7 @@
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.util.TxnStateMemo;

import com.codahale.metrics.Timer;
import lombok.AccessLevel;
Expand All @@ -60,6 +63,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -97,6 +101,12 @@ public class HoodieFlinkWriteClient<T>
*/
private final boolean isStreamingWriteMetadataTable;

/**
* Transaction state snapshots keyed by instant.
*/
private final TxnStateMemo txnStateMemo = new TxnStateMemo();
private Set<String> conflictResolutionExclusionInstants = Collections.emptySet();

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
this(context, writeConfig, false);
}
Expand Down Expand Up @@ -136,6 +146,7 @@ public void cleanResources(String instantTime) {
if (isStreamingWriteMetadataTable) {
this.streamingMetadataWriteHandler.cleanResources(instantTime);
}
this.txnStateMemo.slip(instantTime);
}

/**
Expand Down Expand Up @@ -389,14 +400,39 @@ private BucketInfo createBucketInfo(HoodieRecord record) {
* Refresh the last transaction metadata,
* should be called before the Driver starts a new transaction with a reloaded metaclient.
*/
public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient) {
public void preTxn(WriteOperationType operationType, HoodieTableMetaClient metaClient, String currentInstant, Collection<String> sameWriterInstants) {
if (txnManager.isLockRequired() && config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), config, metaClient.getTableConfig())) {
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
Set<String> pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
Set<String> conflictResolutionExclusionInstants = getConflictResolutionExclusionInstants(currentInstant, sameWriterInstants);
this.txnStateMemo.memo(currentInstant, lastCompletedTxnAndMetadata, conflictResolutionExclusionInstants, pendingInflightAndRequestedInstants);
}
tableServiceClient.startAsyncArchiveService(this);
}

/**
 * Builds the set of instants to exclude from conflict resolution for one transaction.
 *
 * @param currentInstant     the instant of the transaction being prepared; always part of the result
 * @param sameWriterInstants the other instants to exclude, supplied by the caller
 * @return a new mutable set holding {@code sameWriterInstants} plus {@code currentInstant}
 */
private Set<String> getConflictResolutionExclusionInstants(String currentInstant, Collection<String> sameWriterInstants) {
  Set<String> excluded = new HashSet<>();
  excluded.addAll(sameWriterInstants);
  excluded.add(currentInstant);
  return excluded;
}

/**
 * Installs the transaction state memoized for the given instant on this client and
 * forwards the last-completed transaction and pending instants to the table service client.
 *
 * <p>Does nothing when no state has been memoized for {@code instantTime}; the fields then
 * keep whatever values they already had.
 *
 * @param instantTime the instant whose memoized state should be loaded
 */
public void loadTxn(String instantTime) {
  Option<TxnStateMemo.TxnState> memoized = this.txnStateMemo.get(instantTime);
  if (!memoized.isPresent()) {
    return;
  }
  TxnStateMemo.TxnState state = memoized.get();
  this.lastCompletedTxnAndMetadata = state.getLastCompletedTxnAndMetadata();
  this.conflictResolutionExclusionInstants = state.getConflictResolutionExclusionInstants();
  this.pendingInflightAndRequestedInstants = state.getPendingInflightAndRequestedInstants();
  tableServiceClient.setLastCompletedTxnAndMetadata(this.lastCompletedTxnAndMetadata);
  tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
}

/**
* Returns the exclusion instants currently held by this client.
*
* <p>The backing field starts as an empty set and is replaced by {@link #loadTxn(String)}
* when a memoized transaction state exists for the loaded instant.
*/
@Override
protected Set<String> getConflictResolutionExclusionInstants() {
return this.conflictResolutionExclusionInstants;
}

/**
* Initializes the metadata table on startup, should only be called once on driver.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Memorizes transaction states by instant for Flink streaming writes.
*/
public class TxnStateMemo {

private final Map<String, TxnState> memo = new HashMap<>();

public void memo(String instant,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename this to record or put? Using a noun as a verb (txnStateMemo.memo(...) at the call site) is unusual in Java APIs and reads a bit awkwardly.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata,
Set<String> conflictResolutionExclusionInstants,
Set<String> pendingInflightAndRequestedInstants) {
memo.put(instant, new TxnState(
lastCompletedTxnAndMetadata,
new HashSet<>(conflictResolutionExclusionInstants),
new HashSet<>(pendingInflightAndRequestedInstants)));
}

/**
 * Looks up the transaction state stored for the given instant.
 *
 * @param instant the instant the state was stored under
 * @return the stored state, or an empty option when nothing is stored for {@code instant}
 */
public Option<TxnState> get(String instant) {
  TxnState state = memo.get(instant);
  return Option.ofNullable(state);
}

public void slip(String instant) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: slip doesn't carry an obvious meaning here — remove or evict would tell a reader immediately what this does without needing to look at the body.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

memo.remove(instant);
}

/**
* Tells whether a transaction state is currently stored for the given instant.
*
* @param currentInstant the instant to look up
* @return {@code true} when the backing map has an entry keyed by {@code currentInstant}
*/
public boolean contains(String currentInstant) {
return memo.containsKey(currentInstant);
}

/**
* Snapshot of the transaction state stored for one instant.
*
* <p>Getters are generated by Lombok; the all-args constructor is private, so instances
* are only created by the enclosing {@code TxnStateMemo}.
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public static class TxnState {
// Last completed transaction instant together with its metadata map, if any.
private final Option<Pair<HoodieInstant, Map<String, String>>> lastCompletedTxnAndMetadata;
// Instants to exclude from conflict resolution for this transaction.
private final Set<String> conflictResolutionExclusionInstants;
// Instants that were inflight or requested when the state was stored.
private final Set<String> pendingInflightAndRequestedInstants;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,9 @@ private CompletableFuture<CoordinationResponse> handleInFlightInstantsRequest(Co

private void restoreEvents() {
if (this.eventBuffers.nonEmpty()) {
final HoodieTimeline completedTimeline = this.metaClient.getActiveTimeline().filterCompletedInstants();
final HoodieTimeline completedTimeline = this.metaClient.reloadActiveTimeline().filterCompletedInstants();
this.eventBuffers.getEventBufferStream()
.forEach(entry -> recommitInstant(completedTimeline, entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight()));
this.metaClient.reloadActiveTimeline();
}
}

Expand Down Expand Up @@ -510,8 +509,6 @@ private void initEventBufferIfNecessary() {
private String startInstant() {
// refresh the meta client which is reused
metaClient.reloadActiveTimeline();
// refresh the last txn metadata
this.writeClient.preTxn(tableState.operationType, this.metaClient);
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = this.writeClient.startCommit(tableState.commitAction, this.metaClient);
Expand All @@ -523,6 +520,8 @@ private String startInstant() {
this.writeClient.setWriteTimer(tableState.commitAction);
log.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.get(FlinkOptions.TABLE_NAME), conf.get(FlinkOptions.TABLE_TYPE));
// refresh the last txn metadata
this.writeClient.preTxn(tableState.operationType, this.metaClient, this.instant, this.eventBuffers.getAllInstants());
return this.instant;
}

Expand All @@ -547,10 +546,14 @@ private boolean recommitInstant(HoodieTimeline completedTimeline, long checkpoin
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(instant);
}
// Initialize the transaction state so same-writer instants can be excluded
// during OCC conflict resolution.
writeClient.preTxn(tableState.operationType, this.metaClient, instant, this.eventBuffers.getAllInstants());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 I think the multi-pending recommit case is still broken — could you double-check? With A and B both inflight on restart, after A's recommit succeeds, doCommit calls eventBuffers.reset(ckp_A), so when B's preTxn runs next, eventBuffers.getAllInstants() returns {B} only — A is no longer in the exclusion set. The coordinator's metaClient isn't reloaded between iterations either (the single reloadActiveTimeline() happens once at restoreEvents entry), so getLastCompletedTxnInstantAndMetadata(metaClient) still returns the pre-A txn as the cutoff. Inside the lock, preCommit builds a fresh table whose timeline now shows A completed; getCandidateInstants returns A via findInstantsAfter(preA.requestedTime()), the exclusion filter drops only B, and A vs B (same file groups from the same writer) throws HoodieWriteConflictException — exactly the failure this PR is fixing. Snapshotting eventBuffers.getAllInstants() once before the loop (or calling metaClient.reloadActiveTimeline() before each preTxn) would close this. @danny0405 could you sanity-check this trace?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return commitInstant(checkpointId, instant, bootstrapBuffer);
} else {
// clean the corresponding event buffer if the instant is already committed.
eventBuffers.reset(checkpointId);
writeClient.cleanResources(instant);
return false;
}
}
Expand Down Expand Up @@ -614,24 +617,26 @@ private boolean commitInstants(long checkpointId) {
* @return true if the write statuses are committed successfully.
*/
private boolean commitInstant(long checkpointId, String instant, EventBuffer eventBuffer) {
if (eventBuffer.isEmptyDataWriteBuffer()) {
// all the data write tasks are reset by failover, reset the whole buffer and return early.
this.eventBuffers.reset(checkpointId);
// stop the heart beat for lazy cleaning
writeClient.cleanResources(instant);
return false;
}
try {
if (eventBuffer.isEmptyDataWriteBuffer()) {
// all the data write tasks are reset by failover, reset the whole buffer and return early.
this.eventBuffers.reset(checkpointId);
return false;
}

List<WriteStatus> dataWriteResults = eventBuffer.collectDataWriteStatuses();
if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) {
// No data has been written, reset the buffer and return early
this.eventBuffers.reset(checkpointId);
// stop the heart beat for lazy cleaning
List<WriteStatus> dataWriteResults = eventBuffer.collectDataWriteStatuses();
if (dataWriteResults.isEmpty() && !OptionsResolver.allowCommitOnEmptyBatch(conf)) {
// No data has been written, reset the buffer and return early
this.eventBuffers.reset(checkpointId);
return false;
}
doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses());
return true;
} finally {
// Stop the heartbeat and remove the memoized transaction state regardless of
// whether commit succeeds or fails before the coordinator restarts.
writeClient.cleanResources(instant);
return false;
}
doCommit(checkpointId, instant, dataWriteResults, eventBuffer.collectIndexWriteStatuses());
return true;
}

/**
Expand All @@ -652,6 +657,7 @@ private void doCommit(long checkpointId, String instant, List<WriteStatus> dataW
FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus,
checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient));

this.writeClient.loadTxn(instant);
boolean success = writeClient.commit(instant, allWriteStatus, Option.of(checkpointCommitMetadata),
tableState.commitAction, partitionToReplacedFileIds);
if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public HashMap<Long, String> getAllCheckpointIdAndInstants() {
return result;
}

/**
 * Collects the instant of every buffered entry into a newly created list.
 *
 * <p>The result is built at call time, so entries added to or removed from the buffers
 * afterwards are not reflected in a previously returned collection.
 *
 * @return the left element of each stored pair, in the iteration order of {@code eventBuffers.values()}
 */
public Collection<String> getAllInstants() {
  return this.eventBuffers.values()
      .stream()
      .map(entry -> entry.getLeft())
      .collect(Collectors.toList());
}

/**
 * Creates a new event buffer and stores it, paired with its instant, under the checkpoint id.
 *
 * @param checkpointId the checkpoint id used as the key
 * @param instantTime  the instant stored alongside the new buffer
 */
public void initNewEventBuffer(long checkpointId, String instantTime) {
  EventBuffer freshBuffer = new EventBuffer(dataWriteParallelism, indexWriteParallelism);
  this.eventBuffers.put(checkpointId, Pair.of(instantTime, freshBuffer));
}
Expand Down
Loading
Loading