Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public class HoodieReaderConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Whether to use positions in the block header for data blocks containing updates and delete blocks for merging.");

/**
 * Advanced reader config (key {@code hoodie.lsm.sort.merge.spill.threshold}, default 16) that caps how many
 * sorted LSM input files are kept as direct readers during sort merge; see the documentation string below
 * for what happens to inputs beyond that limit.
 *
 * <p>NOTE(review): unlike the property declared just above, no {@code sinceVersion} is set here — confirm
 * whether one should be added.
 */
public static final ConfigProperty<Integer> LSM_SORT_MERGE_SPILL_THRESHOLD = ConfigProperty
.key("hoodie.lsm.sort.merge.spill.threshold")
.defaultValue(16)
.markAdvanced()
.withDocumentation("Maximum number of sorted LSM input files to keep as direct readers during sort merge. "
+ "When the fan-in is larger, remaining inputs are drained to sequential local spill files and read back during merge.");

public static final String REALTIME_SKIP_MERGE = "skip_merge";
public static final String REALTIME_PAYLOAD_COMBINE = "payload_combine";
public static final ConfigProperty<String> MERGE_TYPE = ConfigProperty
Expand Down
82 changes: 72 additions & 10 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.InvalidHoodieFileNameException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
Expand Down Expand Up @@ -79,6 +78,8 @@ public class FSUtils {
public static final String PATH_SEPARATOR = "/";
public static final Pattern LOG_FILE_PATTERN =
Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$");
public static final Pattern NATIVE_LOG_FILE_PATTERN =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The pattern uses [^_]+ for the file id, but Hudi file ids elsewhere in BASE_FILE_PATTERN allow [a-zA-Z0-9-]+ and LOG_FILE_PATTERN allows [^._]+. Are we sure native log file ids will never contain . or other special characters? Tightening this to [a-zA-Z0-9-]+ (or [^._]+ for consistency with the existing log pattern) would avoid accidental matches against unrelated .parquet paths.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Pattern.compile("^([^_]+)_((\\d+)-(\\d+)-(\\d+))_([^_]+)_(\\d+)(\\.delete)?\\.(parquet)$");
public static final Pattern PREFIX_BY_FILE_ID_PATTERN = Pattern.compile("^(.+)-(\\d+)");
private static final Pattern BASE_FILE_PATTERN = Pattern.compile("[a-zA-Z0-9-]+_[a-zA-Z0-9-]+_[0-9]+\\.[a-zA-Z0-9]+");

Expand Down Expand Up @@ -131,6 +132,10 @@ public static String maskWithoutFileId(String instantTime, int taskPartitionId)

public static String getCommitTime(String fullFileName) {
try {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(fullFileName);
if (nativeLogMatcher.isPresent()) {
return nativeLogMatcher.get().group(6);
}
if (isLogFile(fullFileName)) {
return fullFileName.split("_")[1].split("\\.", 2)[0];
}
Expand Down Expand Up @@ -328,6 +333,10 @@ public static StoragePath getAbsoluteFilePath(StoragePath basePath, String parti
* Get the file extension from the log file.
*/
public static String getFileExtensionFromLog(StoragePath logPath) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(logPath.getName());
if (nativeLogMatcher.isPresent()) {
return nativeLogMatcher.get().group(9);
}
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(logPath.toString(), "LogFile");
Expand All @@ -336,22 +345,19 @@ public static String getFileExtensionFromLog(StoragePath logPath) {
}

public static String getFileIdFromFileName(String fileName) {
if (FSUtils.isLogFile(fileName)) {
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
if (!matcher.matches()) {
throw new InvalidHoodieFileNameException(fileName, "LogFile");
}
return matcher.group(1);
Option<Matcher> logFileMatcher = matchLogFile(fileName);
if (logFileMatcher.isPresent()) {
return logFileMatcher.get().group(1);
}
return FSUtils.getFileId(fileName);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Switching to matchLogFile here looks like a regression for archive files — the original LOG_FILE_PATTERN.matcher(...) matched both log|archive, but matchLogFile only returns matches when group(3).equals("log"). The existing testArchiveLogFileName test asserts getFileIdFromLogPath(archive_path) returns the file id, which would now throw InvalidHoodiePathException. Could you preserve the archive-supporting behavior, or use LOG_FILE_PATTERN directly here?

- AI-generated; verify before applying. React 👍/👎 to flag quality.


public static String getFileIdFromLogPath(StoragePath path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
Option<Matcher> logFileMatcher = matchLogFile(path.getName());
if (!logFileMatcher.isPresent()) {
throw new InvalidHoodiePathException(path, "LogFile");
}
return matcher.group(1);
return logFileMatcher.get().group(1);
}

public static String getFileIdFromFilePath(StoragePath filePath) {
Expand All @@ -365,6 +371,10 @@ public static String getFileIdFromFilePath(StoragePath filePath) {
* Get the second part of the file name in the log file. That will be the delta commit time.
*/
public static String getDeltaCommitTimeFromLogPath(StoragePath path) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
if (nativeLogMatcher.isPresent()) {
return nativeLogMatcher.get().group(6);
}
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
Expand All @@ -376,6 +386,10 @@ public static String getDeltaCommitTimeFromLogPath(StoragePath path) {
* Get TaskPartitionId used in log-path.
*/
public static Integer getTaskPartitionIdFromLogPath(StoragePath path) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
if (nativeLogMatcher.isPresent()) {
return Integer.parseInt(nativeLogMatcher.get().group(3));
}
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
Expand All @@ -388,6 +402,10 @@ public static Integer getTaskPartitionIdFromLogPath(StoragePath path) {
* Get Write-Token used in log-path.
*/
public static String getWriteTokenFromLogPath(StoragePath path) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
if (nativeLogMatcher.isPresent()) {
return nativeLogMatcher.get().group(2);
}
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
Expand All @@ -399,6 +417,10 @@ public static String getWriteTokenFromLogPath(StoragePath path) {
* Get StageId used in log-path.
*/
public static Integer getStageIdFromLogPath(StoragePath path) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
if (nativeLogMatcher.isPresent()) {
return Integer.parseInt(nativeLogMatcher.get().group(4));
}
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
Expand All @@ -411,6 +433,10 @@ public static Integer getStageIdFromLogPath(StoragePath path) {
* Get Task Attempt Id used in log-path.
*/
public static Integer getTaskAttemptIdFromLogPath(StoragePath path) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(path.getName());
if (nativeLogMatcher.isPresent()) {
return Integer.parseInt(nativeLogMatcher.get().group(5));
}
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path.toString(), "LogFile");
Expand All @@ -427,6 +453,10 @@ public static int getFileVersionFromLog(StoragePath logPath) {
}

public static int getFileVersionFromLog(String logFileName) {
Option<Matcher> nativeLogMatcher = matchNativeLogFile(logFileName);
if (nativeLogMatcher.isPresent()) {
return Integer.parseInt(nativeLogMatcher.get().group(7));
}
Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName);
if (!matcher.matches()) {
throw new HoodieIOException("Invalid log file name: " + logFileName);
Expand All @@ -443,6 +473,9 @@ public static String makeLogFileName(String fileId, String logFileExtension, Str
}

public static boolean isBaseFile(StoragePath path) {
if (matchNativeLogFile(path.getName()).isPresent()) {
return false;
}
String extension = getFileExtension(path.getName());
if (HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension)) {
return BASE_FILE_PATTERN.matcher(path.getName()).matches();
Expand All @@ -466,13 +499,42 @@ public static boolean isLogFile(StoragePath logPath) {
}

/**
 * Checks whether the given file name denotes a log file.
 *
 * <p>A name is accepted when it matches the native log file pattern, or when it starts with
 * {@code LOG_FILE_START_WITH_CHARACTER}, fully matches {@link #LOG_FILE_PATTERN}, and its third capture
 * group equals {@code LOG_FILE_EXTENSION}.
 *
 * @param fileName file name to inspect
 * @return {@code true} if either naming scheme matches, {@code false} otherwise
 */
public static boolean isLogFile(String fileName) {
  // Native layout is checked first; it does not require the leading marker character.
  if (matchNativeLogFile(fileName).isPresent()) {
    return true;
  }
  if (!fileName.startsWith(LOG_FILE_START_WITH_CHARACTER)) {
    return false;
  }
  Matcher legacyMatcher = LOG_FILE_PATTERN.matcher(fileName);
  if (!legacyMatcher.matches()) {
    return false;
  }
  return legacyMatcher.group(3).equals(LOG_FILE_EXTENSION);
}

/**
 * Matches a file name against {@link #NATIVE_LOG_FILE_PATTERN}.
 *
 * <p>When the input contains {@code StoragePath.SEPARATOR}, only the text after the last separator is
 * matched, so a full path string may be passed as well as a bare file name.
 *
 * @param fileName file name or path string; inputs rejected by {@code StringUtils.isNullOrEmpty} yield an
 *                 empty option
 * @return an option holding the matcher after a successful full match, otherwise an empty option
 */
public static Option<Matcher> matchNativeLogFile(String fileName) {
  if (StringUtils.isNullOrEmpty(fileName)) {
    return Option.empty();
  }
  // lastIndexOf returns -1 when no separator is present, so substring(0) keeps the whole input.
  String baseName = fileName.substring(fileName.lastIndexOf(StoragePath.SEPARATOR) + 1);
  Matcher nativeMatcher = NATIVE_LOG_FILE_PATTERN.matcher(baseName);
  if (nativeMatcher.matches()) {
    return Option.of(nativeMatcher);
  }
  return Option.empty();
}

/**
 * Tells whether the given name matches the native log file pattern and its capture group 8 is present.
 *
 * @param fileName file name or path string to inspect
 * @return {@code true} only when the native pattern matches and group 8 was captured
 */
public static boolean isNativeDeleteLogFile(String fileName) {
  Option<Matcher> nativeMatch = matchNativeLogFile(fileName);
  if (!nativeMatch.isPresent()) {
    return false;
  }
  // Group 8 is optional in the pattern; a null group means it did not participate in the match.
  return nativeMatch.get().group(8) != null;
}

/**
 * Resolves a matcher for a log file name, trying the native log layout first and falling back to
 * {@link #LOG_FILE_PATTERN}.
 *
 * <p>A {@link #LOG_FILE_PATTERN} match is returned only when its third capture group equals
 * {@code LOG_FILE_EXTENSION}.
 * NOTE(review): this presumably excludes names whose group 3 is {@code archive} — confirm callers that
 * previously matched {@link #LOG_FILE_PATTERN} directly do not rely on archive names being accepted.
 *
 * @param fileName file name to match
 * @return an option holding the successful matcher, or an empty option when neither layout applies
 */
private static Option<Matcher> matchLogFile(String fileName) {
  Option<Matcher> nativeMatch = matchNativeLogFile(fileName);
  if (nativeMatch.isPresent()) {
    return nativeMatch;
  }
  Matcher legacyMatcher = LOG_FILE_PATTERN.matcher(fileName);
  if (!legacyMatcher.matches()) {
    return Option.empty();
  }
  if (!legacyMatcher.group(3).equals(LOG_FILE_EXTENSION)) {
    return Option.empty();
  }
  return Option.of(legacyMatcher);
}

/**
 * Checks whether the path is a data file, i.e. either a base file or a log file.
 *
 * @param path path to inspect
 * @return {@code true} if {@code isBaseFile(path)} or {@code isLogFile(path)} holds
 */
public static boolean isDataFile(StoragePath path) {
  // Base-file check runs first; the log-file check is only evaluated when it fails.
  if (isBaseFile(path)) {
    return true;
  }
  return isLogFile(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
Expand Down Expand Up @@ -109,6 +110,17 @@ private HoodieLogFile(StoragePathInfo pathInfo, StoragePath logPath, String logP
}

private void parseFieldsFromPath() {
Option<Matcher> nativeLogMatcherOpt = FSUtils.matchNativeLogFile(getPath().getName());
if (nativeLogMatcherOpt.isPresent()) {
Matcher matcher = nativeLogMatcherOpt.get();
this.fileId = matcher.group(1);
this.deltaCommitTime = matcher.group(6);
this.fileExtension = matcher.group(9);
this.logVersion = Integer.parseInt(matcher.group(7));
this.logWriteToken = matcher.group(2);
this.suffix = matcher.group(8) == null ? "" : matcher.group(8);
return;
}
Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName());
if (!matcher.matches()) {
throw new InvalidHoodiePathException(path, "LogFile");
Expand Down
Loading
Loading