From 9742d560ce7d0022be549df7e733fcfd22fb46c8 Mon Sep 17 00:00:00 2001 From: fhan Date: Fri, 19 Jun 2026 13:54:13 +0800 Subject: [PATCH 1/2] feat(clean): support clean-by-time retention boundary and archive protection --- .../versioning/v1/TimelineArchiverV1.java | 24 +----- .../versioning/v2/TimelineArchiverV2.java | 9 ++- .../hudi/client/utils/ArchivalUtils.java | 34 ++++++++ .../hudi/io/TestHoodieTimelineArchiver.java | 9 +-- .../apache/hudi/common/util/CleanerUtils.java | 16 +++- .../hudi/common/util/TestCleanerUtils.java | 77 ++++++++++++++++++- .../hudi/configuration/FlinkOptions.java | 22 ++++++ .../apache/hudi/util/FlinkWriteClients.java | 3 + .../hudi/utils/TestFlinkWriteClients.java | 19 +++++ 9 files changed, 178 insertions(+), 35 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java index bab78324d7ded..cc200f4c60980 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java @@ -20,7 +20,6 @@ package org.apache.hudi.client.timeline.versioning.v1; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.client.timeline.HoodieTimelineArchiver; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.ArchivalMetrics; @@ -54,7 +53,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; @@ -77,6 +75,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.client.utils.ArchivalUtils.getEarliestInstantToRetainForClean; import static org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; @@ -278,26 +277,7 @@ private Stream getCommitInstantsToArchive() throws IOException { // If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive // commits that have data files that haven't been cleaned yet. - Option oldestInstantToRetainForClean = Option.empty(); - if (config.shouldBlockArchivalOnCleanECTR()) { - Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); - if (lastCleanInstant.isPresent()) { - try { - HoodieCleanMetadata cleanMetadata = - table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); - if (cleanMetadata.getEarliestCommitToRetain() != null - && !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) { - oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals( - cleanMetadata.getEarliestCommitToRetain()).firstInstant(); - log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Oldest to retain is {}", - cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime(), oldestInstantToRetainForClean.map(instant -> instant).orElse(null)); - } - } catch (IOException e) { - log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); - throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e); - } - } - } + Option oldestInstantToRetainForClean = getEarliestInstantToRetainForClean(table, commitTimeline, config); // Actually do the commits Option finalOldestInstantToRetainForClean = oldestInstantToRetainForClean; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java index ebcd16e2361e8..7ba9576bd59e3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java @@ -62,6 +62,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.client.utils.ArchivalUtils.getEarliestInstantToRetainForClean; import static org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; @@ -236,7 +237,11 @@ private List getCommitInstantsToArchive() throws IOException { config.getCleanerPolicy()); earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering); - // 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the + // 4. If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive + // commits that have data files that haven't been cleaned yet. + earliestInstantToRetainCandidates.add(getEarliestInstantToRetainForClean(table, completedCommitsTimeline, config)); + + // 5. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the // metadata table. if (config.isMetadataTableEnabled() && table.getMetaClient().getTableConfig().isMetadataTableAvailable() @@ -256,7 +261,7 @@ private List getCommitInstantsToArchive() throws IOException { } } - // 5. If this is a metadata table, do not archive the commits that live in data set + // 6. If this is a metadata table, do not archive the commits that live in data set // active timeline. This is required by metadata table, // see HoodieTableMetadataUtil#processRollbackMetadata for details. if (table.isMetadataTable()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalUtils.java index d5251f3762fcc..802b2caf3cfb2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.client.utils; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -28,10 +29,12 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.text.ParseException; import java.time.Instant; @@ -110,6 +113,37 @@ public static Pair getMinAndMaxInstantsToKeep(HoodieTable getEarliestInstantToRetainForClean( + HoodieTable table, + HoodieTimeline commitTimeline, + HoodieWriteConfig config) { + if (!config.shouldBlockArchivalOnCleanECTR()) { + return Option.empty(); + } + + Option lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant(); + if (!lastCleanInstant.isPresent()) { + return Option.empty(); + } + + try { + HoodieCleanMetadata cleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get()); + String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain(); + if (earliestCommitToRetain == null || earliestCommitToRetain.trim().isEmpty()) { + return Option.empty(); + } + + Option earliestInstantToRetain = + commitTimeline.findInstantsAfterOrEquals(earliestCommitToRetain).firstInstant(); + log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Earliest instant to retain is {}", + earliestCommitToRetain, lastCleanInstant.get().requestedTime(), earliestInstantToRetain.map(instant -> instant).orElse(null)); + return earliestInstantToRetain; + } catch (IOException e) { + log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e); + throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e); + } + } + private static Option getEarliestCommitToRetain( HoodieTableMetaClient metaClient, Option latestCommit, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index e514e99f941fb..7408588da6a6c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -2380,8 +2380,7 @@ private List getActiveCommitTimes() { } /** - * Tests that TimelineArchiverV2 (LSM-based timeline, v9 tables) does NOT block archival on ECTR. - * ECTR blocking is only for v6 tables using TimelineArchiverV1. + * Tests that TimelineArchiverV2 also blocks archival on ECTR when the setting is enabled. */ @Test public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception { @@ -2405,12 +2404,12 @@ public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() thr TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table); archiver.archiveIfRequired(context); - // Then: TimelineArchiverV2 should NOT respect ECTR — commit 00000003 gets archived + // Then: TimelineArchiverV2 should respect ECTR and retain commit 00000003 and later commits. metaClient = HoodieTableMetaClient.reload(metaClient); List activeCommitTimes = getActiveCommitTimes(); - assertFalse(activeCommitTimes.contains("00000003"), - "TimelineArchiverV2: Commit 00000003 (ECTR) should be archived"); + assertTrue(activeCommitTimes.contains("00000003"), + "TimelineArchiverV2: Commit 00000003 (ECTR) should not be archived"); assertTrue(activeCommitTimes.contains("00000004"), "TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived"); assertTrue(activeCommitTimes.contains("00000005"), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index e0502956f2e54..f621053729755 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -51,6 +51,8 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata; @@ -151,8 +153,18 @@ public static Option getEarliestCommitToRetain( } else if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(latestInstant, timeZone.getZoneId()); String earliestTimeToRetain = TimelineUtils.formatDate(Date.from(latestDateTime.minusHours(hoursRetained).toInstant())); - earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i -> compareTimestamps(i.requestedTime(), - GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst()); + earliestCommitToRetain = completedCommitsTimeline.getReverseOrderedInstants() + .filter(i -> compareTimestamps(i.requestedTime(), LESSER_THAN_OR_EQUALS, earliestTimeToRetain)) + .findFirst() + .map(Option::of) + .orElse(completedCommitsTimeline.firstInstant()); + + Option earliestPendingCommit = commitsTimeline.filter(s -> !s.isCompleted()).firstInstant(); + if (earliestPendingCommit.isPresent() + && (!earliestCommitToRetain.isPresent() + || compareTimestamps(earliestPendingCommit.get().requestedTime(), LESSER_THAN, earliestCommitToRetain.get().requestedTime()))) { + earliestCommitToRetain = completedCommitsTimeline.findInstantsBefore(earliestPendingCommit.get().requestedTime()).lastInstant(); + } } // Apply maxCommitsToClean cap if configured and applicable diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java index ec7586157a944..3bba9cc83b5eb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; import org.junit.jupiter.api.Test; @@ -32,6 +33,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -238,6 +240,51 @@ void testGetEarliestCommitToRetain_WithMaxCommitsToClean_KeepLatestByHours() { assertEquals(timestamps.get(10), result.get().requestedTime()); } + @Test + void testGetEarliestCommitToRetainByHoursUsesLatestCommitBeforeCutoff() { + ZonedDateTime nowUtc = ZonedDateTime.of(2026, 6, 19, 6, 30, 0, 0, ZoneId.of("UTC")); + List timestamps = new ArrayList<>(); + timestamps.add(formatInstant(nowUtc.minusHours(4).minusMinutes(30))); + timestamps.add(formatInstant(nowUtc.minusHours(2).minusMinutes(30))); + timestamps.add(formatInstant(nowUtc.minusHours(1).minusMinutes(30))); + + Option result = CleanerUtils.getEarliestCommitToRetain( + createMockTimelineWithTimestamps(timestamps), + HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, + 12, + nowUtc.toInstant(), + 2, + HoodieTimelineTimeZone.UTC, + Option.empty(), + Long.MAX_VALUE); + + assertTrue(result.isPresent()); + assertEquals(timestamps.get(1), result.get().requestedTime()); + } + + @Test + void testGetEarliestCommitToRetainByHoursDoesNotPassEarliestPendingInstant() { + ZonedDateTime nowUtc = ZonedDateTime.of(2026, 6, 19, 6, 30, 0, 0, ZoneId.of("UTC")); + List completedTimestamps = new ArrayList<>(); + completedTimestamps.add(formatInstant(nowUtc.minusHours(4).minusMinutes(30))); + completedTimestamps.add(formatInstant(nowUtc.minusHours(2).minusMinutes(30))); + completedTimestamps.add(formatInstant(nowUtc.minusHours(1).minusMinutes(30))); + String pendingTimestamp = formatInstant(nowUtc.minusHours(3)); + + Option result = CleanerUtils.getEarliestCommitToRetain( + createMockTimelineWithTimestamps(completedTimestamps, Collections.singletonList(pendingTimestamp)), + HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, + 12, + nowUtc.toInstant(), + 2, + HoodieTimelineTimeZone.UTC, + Option.empty(), + Long.MAX_VALUE); + + assertTrue(result.isPresent()); + assertEquals(completedTimestamps.get(0), result.get().requestedTime()); + } + /** * Helper method to create a mock timeline with specified number of commits. * Commits are named as "20000000000000", "20000000000001", etc. @@ -250,10 +297,18 @@ private HoodieTimeline createMockTimeline(int numCommits) { return createMockTimelineWithTimestamps(timestamps); } + private static String formatInstant(ZonedDateTime dateTime) { + return TimelineUtils.formatDate(Date.from(dateTime.toInstant())); + } + /** * Helper method to create a mock timeline with specified timestamps. */ private HoodieTimeline createMockTimelineWithTimestamps(List timestamps) { + return createMockTimelineWithTimestamps(timestamps, Collections.emptyList()); + } + + private HoodieTimeline createMockTimelineWithTimestamps(List timestamps, List pendingTimestamps) { int numCommits = timestamps.size(); HoodieTimeline timeline = mock(HoodieTimeline.class); HoodieTimeline completedTimeline = mock(HoodieTimeline.class); @@ -268,6 +323,12 @@ private HoodieTimeline createMockTimelineWithTimestamps(List timestamps) when(timeline.filterCompletedInstants()).thenReturn(completedTimeline); when(completedTimeline.countInstants()).thenReturn(numCommits); when(completedTimeline.getInstantsAsStream()).thenAnswer(invocation -> instants.stream()); + when(completedTimeline.getReverseOrderedInstants()).thenAnswer(invocation -> { + List reverseInstants = new ArrayList<>(instants); + Collections.reverse(reverseInstants); + return reverseInstants.stream(); + }); + when(completedTimeline.firstInstant()).thenReturn(instants.isEmpty() ? Option.empty() : Option.of(instants.get(0))); // Mock nthInstant to return the nth instant from the list for (int i = 0; i < numCommits; i++) { @@ -284,13 +345,21 @@ private HoodieTimeline createMockTimelineWithTimestamps(List timestamps) .filter(i -> i.requestedTime().compareTo(timestamp) < 0) .collect(Collectors.toList()); when(beforeTimeline.getInstantsAsStream()).thenAnswer(inv -> beforeInstants.stream()); + when(beforeTimeline.lastInstant()).thenReturn(beforeInstants.isEmpty() + ? Option.empty() + : Option.of(beforeInstants.get(beforeInstants.size() - 1))); return beforeTimeline; }); - // Mock filter for pending commits (empty for this test) - HoodieTimeline emptyTimeline = mock(HoodieTimeline.class); - when(emptyTimeline.firstInstant()).thenReturn(Option.empty()); - when(timeline.filter(org.mockito.ArgumentMatchers.any())).thenReturn(emptyTimeline); + List pendingInstants = pendingTimestamps.stream() + .map(timestamp -> new HoodieInstant(HoodieInstant.State.INFLIGHT, + HoodieTimeline.COMMIT_ACTION, timestamp, InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR)) + .collect(Collectors.toList()); + HoodieTimeline pendingTimeline = mock(HoodieTimeline.class); + when(pendingTimeline.firstInstant()).thenReturn(pendingInstants.isEmpty() + ? Option.empty() + : Option.of(pendingInstants.get(0))); + when(timeline.filter(org.mockito.ArgumentMatchers.any())).thenReturn(pendingTimeline); return timeline; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 357fa51b603b1..eeb2073d736f7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -33,6 +33,8 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.hive.MultiPartKeysValueExtractor; @@ -1059,6 +1061,20 @@ public class FlinkOptions extends HoodieConfig { .withFallbackKeys("hoodie.clean.fileversions.retained") .withDescription("Number of file versions to retain. default 5"); + public static final ConfigOption CLEAN_MAX_COMMITS_TO_CLEAN = ConfigOptions + .key(HoodieCleanConfig.MAX_COMMITS_TO_CLEAN.key()) + .longType() + .defaultValue(Long.MAX_VALUE) + .withDescription("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is " + + "based on KEEP_LATEST_COMMITS or KEEP_LATEST_BY_HOURS."); + + public static final ConfigOption CLEAN_EMPTY_INTERVAL_HOURS = ConfigOptions + .key(HoodieCleanConfig.INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key()) + .longType() + .defaultValue(-1L) + .withDescription("Interval in hours to create an empty clean instant when incremental cleaning has no files to delete. " + + "Set to -1 to disable empty clean instant creation."); + public static final ConfigOption ARCHIVE_MAX_COMMITS = ConfigOptions .key("archive.max_commits") .intType() @@ -1071,6 +1087,12 @@ public class FlinkOptions extends HoodieConfig { .defaultValue(40)// default min 40 commits .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40"); + public static final ConfigOption ARCHIVE_BLOCK_ON_CLEAN_ECTR = ConfigOptions + .key(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR.key()) + .booleanType() + .defaultValue(false) + .withDescription("Whether archival should keep commits at or after the earliest commit to retain from the latest completed clean."); + // ------------------------------------------------------------------------ // Clustering Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java index 078f6b6779a32..2241834482aa9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -206,9 +206,12 @@ public static HoodieWriteConfig getHoodieClientConfig( // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.get(FlinkOptions.CLEAN_POLICY))) + .withMaxCommitsToClean(conf.get(FlinkOptions.CLEAN_MAX_COMMITS_TO_CLEAN)) + .withIntervalToCreateEmptyCleanHours(conf.get(FlinkOptions.CLEAN_EMPTY_INTERVAL_HOURS)) .build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder() .archiveCommitsWith(conf.get(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.get(FlinkOptions.ARCHIVE_MAX_COMMITS)) + .withBlockArchivalOnCleanECTR(conf.get(FlinkOptions.ARCHIVE_BLOCK_ON_CLEAN_ECTR)) .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withTargetIOPerCompactionInMB(conf.get(FlinkOptions.COMPACTION_TARGET_IO)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java index 3910b91daebd1..04ade985a1994 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; +import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; @@ -280,6 +281,24 @@ void testHoodieClientConfigMetadataConfigConsistentWithGetMetadataConfig() throw assertEquals(directMetadataConfig.getBloomFilterType(), writeConfig.getMetadataConfig().getBloomFilterType()); } + @Test + void testCleanByTimeConfigsPropagateToWriteConfig() throws Exception { + conf.set(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name()); + conf.set(FlinkOptions.CLEAN_RETAIN_HOURS, 48); + conf.set(FlinkOptions.CLEAN_MAX_COMMITS_TO_CLEAN, 7L); + conf.set(FlinkOptions.CLEAN_EMPTY_INTERVAL_HOURS, 2L); + conf.set(FlinkOptions.ARCHIVE_BLOCK_ON_CLEAN_ECTR, true); + StreamerUtil.initTableIfNotExists(conf); + + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false); + + assertEquals(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy()); + assertEquals(48, writeConfig.getCleanerHoursRetained()); + assertEquals(7L, writeConfig.getMaxCommitsToClean()); + assertEquals(2L, writeConfig.getIntervalToCreateEmptyCleanHours()); + assertTrue(writeConfig.shouldBlockArchivalOnCleanECTR()); + } + @Test void testWriteMergeHandleForPreV9Table() throws Exception { conf.set(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.EIGHT.versionCode()); From be3b5b569be702b0a2a7a2153f6a5c5df8ae67a9 Mon Sep 17 00:00:00 2001 From: fhan Date: Fri, 19 Jun 2026 16:19:50 +0800 Subject: [PATCH 2/2] feat(clean): Fix clean-by-hours ECTR boundary semantics --- .../java/org/apache/hudi/common/util/CleanerUtils.java | 9 +++------ .../org/apache/hudi/common/util/TestCleanerUtils.java | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index f621053729755..94c4544c3fdd0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -52,7 +52,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; -import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata; @@ -153,11 +152,9 @@ public static Option getEarliestCommitToRetain( } else if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(latestInstant, timeZone.getZoneId()); String earliestTimeToRetain = TimelineUtils.formatDate(Date.from(latestDateTime.minusHours(hoursRetained).toInstant())); - earliestCommitToRetain = completedCommitsTimeline.getReverseOrderedInstants() - .filter(i -> compareTimestamps(i.requestedTime(), LESSER_THAN_OR_EQUALS, earliestTimeToRetain)) - .findFirst() - .map(Option::of) - .orElse(completedCommitsTimeline.firstInstant()); + earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream() + .filter(i -> compareTimestamps(i.requestedTime(), GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) + .findFirst()); Option earliestPendingCommit = commitsTimeline.filter(s -> !s.isCompleted()).firstInstant(); if (earliestPendingCommit.isPresent() diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java index 3bba9cc83b5eb..305e66fa076fb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java @@ -241,7 +241,7 @@ void testGetEarliestCommitToRetain_WithMaxCommitsToClean_KeepLatestByHours() { } @Test - void testGetEarliestCommitToRetainByHoursUsesLatestCommitBeforeCutoff() { + void testGetEarliestCommitToRetainByHoursUsesEarliestCommitAfterCutoff() { ZonedDateTime nowUtc = ZonedDateTime.of(2026, 6, 19, 6, 30, 0, 0, ZoneId.of("UTC")); List timestamps = new ArrayList<>(); timestamps.add(formatInstant(nowUtc.minusHours(4).minusMinutes(30))); @@ -259,7 +259,7 @@ void testGetEarliestCommitToRetainByHoursUsesLatestCommitBeforeCutoff() { Long.MAX_VALUE); assertTrue(result.isPresent()); - assertEquals(timestamps.get(1), result.get().requestedTime()); + assertEquals(timestamps.get(2), result.get().requestedTime()); } @Test