diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java index 713427b52c01f..aa5f02132e022 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java @@ -26,6 +26,7 @@ import lombok.Getter; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable { */ private final Option> extraMetadata; + /** + * Previous base file paths keyed by fileId. Populated by the write client + * using the cached FileSystemView so that callback implementations don't + * have to rebuild a view. Empty for inserts and for callers that don't + * pre-resolve. + */ + private final Map prevFilePaths; + + /** + * Free-form context that producers can attach for downstream callback consumers. + * The OSS write client populates this as empty; specialized callsites or wrappers + * may populate it with whatever context their callbacks need. Mirrors the + * optional shape of {@link #extraMetadata}. + */ + private final Map extraContext; + public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath, List hoodieWriteStat) { - this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty()); + this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(), + Collections.emptyMap(), Collections.emptyMap()); + } + + public HoodieWriteCommitCallbackMessage(String commitTime, + String tableName, + String basePath, + List hoodieWriteStat, + Option commitActionType, + Option> extraMetadata) { + this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata, + Collections.emptyMap(), Collections.emptyMap()); + } + + /** + * Container for previously-existing file paths associated with a single fileId in a + * commit. {@link #prevBaseFilePath} is the base file the new write replaces, and + * {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous + * base file referenced (null for non-bootstrap tables). + */ + public static class PrevFilePaths implements Serializable { + private static final long serialVersionUID = 1L; + public final String prevBaseFilePath; + public final String bootstrapBaseFilePath; + + public PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) { + this.prevBaseFilePath = prevBaseFilePath; + this.bootstrapBaseFilePath = bootstrapBaseFilePath; + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 30aa9f770b5f0..31729d4e2dcd1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -20,12 +20,17 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.callback.HoodieClientInitCallback; +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths; +import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -34,6 +39,7 @@ import org.apache.hudi.common.table.timeline.TimeGenerator; import org.apache.hudi.common.table.timeline.TimeGenerators; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -63,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hudi.config.HoodieWriteConfig.APPLICATION_ID; @@ -87,6 +94,14 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { protected final TransactionManager txnManager; protected final TimeGenerator timeGenerator; + /** + * Lazily-initialized commit callback (HoodieWriteCommitCallback). Lifted from + * {@link BaseHoodieWriteClient} so that {@link BaseHoodieTableServiceClient} can also + * fire callbacks for compaction and clustering completions. Transient is fine + * because the callback is only ever invoked from the driver after a commit. + */ + protected transient HoodieWriteCommitCallback commitCallback; + /** * Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be * able to take advantage of the cached file-system view. New completed actions will be synced automatically in an @@ -462,4 +477,79 @@ private static Map collectRollingMetadataFromTimeline( protected Option> updateExtraMetadata(Option> extraMetadata) { return CommitMetadataProperties.enrich(extraMetadata, config, context); } + + /** + * Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by + * {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths) + * and {@link BaseHoodieTableServiceClient} (compaction and clustering completions). + * Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}. + * + *

Best-effort: catches and logs any exception from the user-supplied callback so a + * misbehaving observer cannot fail the commit. + */ + protected void fireCommitCallbackIfNecessary(String commitTime, + String commitActionType, + List stats, + Supplier fsViewSupplier, + Option> extraMetadata) { + if (!config.writeCommitCallbackOn()) { + return; + } + try { + if (commitCallback == null) { + commitCallback = HoodieCommitCallbackFactory.create(config); + } + commitCallback.call(new HoodieWriteCommitCallbackMessage( + commitTime, config.getTableName(), config.getBasePath(), + stats, Option.of(commitActionType), extraMetadata, + resolvePrevFilePaths(stats, fsViewSupplier.get()), + Collections.emptyMap())); + } catch (Exception e) { + log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring", + commitTime, commitActionType, e); + } + } + + /** + * Pre-resolve the previous base file (and bootstrap base file, if any) for every + * {@link HoodieWriteStat} that represents an update, using a populated + * {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so + * this adds no I/O on top of what the writer already paid. + * + *

Used by {@link #fireCommitCallbackIfNecessary} call sites so the callback message ships + * actual file paths rather than forcing each callback impl to rebuild a + * {@code FileSystemView}. + */ + protected static Map resolvePrevFilePaths(List stats, + BaseFileOnlyView fsView) { + Map out = new HashMap<>(); + if (stats == null || fsView == null) { + return out; + } + for (HoodieWriteStat stat : stats) { + String prevCommit = stat.getPrevCommit(); + if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) { + continue; + } + Option prev; + try { + prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId()); + } catch (Exception e) { + // Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not + // fail the commit. Drop the prev path for this stat and keep going. + log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping", + stat.getFileId(), prevCommit, e); + continue; + } + if (!prev.isPresent()) { + continue; + } + String prevPath = prev.get().getPath(); + String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent() + ? prev.get().getBootstrapBaseFile().get().getPath() + : null; + out.put(stat.getFileId(), new PrevFilePaths(prevPath, bootstrapPath)); + } + return out; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index d29fc0bbd32d8..52b5a4d8b45f9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab ); } log.info("Compacted successfully on commit {}", compactionCommitTime); + fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, + writeStats, table::getBaseFileOnlyView, Option.empty()); } finally { if (config.getWriteConcurrencyMode().supportsMultiWriter()) { this.heartbeatClient.stop(compactionCommitTime); @@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat heartbeatClient.stop(clusteringCommitTime); } log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath()); + fireCommitCallbackIfNecessary(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, + writeStats, table::getBaseFileOnlyView, Option.empty()); } protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option> extraMetadata) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 14789a4c667df..57cd196ee827a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -24,10 +24,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.callback.HoodieWriteCommitCallback; -import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.common.WriteStatusValidator; -import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.client.transaction.TransactionManager; @@ -146,7 +143,6 @@ public abstract class BaseHoodieWriteClient extends BaseHoodieClient @Getter @Setter private transient WriteOperationType operationType; - private transient HoodieWriteCommitCallback commitCallback; protected transient Timer.Context writeTimer = null; @@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, boolean postCommitStatus = true; HoodieTimer postCommitTimer = HoodieTimer.start(); try { - postCommit(table, metadata, instantTime, extraMetadata); + postCommit(table, metadata, instantTime, commitActionType, extraMetadata); mayBeCleanAndArchive(table); runTableServicesInline(table, metadata, extraMetadata); } catch (Exception e) { @@ -303,15 +299,6 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, } emitCommitMetrics(instantTime, metadata, commitActionType); - - // callback if needed. - if (config.writeCommitCallbackOn()) { - if (null == commitCallback) { - commitCallback = HoodieCommitCallbackFactory.create(config); - } - commitCallback.call(new HoodieWriteCommitCallbackMessage( - instantTime, config.getTableName(), config.getBasePath(), tableWriteStats.getDataTableWriteStats(), Option.of(commitActionType), extraMetadata)); - } return true; } @@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTabl boolean postCommitStatus = true; HoodieTimer postCommitTimer = HoodieTimer.start(); try { - postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); + postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, + hoodieTable.getMetaClient().getCommitActionType(), Option.empty()); mayBeCleanAndArchive(hoodieTable); } catch (Exception e) { postCommitStatus = false; @@ -666,7 +654,7 @@ public O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTabl * @param instantTime Instant Time * @param extraMetadata Additional Metadata passed by user */ - protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option> extraMetadata) { + protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, String commitActionType, Option> extraMetadata) { try { context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table " + config.getTableName()); @@ -674,6 +662,12 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); metrics.updateTableServiceInstantMetrics(table.getActiveTimeline()); + // Fire write commit callback if a callback class is registered. postCommit() is reached + // by both auto-commit and explicit-commit paths; compaction and clustering have their own + // explicit fireCommitCallbackIfNecessary call sites in BaseHoodieTableServiceClient. + List stats = metadata.getWriteStats(); + fireCommitCallbackIfNecessary(instantTime, commitActionType, stats, + table::getBaseFileOnlyView, extraMetadata); } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java new file mode 100644 index 0000000000000..02d746479e498 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the post-commit write-callback plumbing added to {@link BaseHoodieClient}: + * {@link BaseHoodieClient#resolvePrevFilePaths(List, BaseFileOnlyView)} and the + * {@link HoodieWriteCommitCallbackMessage} contract it feeds. {@code resolvePrevFilePaths} + * pre-resolves the previous base file (and bootstrap source, if any) for each updated file + * group from a cached {@link BaseFileOnlyView}, so callback implementations receive the + * read/write file pairing without rebuilding a file-system view. + */ +public class TestBaseHoodieClient { + + private static final String PARTITION = "2024/01/01"; + private static final String PREV_COMMIT = "001"; + + private static HoodieWriteStat stat(String fileId, String partitionPath, String prevCommit) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + writeStat.setPartitionPath(partitionPath); + writeStat.setPrevCommit(prevCommit); + return writeStat; + } + + @Test + public void resolvePrevFilePathsReturnsEmptyForNullInputs() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + assertTrue(BaseHoodieClient.resolvePrevFilePaths(null, view).isEmpty(), + "null stats must yield an empty map"); + assertTrue(BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), null).isEmpty(), + "null file-system view must yield an empty map"); + } + + @Test + public void resolvePrevFilePathsSkipsStatsWithoutAPrevCommit() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + List inserts = Arrays.asList( + stat("f-null", PARTITION, null), + stat("f-empty", PARTITION, ""), + stat("f-nullcommit", PARTITION, HoodieWriteStat.NULL_COMMIT)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths(inserts, view); + + assertTrue(resolved.isEmpty(), "inserts (no prevCommit) must not resolve a prev base file"); + // The view must not even be consulted for inserts. + verify(view, never()).getBaseFileOn(org.mockito.ArgumentMatchers.anyString(), + org.mockito.ArgumentMatchers.anyString(), org.mockito.ArgumentMatchers.anyString()); + } + + @Test + public void resolvePrevFilePathsResolvesUpdatePrevBaseFile() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/f0_0-1-1_" + PREV_COMMIT + ".parquet"); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + assertEquals(1, resolved.size()); + assertEquals(prevBase.getPath(), resolved.get("f0").prevBaseFilePath); + assertNull(resolved.get("f0").bootstrapBaseFilePath, "non-bootstrap update has no bootstrap path"); + } + + @Test + public void resolvePrevFilePathsCapturesBootstrapBaseFile() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + BaseFile bootstrap = new BaseFile("/bootstrap/source/f0.parquet"); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/f0_0-1-1_" + PREV_COMMIT + ".parquet", bootstrap); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + assertEquals(prevBase.getPath(), resolved.get("f0").prevBaseFilePath); + assertEquals(bootstrap.getPath(), resolved.get("f0").bootstrapBaseFilePath, + "bootstrap source path must be carried through for bootstrapped file groups"); + } + + @Test + public void resolvePrevFilePathsSkipsWhenBaseFileAbsent() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.empty()); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + assertTrue(resolved.isEmpty(), "a missing prev base file must be skipped, not mapped to null"); + } + + @Test + public void resolvePrevFilePathsIsBestEffortOnViewFailure() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "boom")) + .thenThrow(new RuntimeException("stale or remote view error")); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/ok_0-1-1_" + PREV_COMMIT + ".parquet"); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "ok")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Arrays.asList(stat("boom", PARTITION, PREV_COMMIT), stat("ok", PARTITION, PREV_COMMIT)), view); + + // The failing file group is dropped; resolution continues for the rest (must not fail the commit). + assertFalse(resolved.containsKey("boom")); + assertEquals(prevBase.getPath(), resolved.get("ok").prevBaseFilePath); + } + + @Test + public void callbackMessageDefaultsCollectionsToEmpty() { + HoodieWriteCommitCallbackMessage message = new HoodieWriteCommitCallbackMessage( + PREV_COMMIT, "table", "/base", Collections.emptyList()); + + assertFalse(message.getCommitActionType().isPresent()); + assertFalse(message.getExtraMetadata().isPresent()); + assertTrue(message.getPrevFilePaths().isEmpty(), "prevFilePaths must default to an empty map, never null"); + assertTrue(message.getExtraContext().isEmpty(), "extraContext must default to an empty map, never null"); + } + + @Test + public void callbackMessageRetainsPrevFilePathsAndContext() { + Map prevFilePaths = + Collections.singletonMap("f0", new PrevFilePaths("/tbl/prev.parquet", null)); + Map extraContext = Collections.singletonMap("file_id", "f0"); + + HoodieWriteCommitCallbackMessage message = new HoodieWriteCommitCallbackMessage( + PREV_COMMIT, "table", "/base", Collections.emptyList(), + Option.of("commit"), Option.empty(), prevFilePaths, extraContext); + + assertEquals("commit", message.getCommitActionType().get()); + assertEquals(prevFilePaths, message.getPrevFilePaths()); + assertEquals(extraContext, message.getExtraContext()); + } +}