Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.hudi.client.timeline.versioning.v1;

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.ArchivalMetrics;
Expand Down Expand Up @@ -54,7 +53,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
Expand All @@ -77,6 +75,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.client.utils.ArchivalUtils.getEarliestInstantToRetainForClean;
import static org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
Expand Down Expand Up @@ -278,26 +277,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {

// If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive
// commits that have data files that haven't been cleaned yet.
Option<HoodieInstant> oldestInstantToRetainForClean = Option.empty();
if (config.shouldBlockArchivalOnCleanECTR()) {
Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastCleanInstant.isPresent()) {
try {
HoodieCleanMetadata cleanMetadata =
table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get());
if (cleanMetadata.getEarliestCommitToRetain() != null
&& !cleanMetadata.getEarliestCommitToRetain().trim().isEmpty()) {
oldestInstantToRetainForClean = commitTimeline.findInstantsAfterOrEquals(
cleanMetadata.getEarliestCommitToRetain()).firstInstant();
log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Oldest to retain is {}",
cleanMetadata.getEarliestCommitToRetain(), lastCleanInstant.get().requestedTime(), oldestInstantToRetainForClean.map(instant -> instant).orElse(null));
}
} catch (IOException e) {
log.warn("Failed to read clean metadata for {}", lastCleanInstant.get(), e);
throw new HoodieIOException("Failed to read clean metadata for " + lastCleanInstant.get(), e);
}
}
}
Option<HoodieInstant> oldestInstantToRetainForClean = getEarliestInstantToRetainForClean(table, commitTimeline, config);

// Actually do the commits
Option<HoodieInstant> finalOldestInstantToRetainForClean = oldestInstantToRetainForClean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.client.utils.ArchivalUtils.getEarliestInstantToRetainForClean;
import static org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
Expand Down Expand Up @@ -236,7 +237,11 @@ private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
config.getCleanerPolicy());
earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);

// 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// 4. If enabled, block archival based on ECTR from the last completed clean to ensure we don't archive
// commits that have data files that haven't been cleaned yet.
earliestInstantToRetainCandidates.add(getEarliestInstantToRetainForClean(table, completedCommitsTimeline, config));

// 5. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
if (config.isMetadataTableEnabled()
&& table.getMetaClient().getTableConfig().isMetadataTableAvailable()
Expand All @@ -256,7 +261,7 @@ private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
}
}

// 5. If this is a metadata table, do not archive the commits that live in data set
// 6. If this is a metadata table, do not archive the commits that live in data set
// active timeline. This is required by metadata table,
// see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (table.isMetadataTable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.hudi.client.utils;

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand All @@ -28,10 +29,12 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.text.ParseException;
import java.time.Instant;

Expand Down Expand Up @@ -110,6 +113,37 @@ public static Pair<Integer,Integer> getMinAndMaxInstantsToKeep(HoodieTable<?, ?,
return Pair.of(minInstantsToKeep, maxInstantsToKeep);
}

/**
 * Resolves the earliest instant that archival must keep, derived from the latest completed clean.
 *
 * <p>An empty option is returned when {@code config.shouldBlockArchivalOnCleanECTR()} is false,
 * when the clean timeline has no completed instant, or when the earliest-commit-to-retain recorded
 * in the clean metadata is {@code null} or blank. Otherwise the result is the first instant of
 * {@code commitTimeline.findInstantsAfterOrEquals(earliestCommitToRetain)}, which may itself be
 * empty when the timeline holds no such instant.
 *
 * @param table          table whose clean timeline and active timeline are consulted
 * @param commitTimeline timeline searched for the instant matching the earliest commit to retain
 * @param config         write config supplying the block-archival-on-clean-ECTR flag
 * @return the earliest instant to retain, or an empty option when the clean imposes no constraint
 * @throws HoodieIOException if the metadata of the latest completed clean cannot be read
 */
