From f0b63593dedd5c919d9567e78373da2c9ec7e3e6 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Fri, 12 Jun 2026 12:42:59 +0800 Subject: [PATCH 1/2] feat:(DNM) add a lsm-tree based FG reader --- .../org/apache/hudi/common/fs/FSUtils.java | 82 +++- .../hudi/common/model/HoodieLogFile.java | 12 + .../read/lsm/HoodieLsmFileGroupReader.java | 254 ++++++++++ .../read/lsm/LsmFileGroupRecordIterator.java | 457 ++++++++++++++++++ .../view/AbstractTableFileSystemView.java | 6 +- .../hudi/common/model/TestHoodieLogFile.java | 45 ++ 6 files changed, 843 insertions(+), 13 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 1b5f3ea62e787..74b3fbfc850be 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -34,7 +34,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieValidationException; -import org.apache.hudi.exception.InvalidHoodieFileNameException; import org.apache.hudi.exception.InvalidHoodiePathException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.HoodieStorage; @@ -79,6 +78,8 @@ public class FSUtils { public static final String PATH_SEPARATOR = "/"; public static final Pattern LOG_FILE_PATTERN = Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$"); + public static final Pattern NATIVE_LOG_FILE_PATTERN = + Pattern.compile("^([^_]+)_((\\d+)-(\\d+)-(\\d+))_([^_]+)_(\\d+)(\\.delete)?\\.(parquet)$"); public static final Pattern PREFIX_BY_FILE_ID_PATTERN = Pattern.compile("^(.+)-(\\d+)"); private static final Pattern BASE_FILE_PATTERN = Pattern.compile("[a-zA-Z0-9-]+_[a-zA-Z0-9-]+_[0-9]+\\.[a-zA-Z0-9]+"); @@ -131,6 +132,10 @@ public static String maskWithoutFileId(String instantTime, int taskPartitionId) public static String getCommitTime(String fullFileName) { try { + Option nativeLogMatcher = matchNativeLogFile(fullFileName); + if (nativeLogMatcher.isPresent()) { + return nativeLogMatcher.get().group(6); + } if (isLogFile(fullFileName)) { return fullFileName.split("_")[1].split("\\.", 2)[0]; } @@ -328,6 +333,10 @@ public static StoragePath getAbsoluteFilePath(StoragePath basePath, String parti * Get the file extension from the log file. */ public static String getFileExtensionFromLog(StoragePath logPath) { + Option nativeLogMatcher = matchNativeLogFile(logPath.getName()); + if (nativeLogMatcher.isPresent()) { + return nativeLogMatcher.get().group(9); + } Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(logPath.toString(), "LogFile"); @@ -336,22 +345,19 @@ public static String getFileExtensionFromLog(StoragePath logPath) { } public static String getFileIdFromFileName(String fileName) { - if (FSUtils.isLogFile(fileName)) { - Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); - if (!matcher.matches()) { - throw new InvalidHoodieFileNameException(fileName, "LogFile"); - } - return matcher.group(1); + Option logFileMatcher = matchLogFile(fileName); + if (logFileMatcher.isPresent()) { + return logFileMatcher.get().group(1); } return FSUtils.getFileId(fileName); } public static String getFileIdFromLogPath(StoragePath path) { - Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); - if (!matcher.matches()) { + Option logFileMatcher = matchLogFile(path.getName()); + if (!logFileMatcher.isPresent()) { throw new InvalidHoodiePathException(path, "LogFile"); } - return matcher.group(1); + return logFileMatcher.get().group(1); } public static String getFileIdFromFilePath(StoragePath filePath) { @@ -365,6 +371,10 @@ public static String getFileIdFromFilePath(StoragePath filePath) { * Get the second part of the file name in the log file. That will be the delta commit time. */ public static String getDeltaCommitTimeFromLogPath(StoragePath path) { + Option nativeLogMatcher = matchNativeLogFile(path.getName()); + if (nativeLogMatcher.isPresent()) { + return nativeLogMatcher.get().group(6); + } Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); @@ -376,6 +386,10 @@ public static String getDeltaCommitTimeFromLogPath(StoragePath path) { * Get TaskPartitionId used in log-path. */ public static Integer getTaskPartitionIdFromLogPath(StoragePath path) { + Option nativeLogMatcher = matchNativeLogFile(path.getName()); + if (nativeLogMatcher.isPresent()) { + return Integer.parseInt(nativeLogMatcher.get().group(3)); + } Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); @@ -388,6 +402,10 @@ public static Integer getTaskPartitionIdFromLogPath(StoragePath path) { * Get Write-Token used in log-path. */ public static String getWriteTokenFromLogPath(StoragePath path) { + Option nativeLogMatcher = matchNativeLogFile(path.getName()); + if (nativeLogMatcher.isPresent()) { + return nativeLogMatcher.get().group(2); + } Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); @@ -399,6 +417,10 @@ public static String getWriteTokenFromLogPath(StoragePath path) { * Get StageId used in log-path. */ public static Integer getStageIdFromLogPath(StoragePath path) { + Option nativeLogMatcher = matchNativeLogFile(path.getName()); + if (nativeLogMatcher.isPresent()) { + return Integer.parseInt(nativeLogMatcher.get().group(4)); + } Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); @@ -411,6 +433,10 @@ public static Integer getStageIdFromLogPath(StoragePath path) { * Get Task Attempt Id used in log-path. */ public static Integer getTaskAttemptIdFromLogPath(StoragePath path) { + Option nativeLogMatcher = matchNativeLogFile(path.getName()); + if (nativeLogMatcher.isPresent()) { + return Integer.parseInt(nativeLogMatcher.get().group(5)); + } Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path.toString(), "LogFile"); @@ -427,6 +453,10 @@ public static int getFileVersionFromLog(StoragePath logPath) { } public static int getFileVersionFromLog(String logFileName) { + Option nativeLogMatcher = matchNativeLogFile(logFileName); + if (nativeLogMatcher.isPresent()) { + return Integer.parseInt(nativeLogMatcher.get().group(7)); + } Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName); if (!matcher.matches()) { throw new HoodieIOException("Invalid log file name: " + logFileName); @@ -443,6 +473,9 @@ public static String makeLogFileName(String fileId, String logFileExtension, Str } public static boolean isBaseFile(StoragePath path) { + if (matchNativeLogFile(path.getName()).isPresent()) { + return false; + } String extension = getFileExtension(path.getName()); if (HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension)) { return BASE_FILE_PATTERN.matcher(path.getName()).matches(); @@ -466,6 +499,9 @@ public static boolean isLogFile(StoragePath logPath) { } public static boolean isLogFile(String fileName) { + if (matchNativeLogFile(fileName).isPresent()) { + return true; + } if (fileName.startsWith(LOG_FILE_START_WITH_CHARACTER)) { Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); return matcher.matches() && matcher.group(3).equals(LOG_FILE_EXTENSION); @@ -473,6 +509,32 @@ public static boolean isLogFile(String fileName) { return false; } + public static Option matchNativeLogFile(String fileName) { + if (StringUtils.isNullOrEmpty(fileName)) { + return Option.empty(); + } + String actualFileName = fileName.contains(StoragePath.SEPARATOR) + ? fileName.substring(fileName.lastIndexOf(StoragePath.SEPARATOR) + 1) + : fileName; + Matcher matcher = NATIVE_LOG_FILE_PATTERN.matcher(actualFileName); + return matcher.matches() ? Option.of(matcher) : Option.empty(); + } + + public static boolean isNativeDeleteLogFile(String fileName) { + return matchNativeLogFile(fileName).map(matcher -> matcher.group(8) != null).orElse(false); + } + + private static Option matchLogFile(String fileName) { + Option nativeLogMatcher = matchNativeLogFile(fileName); + if (nativeLogMatcher.isPresent()) { + return nativeLogMatcher; + } + Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); + return matcher.matches() && matcher.group(3).equals(LOG_FILE_EXTENSION) + ? Option.of(matcher) + : Option.empty(); + } + public static boolean isDataFile(StoragePath path) { return isBaseFile(path) || isLogFile(path); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 20fb395d401e9..77ad3ff2752e3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.InvalidHoodiePathException; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; @@ -109,6 +110,17 @@ private HoodieLogFile(StoragePathInfo pathInfo, StoragePath logPath, String logP } private void parseFieldsFromPath() { + Option nativeLogMatcherOpt = FSUtils.matchNativeLogFile(getPath().getName()); + if (nativeLogMatcherOpt.isPresent()) { + Matcher matcher = nativeLogMatcherOpt.get(); + this.fileId = matcher.group(1); + this.deltaCommitTime = matcher.group(6); + this.fileExtension = matcher.group(9); + this.logVersion = Integer.parseInt(matcher.group(7)); + this.logWriteToken = matcher.group(2); + this.suffix = matcher.group(8) == null ? "" : matcher.group(8); + return; + } Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName()); if (!matcher.matches()) { throw new InvalidHoodiePathException(path, "LogFile"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java new file mode 100644 index 0000000000000..83f72c66a08e8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.IteratorMode; +import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.util.ConfigUtils; +import org.apache.hudi.common.util.HoodieRecordUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; + +import lombok.Builder; +import lombok.Getter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +/** + * A file group reader for LSM file groups backed by native parquet log files. + * + *

This reader is intentionally separate from {@code HoodieFileGroupReader}. Callers opt into + * this reader when they know the file group follows LSM sorted-file semantics. + */ +public final class HoodieLsmFileGroupReader implements Closeable { + + private final HoodieReaderContext readerContext; + private final HoodieTableMetaClient metaClient; + private final InputSplit inputSplit; + private final List orderingFieldNames; + private final HoodieStorage storage; + private final TypedProperties props; + private final ReaderParameters readerParameters; + private final Option> outputConverter; + private final Option> fileGroupUpdateCallback; + private ClosableIterator> lsmRecordIterator; + @Getter + private final HoodieReadStats readStats; + + @Builder(setterPrefix = "with") + private HoodieLsmFileGroupReader( + HoodieReaderContext readerContext, + String latestCommitTime, + HoodieSchema dataSchema, + HoodieSchema requestedSchema, + Option internalSchemaOpt, + HoodieTableMetaClient hoodieTableMetaClient, + TypedProperties props, + Option baseFileOption, + Stream logFiles, + String partitionPath, + Long start, + Long length, + Boolean allowInflightInstants, + Boolean emitDelete, + Option> fileGroupUpdateCallback) { + + ValidationUtils.checkArgument(readerContext != null, "Reader context is required"); + ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table meta client is required"); + ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit time is required"); + ValidationUtils.checkArgument(dataSchema != null, "Data schema is required"); + ValidationUtils.checkArgument(requestedSchema != null, "Requested schema is required"); + ValidationUtils.checkArgument(props != null, "Props is required"); + ValidationUtils.checkArgument(partitionPath != null, "Partition path is required"); + ValidationUtils.checkArgument(hoodieTableMetaClient.getTableConfig().getLogFileFormat() == HoodieFileFormat.PARQUET, + "LSM file group reader expects parquet log files"); + + if (internalSchemaOpt == null) { + internalSchemaOpt = Option.empty(); + } + if (baseFileOption == null) { + baseFileOption = Option.empty(); + } + if (start == null) { + start = 0L; + } + if (length == null) { + length = Long.MAX_VALUE; + } + if (allowInflightInstants == null) { + allowInflightInstants = false; + } + if (emitDelete == null) { + emitDelete = false; + } + if (fileGroupUpdateCallback == null) { + fileGroupUpdateCallback = Option.empty(); + } + + String tablePath = hoodieTableMetaClient.getBasePath().toString(); + HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new StoragePath(tablePath), readerContext.getStorageConfiguration()); + + this.readerParameters = ReaderParameters.builder() + .shouldUseRecordPosition(false) + .emitDeletes(emitDelete) + .sortOutputs(false) + .inflightInstantsAllowed(allowInflightInstants) + .build(); + this.inputSplit = InputSplit.builder() + .baseFileOption(baseFileOption) + .logFileStream(logFiles) + .partitionPath(partitionPath) + .start(start) + .length(length) + .build(); + + this.readerContext = readerContext; + this.fileGroupUpdateCallback = fileGroupUpdateCallback; + this.metaClient = hoodieTableMetaClient; + this.storage = storage; + + readerContext.setHasLogFiles(this.inputSplit.hasLogFiles()); + readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath()); + if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) { + throw new IllegalArgumentException("LSM file group reader is doing log file merge but not reading from the start of the base file"); + } + HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig(); + this.props = ConfigUtils.getMergeProps(props, tableConfig); + readerContext.initRecordMerger(props); + readerContext.setTablePath(tablePath); + readerContext.setLatestCommitTime(latestCommitTime); + readerContext.setShouldMergeUseRecordPosition(false); + readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent()); + readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex() + ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient) + : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient)); + this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); + this.orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), hoodieTableMetaClient); + this.readStats = new HoodieReadStats(); + } + + private ClosableIterator> getBufferedRecordIterator(IteratorMode iteratorMode, + boolean includeBaseFile) throws IOException { + this.readerContext.setIteratorMode(iteratorMode); + this.lsmRecordIterator = new LsmFileGroupRecordIterator<>( + readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, includeBaseFile); + return new HoodieLsmFileGroupReaderIterator<>(this); + } + + public ClosableIterator> getClosableBufferedRecordIterator() throws IOException { + return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, true); + } + + public ClosableIterator getClosableIterator() throws IOException { + return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.ENGINE_RECORD, true), BufferedRecord::getRecord); + } + + public ClosableIterator> getClosableHoodieRecordIterator() throws IOException { + return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, true), + bufferedRecord -> readerContext.getRecordContext().constructFinalHoodieRecord(bufferedRecord)); + } + + public ClosableIterator getClosableKeyIterator() throws IOException { + return new CloseableMappingIterator<>(getBufferedRecordIterator(IteratorMode.RECORD_KEY, true), BufferedRecord::getRecordKey); + } + + public ClosableIterator> getLogRecordsOnly() throws IOException { + return getBufferedRecordIterator(IteratorMode.HOODIE_RECORD, false); + } + + boolean hasNext() { + return lsmRecordIterator.hasNext(); + } + + BufferedRecord next() { + BufferedRecord nextVal = lsmRecordIterator.next(); + if (outputConverter.isPresent()) { + return nextVal.project(outputConverter.get()); + } + return nextVal; + } + + public void onWriteFailure(String recordKey) { + this.fileGroupUpdateCallback.ifPresent(callback -> callback.onFailure(recordKey)); + } + + @Override + public void close() throws IOException { + if (lsmRecordIterator != null) { + lsmRecordIterator.close(); + } + } + + private static class HoodieLsmFileGroupReaderIterator implements ClosableIterator> { + private HoodieLsmFileGroupReader reader; + + private HoodieLsmFileGroupReaderIterator(HoodieLsmFileGroupReader reader) { + this.reader = reader; + } + + @Override + public boolean hasNext() { + return reader.hasNext(); + } + + @Override + public BufferedRecord next() { + return reader.next(); + } + + @Override + public void close() { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Failed to close the reader", e); + } finally { + this.reader = null; + } + } + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java new file mode 100644 index 0000000000000..c9ebceafdf8ce --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.BaseFileUpdateCallback; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.read.BufferedRecordMerger; +import org.apache.hudi.common.table.read.BufferedRecordMergerFactory; +import org.apache.hudi.common.table.read.BufferedRecords; +import org.apache.hudi.common.table.read.HoodieReadStats; +import org.apache.hudi.common.table.read.InputSplit; +import org.apache.hudi.common.table.read.ReaderParameters; +import org.apache.hudi.common.table.read.UpdateProcessor; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.OrderingValues; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.UnaryOperator; + +import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; + +/** + * Streaming sorted-merge reader for LSM file groups whose delta files are parquet files. + * + *

Each input file is expected to be sorted by record key. The iterator keeps one record from + * each file in memory, merges all versions for the same key with the regular file-group reader + * merge semantics, and emits the final row. + */ +public class LsmFileGroupRecordIterator implements ClosableIterator> { + + private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key"; + private static final HoodieSchema DELETE_LOG_SCHEMA = HoodieSchema.parse( + "{" + + "\"type\":\"record\"," + + "\"name\":\"hudi_delete_log_record\"," + + "\"fields\":[" + + "{\"name\":\"record_key\",\"type\":\"string\"}," + + "{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"ordering_val\",\"type\":[\"null\",\"bytes\"],\"default\":null}" + + "]" + + "}"); + + private final HoodieReaderContext readerContext; + private final HoodieStorage storage; + private final InputSplit inputSplit; + private final HoodieSchema readerSchema; + private final List orderingFieldNames; + private final boolean includeBaseFile; + private final BufferedRecordMerger bufferedRecordMerger; + private final UpdateProcessor updateProcessor; + private final LoserTree readers; + private BufferedRecord nextRecord; + + public LsmFileGroupRecordIterator(HoodieReaderContext readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option> fileGroupUpdateCallback) throws IOException { + this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, props, readerParameters, readStats, fileGroupUpdateCallback, true); + } + + public LsmFileGroupRecordIterator(HoodieReaderContext readerContext, + HoodieStorage storage, + InputSplit inputSplit, + List orderingFieldNames, + HoodieTableMetaClient metaClient, + TypedProperties props, + ReaderParameters readerParameters, + HoodieReadStats readStats, + Option> fileGroupUpdateCallback, + boolean includeBaseFile) throws IOException { + this.readerContext = readerContext; + this.storage = storage; + this.inputSplit = inputSplit; + this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema(); + this.orderingFieldNames = orderingFieldNames; + this.includeBaseFile = includeBaseFile; + this.bufferedRecordMerger = BufferedRecordMergerFactory.create( + readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), + readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); + this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.readers = new LoserTree<>(initializeReaders()); + } + + private List> initializeReaders() throws IOException { + List> readerStates = new ArrayList<>(); + int mergeOrder = 0; + if (includeBaseFile && inputSplit.getBaseFileOption().isPresent()) { + addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); + } + for (HoodieLogFile logFile : inputSplit.getLogFiles()) { + addReader(readerStates, mergeOrder++, createFileIterator(logFile.getPathInfo(), logFile.getPath(), logFile.getFileSize())); + } + return readerStates; + } + + private void addReader(List> readerStates, int mergeOrder, ClosableIterator> iterator) { + ReaderState readerState = new ReaderState<>(mergeOrder, iterator); + if (readerState.advance()) { + readerStates.add(readerState); + } else { + readerState.close(); + } + } + + private ClosableIterator> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException { + BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile); + return createFileIterator(file.getPathInfo(), file.getStoragePath(), file.getFileSize()); + } + + private ClosableIterator> createFileIterator(StoragePathInfo pathInfo, + StoragePath path, + long fileSize) throws IOException { + StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path; + if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) { + return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize); + } + Pair> requiredSchemaAndRenamedColumns = + readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath); + HoodieSchema fileRequiredSchema = requiredSchemaAndRenamedColumns.getLeft(); + ClosableIterator recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } else { + long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage); + } + if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) { + UnaryOperator projector = readerContext.getRecordContext() + .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight()); + recordIterator = new CloseableMappingIterator<>(recordIterator, projector); + } + if (readerContext.getInstantRange().isPresent()) { + recordIterator = readerContext.applyInstantRangeFilter(recordIterator); + } + return new CloseableMappingIterator<>(recordIterator, record -> BufferedRecords.fromEngineRecord( + readerContext.getRecordContext().seal(record), + readerSchema, + readerContext.getRecordContext(), + orderingFieldNames, + readerContext.getRecordContext().isDeleteRecord(record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema)))); + } + + private ClosableIterator> createNativeDeleteLogIterator(StoragePathInfo pathInfo, + StoragePath storagePath, + long fileSize) throws IOException { + ClosableIterator recordIterator; + if (pathInfo != null) { + recordIterator = readerContext.getFileRecordIterator( + pathInfo, 0, pathInfo.getLength(), DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage); + } else { + long length = fileSize >= 0 ? fileSize : storage.getPathInfo(storagePath).getLength(); + recordIterator = readerContext.getFileRecordIterator( + storagePath, 0, length, DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage); + } + return new CloseableMappingIterator<>(recordIterator, record -> { + Object recordKey = readerContext.getRecordContext().getValue(record, DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD); + return BufferedRecords.createDelete(recordKey.toString(), OrderingValues.getDefault()); + }); + } + + @Override + public boolean hasNext() { + if (nextRecord != null) { + return true; + } + while (!readers.isEmpty()) { + BufferedRecord mergedRecord = nextMergedRecord(); + nextRecord = updateProcessor.processUpdate( + mergedRecord.getRecordKey(), null, mergedRecord, mergedRecord.isDelete()); + if (nextRecord != null) { + return true; + } + } + return false; + } + + private BufferedRecord nextMergedRecord() { + BufferedRecord firstRecord = readers.peekWinner(); + String recordKey = firstRecord.getRecordKey(); + BufferedRecord mergedRecord = null; + while (!readers.isEmpty() && recordKey.equals(readers.peekWinner().getRecordKey())) { + mergedRecord = merge(mergedRecord, readers.popWinner()); + } + return mergedRecord; + } + + private BufferedRecord merge(BufferedRecord existingRecord, BufferedRecord newRecord) { + if (existingRecord == null) { + return newRecord; + } + try { + return bufferedRecordMerger.deltaMerge(newRecord, existingRecord).orElse(existingRecord); + } catch (IOException e) { + throw new HoodieIOException("Failed to merge LSM records for key " + newRecord.getRecordKey(), e); + } + } + + @Override + public BufferedRecord next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + BufferedRecord record = nextRecord; + nextRecord = null; + return record; + } + + @Override + public void close() { + readers.close(); + } + + private enum State { + WINNER_WITH_NEW_KEY, + WINNER_WITH_SAME_KEY, + WINNER_POPPED, + LOSER_WITH_NEW_KEY, + LOSER_WITH_SAME_KEY, + LOSER_POPPED + } + + /** + * Loser-tree state machine for k-way merging. Each leaf keeps one active record from + * one sorted input stream; {@code tree[0]} stores the current champion and internal + * nodes store the loser from the corresponding tournament match. + */ + private static class LoserTree { + private final List> leaves; + private final int leafBase; + private final int[] tree; + private final int[] winners; + + private LoserTree(List> leaves) { + this.leaves = leaves; + this.leafBase = nextPowerOfTwo(Math.max(1, leaves.size())); + this.tree = new int[leafBase]; + this.winners = new int[leafBase << 1]; + Arrays.fill(tree, -1); + Arrays.fill(winners, -1); + build(); + } + + private void build() { + for (int i = 0; i < leaves.size(); i++) { + winners[leafBase + i] = leaves.get(i).current == null ? -1 : i; + } + if (leafBase == 1) { + tree[0] = winners[leafBase]; + } else { + for (int node = leafBase - 1; node > 0; node--) { + replay(node); + } + } + setChampionState(null); + } + + private boolean isEmpty() { + return tree[0] < 0; + } + + private BufferedRecord peekWinner() { + int winnerIndex = tree[0]; + return winnerIndex < 0 ? null : leaves.get(winnerIndex).current; + } + + private BufferedRecord popWinner() { + int winnerIndex = tree[0]; + ReaderState winner = leaves.get(winnerIndex); + BufferedRecord record = winner.current; + String recordKey = record.getRecordKey(); + winner.state = State.WINNER_POPPED; + winner.firstSameKeyIndex = -1; + if (!winner.advance()) { + winner.state = State.LOSER_POPPED; + winner.close(); + } + update(winnerIndex, recordKey); + return record; + } + + private void update(int leafIndex, String poppedKey) { + winners[leafBase + leafIndex] = leaves.get(leafIndex).current == null ? -1 : leafIndex; + if (leafBase == 1) { + tree[0] = winners[leafBase]; + setChampionState(poppedKey); + return; + } + int node = (leafBase + leafIndex) >> 1; + while (node > 0) { + replay(node); + node >>= 1; + } + setChampionState(poppedKey); + } + + private void replay(int node) { + int left = winners[node << 1]; + int right = winners[(node << 1) + 1]; + if (left < 0 && right < 0) { + winners[node] = -1; + tree[node] = -1; + } else if (left < 0) { + winners[node] = right; + tree[node] = -1; + } else if (right < 0) { + winners[node] = left; + tree[node] = -1; + } else { + int compareResult = compare(left, right); + if (compareResult <= 0) { + winners[node] = left; + tree[node] = right; + markLoser(right, left, compareResult); + } else { + winners[node] = right; + tree[node] = left; + markLoser(left, right, compareResult); + } + } + if (node == 1) { + tree[0] = winners[node]; + } + } + + private int compare(int leftIndex, int rightIndex) { + ReaderState left = leaves.get(leftIndex); + ReaderState right = leaves.get(rightIndex); + int keyCompare = left.current.getRecordKey().compareTo(right.current.getRecordKey()); + if (keyCompare != 0) { + return keyCompare; + } + // Process older sources first so the regular merger sees later sources last. + // This preserves HoodieFileGroupReader tie semantics when ordering values are equal: + // base < older log instant/version < newer log instant/version. + return Integer.compare(left.mergeOrder, right.mergeOrder); + } + + private void markLoser(int loserIndex, int winnerIndex, int compareResult) { + ReaderState loser = leaves.get(loserIndex); + boolean sameKey = leaves.get(loserIndex).current.getRecordKey().equals(leaves.get(winnerIndex).current.getRecordKey()); + loser.state = sameKey ? State.LOSER_WITH_SAME_KEY : State.LOSER_WITH_NEW_KEY; + loser.firstSameKeyIndex = sameKey ? winnerIndex : -1; + } + + private void setChampionState(String poppedKey) { + int championIndex = tree[0]; + if (championIndex < 0) { + return; + } + ReaderState champion = leaves.get(championIndex); + champion.state = poppedKey != null && poppedKey.equals(champion.current.getRecordKey()) + ? State.WINNER_WITH_SAME_KEY + : State.WINNER_WITH_NEW_KEY; + champion.firstSameKeyIndex = findFirstSameKeyIndex(championIndex); + } + + private int findFirstSameKeyIndex(int championIndex) { + String championKey = leaves.get(championIndex).current.getRecordKey(); + int firstSameKeyIndex = -1; + for (int loserIndex : tree) { + if (loserIndex >= 0 && loserIndex != championIndex + && leaves.get(loserIndex).current != null + && championKey.equals(leaves.get(loserIndex).current.getRecordKey()) + && (firstSameKeyIndex < 0 || leaves.get(loserIndex).mergeOrder < leaves.get(firstSameKeyIndex).mergeOrder)) { + firstSameKeyIndex = loserIndex; + } + } + return firstSameKeyIndex; + } + + private void close() { + leaves.forEach(ReaderState::close); + } + + private static int nextPowerOfTwo(int value) { + int result = 1; + while (result < value) { + result <<= 1; + } + return result; + } + } + + private static class ReaderState { + private final int mergeOrder; + private final ClosableIterator> iterator; + private BufferedRecord current; + private State state = State.WINNER_WITH_NEW_KEY; + private int firstSameKeyIndex = -1; + private boolean closed; + + private ReaderState(int mergeOrder, ClosableIterator> iterator) { + this.mergeOrder = mergeOrder; + this.iterator = iterator; + } + + private boolean advance() { + if (iterator.hasNext()) { + current = iterator.next(); + return true; + } + current = null; + return false; + } + + private void close() { + if (!closed) { + iterator.close(); + closed = true; + } + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 9c4debe193659..991784f811881 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -70,7 +70,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.function.Function; import java.util.function.Predicate; -import java.util.regex.Matcher; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -508,6 +507,8 @@ private Stream convertFileStatusesToBaseFiles(List convertFileStatusesToLogFiles(List rtFilePredicate = pathInfo -> { String fileName = pathInfo.getPath().getName(); - Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(fileName); - return matcher.matches() && fileName.contains(logFileExtension); + return FSUtils.isLogFile(pathInfo.getPath()) && fileName.contains(logFileExtension); }; return pathInfoList.stream().filter(rtFilePredicate).map(HoodieLogFile::new); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java index 616801c7fb75c..7c338e3bd33b8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java @@ -18,12 +18,15 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.StoragePathInfo; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieLogFile { private final String pathStr = "file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1"; @@ -77,6 +80,48 @@ void createFromStringWithSuffix() { assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix); } + @Test + void createFromNativeParquetLogFile() { + String nativeLogPathStr = "file:///tmp/hoodie/2021/01/01/" + + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_2.parquet"; + StoragePath nativeLogPath = new StoragePath(nativeLogPathStr); + HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeLogPath); + + assertTrue(FSUtils.isLogFile(nativeLogPath)); + assertFalse(FSUtils.isBaseFile(nativeLogPath)); + assertEquals(fileId, hoodieLogFile.getFileId()); + assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime()); + assertEquals(2, hoodieLogFile.getLogVersion()); + assertEquals("1-0-1", hoodieLogFile.getLogWriteToken()); + assertEquals("parquet", hoodieLogFile.getFileExtension()); + assertEquals("", hoodieLogFile.getSuffix()); + assertEquals("20250409161256974", FSUtils.getCommitTime(nativeLogPath.getName())); + assertEquals("20250409161256974", FSUtils.getDeltaCommitTimeFromLogPath(nativeLogPath)); + assertEquals(2, FSUtils.getFileVersionFromLog(nativeLogPath)); + assertEquals("1-0-1", FSUtils.getWriteTokenFromLogPath(nativeLogPath)); + assertEquals(1, FSUtils.getTaskPartitionIdFromLogPath(nativeLogPath)); + assertEquals(0, FSUtils.getStageIdFromLogPath(nativeLogPath)); + assertEquals(1, FSUtils.getTaskAttemptIdFromLogPath(nativeLogPath)); + } + + @Test + void createFromNativeDeleteParquetLogFile() { + String nativeDeleteLogPathStr = "file:///tmp/hoodie/2021/01/01/" + + "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_20250409161256974_3.delete.parquet"; + StoragePath nativeDeleteLogPath = new StoragePath(nativeDeleteLogPathStr); + HoodieLogFile hoodieLogFile = new HoodieLogFile(nativeDeleteLogPath); + + assertTrue(FSUtils.isLogFile(nativeDeleteLogPath)); + assertTrue(FSUtils.isNativeDeleteLogFile(nativeDeleteLogPath.getName())); + assertFalse(FSUtils.isBaseFile(nativeDeleteLogPath)); + assertEquals(fileId, hoodieLogFile.getFileId()); + assertEquals("20250409161256974", hoodieLogFile.getDeltaCommitTime()); + assertEquals(3, hoodieLogFile.getLogVersion()); + assertEquals("1-0-1", hoodieLogFile.getLogWriteToken()); + assertEquals("parquet", hoodieLogFile.getFileExtension()); + assertEquals(".delete", hoodieLogFile.getSuffix()); + } + private void assertFileGetters(StoragePathInfo pathInfo, HoodieLogFile hoodieLogFile, long fileLength) { assertFileGetters(pathStr, pathInfo, hoodieLogFile, fileLength, ""); From 8eb99948994e047d630a6c790cb93ce64c552576 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Fri, 12 Jun 2026 13:34:43 +0800 Subject: [PATCH 2/2] add iterator spilling --- .../common/config/HoodieReaderConfig.java | 7 + .../read/lsm/LsmFileGroupRecordIterator.java | 19 ++- .../read/lsm/SpillableLsmRecordIterator.java | 157 ++++++++++++++++++ .../lsm/TestSpillableLsmRecordIterator.java | 72 ++++++++ 4 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java index 9cbab8f4468cd..577863d488567 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java @@ -59,6 +59,13 @@ public class HoodieReaderConfig extends HoodieConfig { .sinceVersion("1.0.0") .withDocumentation("Whether to use positions in the block header for data blocks containing updates and delete blocks for merging."); + public static final ConfigProperty LSM_SORT_MERGE_SPILL_THRESHOLD = ConfigProperty + .key("hoodie.lsm.sort.merge.spill.threshold") + .defaultValue(16) + .markAdvanced() + .withDocumentation("Maximum number of sorted LSM input files to keep as direct readers during sort merge. " + + "When the fan-in is larger, remaining inputs are drained to sequential local spill files and read back during merge."); + public static final String REALTIME_SKIP_MERGE = "skip_merge"; public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine"; public static final ConfigProperty MERGE_TYPE = ConfigProperty diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java index c9ebceafdf8ce..2c7eae300718d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java @@ -54,7 +54,10 @@ import java.util.NoSuchElementException; import java.util.function.UnaryOperator; +import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD; import static org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent; +import static org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath; /** * Streaming sorted-merge reader for LSM file groups whose delta files are parquet files. @@ -86,6 +89,8 @@ public class LsmFileGroupRecordIterator implements ClosableIterator bufferedRecordMerger; private final UpdateProcessor updateProcessor; private final LoserTree readers; + private final int spillThreshold; + private final String spillBasePath; private BufferedRecord nextRecord; public LsmFileGroupRecordIterator(HoodieReaderContext readerContext, @@ -120,6 +125,8 @@ public LsmFileGroupRecordIterator(HoodieReaderContext readerContext, readerContext, readerContext.getMergeMode(), false, readerContext.getRecordMerger(), readerSchema, readerContext.getPayloadClasses(props), props, metaClient.getTableConfig().getPartialUpdateMode()); this.updateProcessor = UpdateProcessor.create(readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props); + this.spillThreshold = props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue()); + this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), getDefaultSpillableMapBasePath()); this.readers = new LoserTree<>(initializeReaders()); } @@ -130,11 +137,21 @@ private List> initializeReaders() throws IOException { addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get())); } for (HoodieLogFile logFile : inputSplit.getLogFiles()) { - addReader(readerStates, mergeOrder++, createFileIterator(logFile.getPathInfo(), logFile.getPath(), logFile.getFileSize())); + ClosableIterator> iterator = createFileIterator(logFile.getPathInfo(), logFile.getPath(), logFile.getFileSize()); + addReader(readerStates, mergeOrder, maybeSpillIterator(mergeOrder, iterator)); + mergeOrder++; } return readerStates; } + private ClosableIterator> maybeSpillIterator(int mergeOrder, + ClosableIterator> iterator) { + if (mergeOrder < spillThreshold) { + return iterator; + } + return new SpillableLsmRecordIterator<>(iterator, readerContext.getRecordSerializer(), readerContext.getRecordContext(), spillBasePath); + } + private void addReader(List> readerStates, int mergeOrder, ClosableIterator> iterator) { ReaderState readerState = new ReaderState<>(mergeOrder, iterator); if (readerState.advance()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java new file mode 100644 index 0000000000000..c8c29641b9ab8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.engine.RecordContext; +import org.apache.hudi.common.serialization.CustomSerializer; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Sequential disk-backed iterator for sorted LSM inputs. + * + *

The source iterator is drained into a length-prefixed spill file and closed. The resulting + * iterator reads the records back sequentially, which matches the loser-tree access pattern. + */ +class SpillableLsmRecordIterator implements ClosableIterator> { + + private static final int BUFFER_SIZE = 128 * 1024; + private static final String SPILL_FILE_PREFIX = "hudi-lsm-"; + private static final String SPILL_FILE_SUFFIX = ".spill"; + + private final CustomSerializer> serializer; + private final RecordContext recordContext; + private final File spillFile; + private final long recordCount; + private DataInputStream inputStream; + private long recordsRead; + private BufferedRecord nextRecord; + private boolean closed; + + SpillableLsmRecordIterator(ClosableIterator> sourceIterator, + CustomSerializer> serializer, + RecordContext recordContext, + String spillBasePath) { + this.serializer = serializer; + this.recordContext = recordContext; + try { + Path spillDirectory = Paths.get(spillBasePath); + Files.createDirectories(spillDirectory); + this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX, SPILL_FILE_SUFFIX).toFile(); + this.spillFile.deleteOnExit(); + this.recordCount = spill(sourceIterator); + } catch (IOException e) { + throw new HoodieIOException("Failed to spill LSM input iterator", e); + } finally { + sourceIterator.close(); + } + } + + private long spill(ClosableIterator> sourceIterator) throws IOException { + long count = 0; + try (DataOutputStream outputStream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(spillFile), BUFFER_SIZE))) { + while (sourceIterator.hasNext()) { + byte[] bytes = serializer.serialize(sourceIterator.next().toBinary(recordContext)); + outputStream.writeInt(bytes.length); + outputStream.write(bytes); + count++; + } + } catch (IOException | RuntimeException e) { + deleteSpillFile(); + throw e; + } + return count; + } + + @Override + public boolean hasNext() { + if (nextRecord != null) { + return true; + } + if (recordsRead >= recordCount) { + return false; + } + try { + ensureInputStream(); + int length = inputStream.readInt(); + byte[] bytes = new byte[length]; + inputStream.readFully(bytes); + nextRecord = serializer.deserialize(bytes); + recordsRead++; + return true; + } catch (IOException e) { + throw new HoodieIOException("Failed to read spilled LSM input iterator", e); + } + } + + @Override + public BufferedRecord next() { + if (!hasNext()) { + throw new java.util.NoSuchElementException(); + } + BufferedRecord record = nextRecord; + nextRecord = null; + return record; + } + + @Override + public void close() { + if (closed) { + return; + } + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to close spilled LSM input iterator", e); + } finally { + deleteSpillFile(); + closed = true; + } + } + + private void ensureInputStream() throws IOException { + if (inputStream == null) { + inputStream = new DataInputStream(new BufferedInputStream(new FileInputStream(spillFile), BUFFER_SIZE)); + } + } + + private void deleteSpillFile() { + try { + Files.deleteIfExists(spillFile.toPath()); + } catch (IOException e) { + throw new HoodieIOException("Failed to delete spilled LSM input file " + spillFile, e); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java new file mode 100644 index 0000000000000..c5f0221f71dd8 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/lsm/TestSpillableLsmRecordIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read.lsm; + +import org.apache.hudi.common.serialization.DefaultSerializer; +import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.util.collection.ClosableIterator; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestSpillableLsmRecordIterator { + + @TempDir + Path tempDir; + + @Test + void testSpillAndReadBackSequentially() throws IOException { + List> records = Arrays.asList( + new BufferedRecord<>("key1", 1, null, null, null), + new BufferedRecord<>("key2", 2, null, null, null), + new BufferedRecord<>("key3", 3, null, null, null)); + + SpillableLsmRecordIterator iterator = new SpillableLsmRecordIterator<>( + ClosableIterator.wrap(records.iterator()), new DefaultSerializer<>(), null, tempDir.toString()); + + assertEquals(1, spillFileCount()); + assertTrue(iterator.hasNext()); + assertTrue(iterator.hasNext()); + assertEquals(records.get(0), iterator.next()); + assertEquals(records.get(1), iterator.next()); + assertEquals(records.get(2), iterator.next()); + assertFalse(iterator.hasNext()); + + iterator.close(); + assertEquals(0, spillFileCount()); + } + + private long spillFileCount() throws IOException { + try (Stream paths = Files.list(tempDir)) { + return paths.count(); + } + } +}