From 127d370b5ca4405636627d668d2fb976693f4383 Mon Sep 17 00:00:00 2001 From: codope Date: Fri, 12 Jun 2026 14:26:44 +0530 Subject: [PATCH 1/2] feat(client): enrich write commit callback message and fire it for table-service commits Two backward-compatible improvements to the post-commit write callback mechanism: 1. Enrich HoodieWriteCommitCallbackMessage with two optional fields so callback implementations no longer have to rebuild a FileSystemView or reach into engine config: - prevFilePaths: Map<String, PrevFilePaths> -- the previous base file (and bootstrap source, if any) each updated file group replaces, pre-resolved by the write client from its cached file-system view. - extraContext: Map<String, String> -- free-form context producers can attach. Both default to empty maps; the existing 4-arg and 6-arg constructors are preserved. 2. Fire the callback for table-service commits too (compaction and clustering completion), not just data commits. The shared firing logic (fireCommitCallback) and prev-file resolution (resolvePrevFilePaths) are lifted into BaseHoodieClient so both BaseHoodieWriteClient (data commits, via postCommit) and BaseHoodieTableServiceClient (compaction/clustering completion) reuse them. The commitCallback field is lifted up from BaseHoodieWriteClient. postCommit now receives the resolved commit action type so the callback reports the actual action (e.g. replacecommit for insert_overwrite) rather than the table's base action type. Best-effort by design: callback and prev-file resolution failures are logged and never fail the write. Adds TestBaseHoodieClient covering resolvePrevFilePaths (inserts, updates, bootstrap capture, missing-file skip, best-effort on view failure, null inputs) and the message default/retention contract. 
--- .../HoodieWriteCommitCallbackMessage.java | 47 ++++- .../apache/hudi/client/BaseHoodieClient.java | 88 +++++++++ .../client/BaseHoodieTableServiceClient.java | 4 + .../hudi/client/BaseHoodieWriteClient.java | 26 +-- .../hudi/client/TestBaseHoodieClient.java | 173 ++++++++++++++++++ 5 files changed, 321 insertions(+), 17 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java index 713427b52c01f..aa5f02132e022 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java @@ -26,6 +26,7 @@ import lombok.Getter; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -69,10 +70,54 @@ public class HoodieWriteCommitCallbackMessage implements Serializable { */ private final Option> extraMetadata; + /** + * Previous base file paths keyed by fileId. Populated by the write client + * using the cached FileSystemView so that callback implementations don't + * have to rebuild a view. Empty for inserts and for callers that don't + * pre-resolve. + */ + private final Map prevFilePaths; + + /** + * Free-form context that producers can attach for downstream callback consumers. + * The OSS write client populates this as empty; specialized callsites or wrappers + * may populate it with whatever context their callbacks need. Mirrors the + * optional shape of {@link #extraMetadata}. 
+ */ + private final Map extraContext; + public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath, List hoodieWriteStat) { - this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty()); + this(commitTime, tableName, basePath, hoodieWriteStat, Option.empty(), Option.empty(), + Collections.emptyMap(), Collections.emptyMap()); + } + + public HoodieWriteCommitCallbackMessage(String commitTime, + String tableName, + String basePath, + List hoodieWriteStat, + Option commitActionType, + Option> extraMetadata) { + this(commitTime, tableName, basePath, hoodieWriteStat, commitActionType, extraMetadata, + Collections.emptyMap(), Collections.emptyMap()); + } + + /** + * Container for previously-existing file paths associated with a single fileId in a + * commit. {@link #prevBaseFilePath} is the base file the new write replaces, and + * {@link #bootstrapBaseFilePath} is the bootstrap-source file the previous + * base file referenced (null for non-bootstrap tables). 
+ */ + public static class PrevFilePaths implements Serializable { + private static final long serialVersionUID = 1L; + public final String prevBaseFilePath; + public final String bootstrapBaseFilePath; + + public PrevFilePaths(String prevBaseFilePath, String bootstrapBaseFilePath) { + this.prevBaseFilePath = prevBaseFilePath; + this.bootstrapBaseFilePath = bootstrapBaseFilePath; + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 30aa9f770b5f0..4ac0a7cec7b10 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -20,12 +20,17 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.callback.HoodieClientInitCallback; +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths; +import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -34,6 +39,7 @@ import org.apache.hudi.common.table.timeline.TimeGenerator; import org.apache.hudi.common.table.timeline.TimeGenerators; import 
org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -87,6 +93,14 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable { protected final TransactionManager txnManager; protected final TimeGenerator timeGenerator; + /** + * Lazily-initialized commit callback (HoodieWriteCommitCallback). Lifted from + * {@link BaseHoodieWriteClient} so that {@link BaseHoodieTableServiceClient} can also + * fire callbacks for compaction and clustering completions. Transient is fine + * because the callback is only ever invoked from the driver after a commit. + */ + protected transient HoodieWriteCommitCallback commitCallback; + /** * Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be * able to take advantage of the cached file-system view. New completed actions will be synced automatically in an @@ -462,4 +476,78 @@ private static Map collectRollingMetadataFromTimeline( protected Option> updateExtraMetadata(Option> extraMetadata) { return CommitMetadataProperties.enrich(extraMetadata, config, context); } + + /** + * Fire {@link HoodieWriteCommitCallback} for a commit, if enabled. Shared by + * {@link BaseHoodieWriteClient#postCommit} (regular auto- and explicit-commit paths) + * and {@link BaseHoodieTableServiceClient} (compaction and clustering completions). + * Lazily constructs the callback instance from {@code hoodie.write.commit.callback.class}. + * + *

Best-effort: catches and logs any exception from the user-supplied callback so a + * misbehaving observer cannot fail the commit. + */ + protected void fireCommitCallback(String commitTime, + String commitActionType, + List stats, + BaseFileOnlyView fsView, + Option> extraMetadata) { + if (!config.writeCommitCallbackOn()) { + return; + } + try { + if (commitCallback == null) { + commitCallback = HoodieCommitCallbackFactory.create(config); + } + commitCallback.call(new HoodieWriteCommitCallbackMessage( + commitTime, config.getTableName(), config.getBasePath(), + stats, Option.of(commitActionType), extraMetadata, resolvePrevFilePaths(stats, fsView), + Collections.emptyMap())); + } catch (Exception e) { + log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring", + commitTime, commitActionType, e); + } + } + + /** + * Pre-resolve the previous base file (and bootstrap base file, if any) for every + * {@link HoodieWriteStat} that represents an update, using a populated + * {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so + * this adds no I/O on top of what the writer already paid. + * + *

Used by {@link #fireCommitCallback} call sites so the callback message ships + * actual file paths rather than forcing each callback impl to rebuild a + * {@code FileSystemView}. + */ + protected static Map resolvePrevFilePaths(List stats, + BaseFileOnlyView fsView) { + Map out = new HashMap<>(); + if (stats == null || fsView == null) { + return out; + } + for (HoodieWriteStat stat : stats) { + String prevCommit = stat.getPrevCommit(); + if (StringUtils.isNullOrEmpty(prevCommit) || HoodieWriteStat.NULL_COMMIT.equals(prevCommit)) { + continue; + } + Option prev; + try { + prev = fsView.getBaseFileOn(stat.getPartitionPath(), prevCommit, stat.getFileId()); + } catch (Exception e) { + // Best-effort: a remote view 4xx/5xx, a stale view, or a replaced file group must not + // fail the commit. Drop the prev path for this stat and keep going. + log.warn("Could not resolve prev base file for fileId={} prevCommit={}; skipping", + stat.getFileId(), prevCommit, e); + continue; + } + if (!prev.isPresent()) { + continue; + } + String prevPath = prev.get().getPath(); + String bootstrapPath = prev.get().getBootstrapBaseFile().isPresent() + ? 
prev.get().getBootstrapBaseFile().get().getPath() + : null; + out.put(stat.getFileId(), new PrevFilePaths(prevPath, bootstrapPath)); + } + return out; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index d29fc0bbd32d8..fa7d607daffbb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -424,6 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab ); } log.info("Compacted successfully on commit {}", compactionCommitTime); + fireCommitCallback(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, + writeStats, table.getBaseFileOnlyView(), Option.empty()); } finally { if (config.getWriteConcurrencyMode().supportsMultiWriter()) { this.heartbeatClient.stop(compactionCommitTime); @@ -640,6 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat heartbeatClient.stop(clusteringCommitTime); } log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath()); + fireCommitCallback(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, + writeStats, table.getBaseFileOnlyView(), Option.empty()); } protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option> extraMetadata) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 14789a4c667df..277f40f949f68 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -24,10 +24,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.callback.HoodieWriteCommitCallback; -import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.common.WriteStatusValidator; -import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; import org.apache.hudi.client.transaction.TransactionManager; @@ -146,7 +143,6 @@ public abstract class BaseHoodieWriteClient extends BaseHoodieClient @Getter @Setter private transient WriteOperationType operationType; - private transient HoodieWriteCommitCallback commitCallback; protected transient Timer.Context writeTimer = null; @@ -287,7 +283,7 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, boolean postCommitStatus = true; HoodieTimer postCommitTimer = HoodieTimer.start(); try { - postCommit(table, metadata, instantTime, extraMetadata); + postCommit(table, metadata, instantTime, commitActionType, extraMetadata); mayBeCleanAndArchive(table); runTableServicesInline(table, metadata, extraMetadata); } catch (Exception e) { @@ -303,15 +299,6 @@ public boolean commitStats(String instantTime, TableWriteStats tableWriteStats, } emitCommitMetrics(instantTime, metadata, commitActionType); - - // callback if needed. 
- if (config.writeCommitCallbackOn()) { - if (null == commitCallback) { - commitCallback = HoodieCommitCallbackFactory.create(config); - } - commitCallback.call(new HoodieWriteCommitCallbackMessage( - instantTime, config.getTableName(), config.getBasePath(), tableWriteStats.getDataTableWriteStats(), Option.of(commitActionType), extraMetadata)); - } return true; } @@ -639,7 +626,8 @@ public O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTabl boolean postCommitStatus = true; HoodieTimer postCommitTimer = HoodieTimer.start(); try { - postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); + postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, + hoodieTable.getMetaClient().getCommitActionType(), Option.empty()); mayBeCleanAndArchive(hoodieTable); } catch (Exception e) { postCommitStatus = false; @@ -666,7 +654,7 @@ public O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTabl * @param instantTime Instant Time * @param extraMetadata Additional Metadata passed by user */ - protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option> extraMetadata) { + protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, String commitActionType, Option> extraMetadata) { try { context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table " + config.getTableName()); @@ -674,6 +662,12 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); metrics.updateTableServiceInstantMetrics(table.getActiveTimeline()); + // Fire write commit callback if a callback class is registered. 
postCommit() is reached + // by both auto-commit and explicit-commit paths; compaction and clustering have their own + // explicit fireCommitCallback call sites in BaseHoodieTableServiceClient. + List stats = metadata.getWriteStats(); + fireCommitCallback(instantTime, commitActionType, stats, + table.getBaseFileOnlyView(), extraMetadata); } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java new file mode 100644 index 0000000000000..02d746479e498 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieClient.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage.PrevFilePaths; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the post-commit write-callback plumbing added to {@link BaseHoodieClient}: + * {@link BaseHoodieClient#resolvePrevFilePaths(List, BaseFileOnlyView)} and the + * {@link HoodieWriteCommitCallbackMessage} contract it feeds. {@code resolvePrevFilePaths} + * pre-resolves the previous base file (and bootstrap source, if any) for each updated file + * group from a cached {@link BaseFileOnlyView}, so callback implementations receive the + * read/write file pairing without rebuilding a file-system view. 
+ */ +public class TestBaseHoodieClient { + + private static final String PARTITION = "2024/01/01"; + private static final String PREV_COMMIT = "001"; + + private static HoodieWriteStat stat(String fileId, String partitionPath, String prevCommit) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + writeStat.setPartitionPath(partitionPath); + writeStat.setPrevCommit(prevCommit); + return writeStat; + } + + @Test + public void resolvePrevFilePathsReturnsEmptyForNullInputs() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + assertTrue(BaseHoodieClient.resolvePrevFilePaths(null, view).isEmpty(), + "null stats must yield an empty map"); + assertTrue(BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), null).isEmpty(), + "null file-system view must yield an empty map"); + } + + @Test + public void resolvePrevFilePathsSkipsStatsWithoutAPrevCommit() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + List inserts = Arrays.asList( + stat("f-null", PARTITION, null), + stat("f-empty", PARTITION, ""), + stat("f-nullcommit", PARTITION, HoodieWriteStat.NULL_COMMIT)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths(inserts, view); + + assertTrue(resolved.isEmpty(), "inserts (no prevCommit) must not resolve a prev base file"); + // The view must not even be consulted for inserts. 
+ verify(view, never()).getBaseFileOn(org.mockito.ArgumentMatchers.anyString(), + org.mockito.ArgumentMatchers.anyString(), org.mockito.ArgumentMatchers.anyString()); + } + + @Test + public void resolvePrevFilePathsResolvesUpdatePrevBaseFile() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/f0_0-1-1_" + PREV_COMMIT + ".parquet"); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + assertEquals(1, resolved.size()); + assertEquals(prevBase.getPath(), resolved.get("f0").prevBaseFilePath); + assertNull(resolved.get("f0").bootstrapBaseFilePath, "non-bootstrap update has no bootstrap path"); + } + + @Test + public void resolvePrevFilePathsCapturesBootstrapBaseFile() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + BaseFile bootstrap = new BaseFile("/bootstrap/source/f0.parquet"); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/f0_0-1-1_" + PREV_COMMIT + ".parquet", bootstrap); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + assertEquals(prevBase.getPath(), resolved.get("f0").prevBaseFilePath); + assertEquals(bootstrap.getPath(), resolved.get("f0").bootstrapBaseFilePath, + "bootstrap source path must be carried through for bootstrapped file groups"); + } + + @Test + public void resolvePrevFilePathsSkipsWhenBaseFileAbsent() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "f0")).thenReturn(Option.empty()); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Collections.singletonList(stat("f0", PARTITION, PREV_COMMIT)), view); + + 
assertTrue(resolved.isEmpty(), "a missing prev base file must be skipped, not mapped to null"); + } + + @Test + public void resolvePrevFilePathsIsBestEffortOnViewFailure() { + BaseFileOnlyView view = mock(BaseFileOnlyView.class); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "boom")) + .thenThrow(new RuntimeException("stale or remote view error")); + HoodieBaseFile prevBase = new HoodieBaseFile("/tbl/" + PARTITION + "/ok_0-1-1_" + PREV_COMMIT + ".parquet"); + when(view.getBaseFileOn(PARTITION, PREV_COMMIT, "ok")).thenReturn(Option.of(prevBase)); + + Map resolved = BaseHoodieClient.resolvePrevFilePaths( + Arrays.asList(stat("boom", PARTITION, PREV_COMMIT), stat("ok", PARTITION, PREV_COMMIT)), view); + + // The failing file group is dropped; resolution continues for the rest (must not fail the commit). + assertFalse(resolved.containsKey("boom")); + assertEquals(prevBase.getPath(), resolved.get("ok").prevBaseFilePath); + } + + @Test + public void callbackMessageDefaultsCollectionsToEmpty() { + HoodieWriteCommitCallbackMessage message = new HoodieWriteCommitCallbackMessage( + PREV_COMMIT, "table", "/base", Collections.emptyList()); + + assertFalse(message.getCommitActionType().isPresent()); + assertFalse(message.getExtraMetadata().isPresent()); + assertTrue(message.getPrevFilePaths().isEmpty(), "prevFilePaths must default to an empty map, never null"); + assertTrue(message.getExtraContext().isEmpty(), "extraContext must default to an empty map, never null"); + } + + @Test + public void callbackMessageRetainsPrevFilePathsAndContext() { + Map prevFilePaths = + Collections.singletonMap("f0", new PrevFilePaths("/tbl/prev.parquet", null)); + Map extraContext = Collections.singletonMap("file_id", "f0"); + + HoodieWriteCommitCallbackMessage message = new HoodieWriteCommitCallbackMessage( + PREV_COMMIT, "table", "/base", Collections.emptyList(), + Option.of("commit"), Option.empty(), prevFilePaths, extraContext); + + assertEquals("commit", 
message.getCommitActionType().get()); + assertEquals(prevFilePaths, message.getPrevFilePaths()); + assertEquals(extraContext, message.getExtraContext()); + } +} From 22eeafbd13ccdc74d8ea01d54f4692c84f5332c2 Mon Sep 17 00:00:00 2001 From: codope Date: Sat, 13 Jun 2026 18:45:01 +0530 Subject: [PATCH 2/2] address comments Signed-off-by: codope --- .../org/apache/hudi/client/BaseHoodieClient.java | 16 +++++++++------- .../client/BaseHoodieTableServiceClient.java | 8 ++++---- .../hudi/client/BaseHoodieWriteClient.java | 6 +++--- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 4ac0a7cec7b10..31729d4e2dcd1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -69,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.hudi.config.HoodieWriteConfig.APPLICATION_ID; @@ -486,11 +487,11 @@ protected Option> updateExtraMetadata(OptionBest-effort: catches and logs any exception from the user-supplied callback so a * misbehaving observer cannot fail the commit. 
*/ - protected void fireCommitCallback(String commitTime, - String commitActionType, - List stats, - BaseFileOnlyView fsView, - Option> extraMetadata) { + protected void fireCommitCallbackIfNecessary(String commitTime, + String commitActionType, + List stats, + Supplier fsViewSupplier, + Option> extraMetadata) { if (!config.writeCommitCallbackOn()) { return; } @@ -500,7 +501,8 @@ protected void fireCommitCallback(String commitTime, } commitCallback.call(new HoodieWriteCommitCallbackMessage( commitTime, config.getTableName(), config.getBasePath(), - stats, Option.of(commitActionType), extraMetadata, resolvePrevFilePaths(stats, fsView), + stats, Option.of(commitActionType), extraMetadata, + resolvePrevFilePaths(stats, fsViewSupplier.get()), Collections.emptyMap())); } catch (Exception e) { log.warn("HoodieWriteCommitCallback failed for commit {} ({}); ignoring", @@ -514,7 +516,7 @@ protected void fireCommitCallback(String commitTime, * {@link BaseFileOnlyView}. The lookup is O(1) per stat against the cached view, so * this adds no I/O on top of what the writer already paid. * - *

Used by {@link #fireCommitCallback} call sites so the callback message ships + *

Used by {@link #fireCommitCallbackIfNecessary} call sites so the callback message ships * actual file paths rather than forcing each callback impl to rebuild a * {@code FileSystemView}. */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index fa7d607daffbb..52b5a4d8b45f9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -424,8 +424,8 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab ); } log.info("Compacted successfully on commit {}", compactionCommitTime); - fireCommitCallback(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, - writeStats, table.getBaseFileOnlyView(), Option.empty()); + fireCommitCallbackIfNecessary(compactionCommitTime, HoodieTimeline.COMMIT_ACTION, + writeStats, table::getBaseFileOnlyView, Option.empty()); } finally { if (config.getWriteConcurrencyMode().supportsMultiWriter()) { this.heartbeatClient.stop(compactionCommitTime); @@ -642,8 +642,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat heartbeatClient.stop(clusteringCommitTime); } log.info("Clustering successfully on commit {} for table {}", clusteringCommitTime, table.getConfig().getBasePath()); - fireCommitCallback(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, - writeStats, table.getBaseFileOnlyView(), Option.empty()); + fireCommitCallbackIfNecessary(clusteringCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION, + writeStats, table::getBaseFileOnlyView, Option.empty()); } protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option> extraMetadata) { diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 277f40f949f68..57cd196ee827a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -664,10 +664,10 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri metrics.updateTableServiceInstantMetrics(table.getActiveTimeline()); // Fire write commit callback if a callback class is registered. postCommit() is reached // by both auto-commit and explicit-commit paths; compaction and clustering have their own - // explicit fireCommitCallback call sites in BaseHoodieTableServiceClient. + // explicit fireCommitCallbackIfNecessary call sites in BaseHoodieTableServiceClient. List stats = metadata.getWriteStats(); - fireCommitCallback(instantTime, commitActionType, stats, - table.getBaseFileOnlyView(), extraMetadata); + fireCommitCallbackIfNecessary(instantTime, commitActionType, stats, + table::getBaseFileOnlyView, extraMetadata); } finally { this.heartbeatClient.stop(instantTime); }