public static Option<HoodieInstant> getEarliestInstantToRetainForClean(
    HoodieTable<?, ?, ?, ?> table,
    HoodieTimeline commitTimeline,
    HoodieWriteConfig config) {
  if (!config.shouldBlockArchivalOnCleanECTR()) {
    return Option.empty();
  }

  Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().filterCompletedInstants().lastInstant();
  if (!lastCleanInstant.isPresent()) {
    return Option.empty();
  }
  // Unwrap once; the instant is needed for the metadata read and for every log/exception message.
  HoodieInstant lastClean = lastCleanInstant.get();

  try {
    HoodieCleanMetadata cleanMetadata = table.getActiveTimeline().readCleanMetadata(lastClean);
    String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
    if (earliestCommitToRetain == null || earliestCommitToRetain.trim().isEmpty()) {
      return Option.empty();
    }

    Option<HoodieInstant> earliestInstantToRetain =
        commitTimeline.findInstantsAfterOrEquals(earliestCommitToRetain).firstInstant();
    // orElse(null) alone is enough to log "null" for an empty option; no identity map() is needed.
    log.info("Blocking archival based on earliest commit to retain {} from last clean {}. Earliest instant to retain is {}",
        earliestCommitToRetain, lastClean.requestedTime(), earliestInstantToRetain.orElse(null));
    return earliestInstantToRetain;
  } catch (IOException e) {
    log.warn("Failed to read clean metadata for {}", lastClean, e);
    throw new HoodieIOException("Failed to read clean metadata for " + lastClean, e);
  }
}

private static Option<HoodieInstant> getEarliestCommitToRetain(
HoodieTableMetaClient metaClient,
Option<HoodieInstant> latestCommit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2380,8 +2380,7 @@ private List<String> getActiveCommitTimes() {
}

/**
* Tests that TimelineArchiverV2 (LSM-based timeline, v9 tables) does NOT block archival on ECTR.
* ECTR blocking is only for v6 tables using TimelineArchiverV1.
* Tests that TimelineArchiverV2 also blocks archival on ECTR when the setting is enabled.
*/
@Test
public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() throws Exception {
Expand All @@ -2405,12 +2404,12 @@ public void testArchivalBlocksOnCleanECTRWithTimelineArchiverV2AndVersion9() thr
TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table);
archiver.archiveIfRequired(context);

// Then: TimelineArchiverV2 should NOT respect ECTR commit 00000003 gets archived
// Then: TimelineArchiverV2 should respect ECTR and retain commit 00000003 and later commits.
metaClient = HoodieTableMetaClient.reload(metaClient);
List<String> activeCommitTimes = getActiveCommitTimes();

assertFalse(activeCommitTimes.contains("00000003"),
"TimelineArchiverV2: Commit 00000003 (ECTR) should be archived");
assertTrue(activeCommitTimes.contains("00000003"),
"TimelineArchiverV2: Commit 00000003 (ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000004"),
"TimelineArchiverV2: Commit 00000004 (after ECTR) should not be archived");
assertTrue(activeCommitTimes.contains("00000005"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata;

Expand Down Expand Up @@ -151,8 +152,16 @@ public static Option<HoodieInstant> getEarliestCommitToRetain(
} else if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(latestInstant, timeZone.getZoneId());
String earliestTimeToRetain = TimelineUtils.formatDate(Date.from(latestDateTime.minusHours(hoursRetained).toInstant()));
earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i -> compareTimestamps(i.requestedTime(),
GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream()
.filter(i -> compareTimestamps(i.requestedTime(), GREATER_THAN_OR_EQUALS, earliestTimeToRetain))
.findFirst());

Option<HoodieInstant> earliestPendingCommit = commitsTimeline.filter(s -> !s.isCompleted()).firstInstant();
if (earliestPendingCommit.isPresent()
&& (!earliestCommitToRetain.isPresent()
|| compareTimestamps(earliestPendingCommit.get().requestedTime(), LESSER_THAN, earliestCommitToRetain.get().requestedTime()))) {
earliestCommitToRetain = completedCommitsTimeline.findInstantsBefore(earliestPendingCommit.get().requestedTime()).lastInstant();
}
}

// Apply maxCommitsToClean cap if configured and applicable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;

import org.junit.jupiter.api.Test;
Expand All @@ -32,6 +33,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -238,6 +240,51 @@ void testGetEarliestCommitToRetain_WithMaxCommitsToClean_KeepLatestByHours() {
assertEquals(timestamps.get(10), result.get().requestedTime());
}

@Test
void testGetEarliestCommitToRetainByHoursUsesEarliestCommitAfterCutoff() {
  // Three completed commits at 4h30m, 2h30m and 1h30m before a fixed reference time.
  ZonedDateTime referenceTime = ZonedDateTime.of(2026, 6, 19, 6, 30, 0, 0, ZoneId.of("UTC"));
  List<String> commitTimes = new ArrayList<>();
  commitTimes.add(formatInstant(referenceTime.minusHours(4).minusMinutes(30)));
  commitTimes.add(formatInstant(referenceTime.minusHours(2).minusMinutes(30)));
  commitTimes.add(formatInstant(referenceTime.minusHours(1).minusMinutes(30)));
  HoodieTimeline timeline = createMockTimelineWithTimestamps(commitTimes);

  Option<HoodieInstant> earliestToRetain = CleanerUtils.getEarliestCommitToRetain(
      timeline,
      HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS,
      12,
      referenceTime.toInstant(),
      2,
      HoodieTimelineTimeZone.UTC,
      Option.empty(),
      Long.MAX_VALUE);

  // Only the most recent commit is expected to be picked as the earliest commit to retain.
  String expectedCommitTime = commitTimes.get(2);
  assertTrue(earliestToRetain.isPresent());
  assertEquals(expectedCommitTime, earliestToRetain.get().requestedTime());
}

@Test
void testGetEarliestCommitToRetainByHoursDoesNotPassEarliestPendingInstant() {
  // Completed commits at 4h30m, 2h30m and 1h30m before the reference time, plus one
  // pending commit at 3h before it, i.e. between the first and second completed commits.
  ZonedDateTime referenceTime = ZonedDateTime.of(2026, 6, 19, 6, 30, 0, 0, ZoneId.of("UTC"));
  List<String> completedCommitTimes = new ArrayList<>();
  completedCommitTimes.add(formatInstant(referenceTime.minusHours(4).minusMinutes(30)));
  completedCommitTimes.add(formatInstant(referenceTime.minusHours(2).minusMinutes(30)));
  completedCommitTimes.add(formatInstant(referenceTime.minusHours(1).minusMinutes(30)));
  List<String> pendingCommitTimes = Collections.singletonList(formatInstant(referenceTime.minusHours(3)));
  HoodieTimeline timeline = createMockTimelineWithTimestamps(completedCommitTimes, pendingCommitTimes);

  Option<HoodieInstant> earliestToRetain = CleanerUtils.getEarliestCommitToRetain(
      timeline,
      HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS,
      12,
      referenceTime.toInstant(),
      2,
      HoodieTimelineTimeZone.UTC,
      Option.empty(),
      Long.MAX_VALUE);

  // The result must fall back to the last completed commit before the pending one.
  String expectedCommitTime = completedCommitTimes.get(0);
  assertTrue(earliestToRetain.isPresent());
  assertEquals(expectedCommitTime, earliestToRetain.get().requestedTime());
}

/**
* Helper method to create a mock timeline with specified number of commits.
* Commits are named as "20000000000000", "20000000000001", etc.
Expand All @@ -250,10 +297,18 @@ private HoodieTimeline createMockTimeline(int numCommits) {
return createMockTimelineWithTimestamps(timestamps);
}

// Renders the given date-time as a timeline timestamp string via TimelineUtils.formatDate.
private static String formatInstant(ZonedDateTime dateTime) {
  Date legacyDate = Date.from(dateTime.toInstant());
  return TimelineUtils.formatDate(legacyDate);
}

/**
 * Builds a mock timeline whose completed commits carry the given timestamps and which has
 * no pending commits; delegates to the two-argument overload with an empty pending list.
 */
private HoodieTimeline createMockTimelineWithTimestamps(List<String> timestamps) {
  List<String> noPendingTimestamps = Collections.emptyList();
  return createMockTimelineWithTimestamps(timestamps, noPendingTimestamps);
}

private HoodieTimeline createMockTimelineWithTimestamps(List<String> timestamps, List<String> pendingTimestamps) {
int numCommits = timestamps.size();
HoodieTimeline timeline = mock(HoodieTimeline.class);
HoodieTimeline completedTimeline = mock(HoodieTimeline.class);
Expand All @@ -268,6 +323,12 @@ private HoodieTimeline createMockTimelineWithTimestamps(List<String> timestamps)
when(timeline.filterCompletedInstants()).thenReturn(completedTimeline);
when(completedTimeline.countInstants()).thenReturn(numCommits);
when(completedTimeline.getInstantsAsStream()).thenAnswer(invocation -> instants.stream());
when(completedTimeline.getReverseOrderedInstants()).thenAnswer(invocation -> {
List<HoodieInstant> reverseInstants = new ArrayList<>(instants);
Collections.reverse(reverseInstants);
return reverseInstants.stream();
});
when(completedTimeline.firstInstant()).thenReturn(instants.isEmpty() ? Option.empty() : Option.of(instants.get(0)));

// Mock nthInstant to return the nth instant from the list
for (int i = 0; i < numCommits; i++) {
Expand All @@ -284,13 +345,21 @@ private HoodieTimeline createMockTimelineWithTimestamps(List<String> timestamps)
.filter(i -> i.requestedTime().compareTo(timestamp) < 0)
.collect(Collectors.toList());
when(beforeTimeline.getInstantsAsStream()).thenAnswer(inv -> beforeInstants.stream());
when(beforeTimeline.lastInstant()).thenReturn(beforeInstants.isEmpty()
? Option.empty()
: Option.of(beforeInstants.get(beforeInstants.size() - 1)));
return beforeTimeline;
});

// Mock filter for pending commits (empty for this test)
HoodieTimeline emptyTimeline = mock(HoodieTimeline.class);
when(emptyTimeline.firstInstant()).thenReturn(Option.empty());
when(timeline.filter(org.mockito.ArgumentMatchers.any())).thenReturn(emptyTimeline);
List<HoodieInstant> pendingInstants = pendingTimestamps.stream()
.map(timestamp -> new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, timestamp, InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR))
.collect(Collectors.toList());
HoodieTimeline pendingTimeline = mock(HoodieTimeline.class);
when(pendingTimeline.firstInstant()).thenReturn(pendingInstants.isEmpty()
? Option.empty()
: Option.of(pendingInstants.get(0)));
when(timeline.filter(org.mockito.ArgumentMatchers.any())).thenReturn(pendingTimeline);

return timeline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
Expand Down Expand Up @@ -1059,6 +1061,20 @@ public class FlinkOptions extends HoodieConfig {
.withFallbackKeys("hoodie.clean.fileversions.retained")
.withDescription("Number of file versions to retain. default 5");

// Flink-side option that reuses the key of HoodieCleanConfig.MAX_COMMITS_TO_CLEAN.
// The Long.MAX_VALUE default presumably means "no cap" (verify against the cleaner).
public static final ConfigOption<Long> CLEAN_MAX_COMMITS_TO_CLEAN = ConfigOptions
.key(HoodieCleanConfig.MAX_COMMITS_TO_CLEAN.key())
.longType()
.defaultValue(Long.MAX_VALUE)
.withDescription("Maximum number of commits to clean in one clean commit. Applicable only when the clean policy is "
+ "based on KEEP_LATEST_COMMITS or KEEP_LATEST_BY_HOURS.");

// Flink-side option that reuses the key of HoodieCleanConfig.INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.
// Defaults to -1, which the description below documents as "disabled".
public static final ConfigOption<Long> CLEAN_EMPTY_INTERVAL_HOURS = ConfigOptions
.key(HoodieCleanConfig.INTERVAL_TO_CREATE_EMPTY_CLEAN_HOURS.key())
.longType()
.defaultValue(-1L)
.withDescription("Interval in hours to create an empty clean instant when incremental cleaning has no files to delete. "
+ "Set to -1 to disable empty clean instant creation.");

public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
.key("archive.max_commits")
.intType()
Expand All @@ -1071,6 +1087,12 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(40)// default min 40 commits
.withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 40");

// Flink-side option that reuses the key of HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR.
// Off by default, so archival is not constrained by the latest clean unless explicitly enabled.
public static final ConfigOption<Boolean> ARCHIVE_BLOCK_ON_CLEAN_ECTR = ConfigOptions
.key(HoodieArchivalConfig.BLOCK_ARCHIVAL_ON_LATEST_CLEAN_ECTR.key())
.booleanType()
.defaultValue(false)
.withDescription("Whether archival should keep commits at or after the earliest commit to retain from the latest completed clean.");

// ------------------------------------------------------------------------
// Clustering Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,12 @@ public static HoodieWriteConfig getHoodieClientConfig(
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.get(FlinkOptions.CLEAN_POLICY)))
.withMaxCommitsToClean(conf.get(FlinkOptions.CLEAN_MAX_COMMITS_TO_CLEAN))
.withIntervalToCreateEmptyCleanHours(conf.get(FlinkOptions.CLEAN_EMPTY_INTERVAL_HOURS))
.build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(conf.get(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.get(FlinkOptions.ARCHIVE_MAX_COMMITS))
.withBlockArchivalOnCleanECTR(conf.get(FlinkOptions.ARCHIVE_BLOCK_ON_CLEAN_ECTR))
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withTargetIOPerCompactionInMB(conf.get(FlinkOptions.COMPACTION_TARGET_IO))
Expand Down
Loading
Loading