Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ public Option<InstantRange> getInstantRange() {
return instantRangeOpt;
}

/**
* Set the {@link InstantRange} filter. Used to bound the instants (base records and log blocks)
* that participate in the merge, e.g. an incremental query reads only up to its window end so that
* log blocks committed after the window are not merged in.
*/
public void setInstantRange(Option<InstantRange> instantRange) {
this.instantRangeOpt = instantRange;
}

/**
* Apply the {@link InstantRange} filter to the file record iterator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
package org.apache.hudi

import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline, getCommitMetadata}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths
import org.apache.hudi.storage.StoragePathInfo

import org.apache.hadoop.fs.GlobPattern
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -104,15 +107,12 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
listLatestFileSlices(partitionFilters, dataFilters)
} else {
val latestCommit = includedCommits.last.requestedTime

val fsView = new HoodieTableFileSystemView(
metaClient, timeline, affectedFilesInCommits)

val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
val modifiedPartitions = getWritePartitionPaths(commitsMetadata).asScala.toSeq
if (hasMissingAffectedFiles) {
legacyAffectedFileSlices(modifiedPartitions, latestCommit)
} else {
collectIncrementalFileSlices(modifiedPartitions, latestCommit)
}
}

buildSplits(filterFileSlices(fileSlices, globPattern))
Expand All @@ -127,14 +127,14 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
listLatestFileSlices(partitionFilters, dataFilters)
} else {
val latestCommit = includedCommits.last.requestedTime
val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
val partitionPaths = fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
.map(p => p.getPath).filter(p => modifiedPartitions.contains(p))
.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}
if (hasMissingAffectedFiles) {
legacyAffectedFileSlices(partitionPaths, latestCommit)
} else {
collectIncrementalFileSlices(partitionPaths, latestCommit)
}
}
filterFileSlices(fileSlices, globPattern)
}
Expand All @@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,

override def shouldIncludeLogFiles(): Boolean = fullTableScan

/**
* Collects the incremental file slices for the given modified partitions.
*
* Each returned file slice carries its base file plus all of its log files so the
* [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge.
* Resolving the current value of a record changed in the incremental window requires the full file
* slice, not just the files written within the window:
* - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the
* version already in the base/earlier-log, so the existing version wins; a window-only view
* cannot determine the winner.
* - Partial updates: a window log block holds only the changed columns, so the unchanged columns
* must be filled in from the base file.
* (HUDI #18943.)
*
* The view is built from the (modified) partition listing (which includes the latest base file of
* each file group, honoring the metadata table when enabled) and scoped back to the file groups
* actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not
* read. The commit-time record filter applied during the scan still restricts the returned records
* to the incremental window.
*
* The view timeline is bounded to instants at or before `latestCommit` (the window's last commit)
* so that base/log files written by later commits are not visible. Without this bound a record
* updated again after the window would be merged with those later log files and its merged commit
* time would fall outside the window, dropping the in-window change from the result.
*/

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This affectedFileGroupIds filter is the only thing scoping the metadata-aware partition view back to the touched file groups, but no test covers a modified partition that holds an untouched sibling file group. testPartialUpdateIncrementalQueryPartitioned writes one updated record per partition, so with MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT=0 each partition is a single file group and this filter is a no-op - removing it would fail no test. Suggest a case that lands multiple file groups in one partition and updates only one, asserting the untouched sibling is excluded. Output stays correct via the post-merge commit-time filter either way, so this guards the scoping/perf path.

private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {
val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext))
val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit))
try {
partitionPaths.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: fs conventionally means FileSystem/storage throughout Hudi — using it for a FileSlice lambda here could trip up a reader. Could you rename it to slice or fileSlice?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

.filter(fs => affectedFileGroupIds.contains(fs.getFileId))
}
} finally {
fsView.close()
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Just to double-check the scope: the PR description says only the V2 listing changed, but HoodieFileGroupReaderBasedFileFormat is the read path for both V1 and V2. V1's listing still builds the view from affectedFilesInCommits only (no pre-window base file), so the runtime merge sees only log records and the partial-update / EVENT_TIME_ORDERING bugs remain on V1. Is that intentional (V1 = table-version < 8, deprecated path), or should V1 listing get the same treatment in a follow-up?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed V2-only and intentional: the commit message scopes the listing fix to table version 8+. V1 (MergeOnReadIncrementalRelationV1.scala:114-115 and :136) still builds the window-only HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits), so a MOR incremental read on a table-version < 8 table retains the partial-update / event-time gap. If table-version < 8 incremental MOR is still supported, this is worth a tracked follow-up rather than leaving it implicit.


/**
* Builds the incremental file slices directly from the files recorded in the window's commits
* (`affectedFilesInCommits`), as the read path did before HUDI #18943.
*
* This is used only when [[hasMissingAffectedFiles]] is true and the full-table-scan fallback is
* disabled: some files referenced by the window have been removed (e.g. by cleaning), so there is
* no correct incremental result to produce. Listing from the recorded files (which include the
* missing paths) preserves the prior fail-early contract - the scan surfaces a file-not-found
* error pointing the user at `hoodie.datasource.read.incr.fallback.fulltablescan.enable` - instead
* of silently returning an empty/partial result from a fresh listing that no longer sees those
* files.
*/
private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

legacyAffectedFileSlices and collectIncrementalFileSlices share the same partitionPaths.flatMap { p => view.getLatestMergedFileSlicesBeforeOrOn(p, latestCommit).iterator().asScala [.filter(...)] } body wrapped in try/finally view.close(), differing only in how the view is built and whether the file-group filter applies. Consider a private helper taking the view plus a FileSlice => Boolean predicate that owns the try/finally close, so the two call sites cannot drift on the close() contract.

val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
try {
partitionPaths.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}
} finally {
fsView.close()
}
}

/**
* File group ids touched within the incremental window. Used to scope the partition-listing based
* view (which surfaces every file group in a modified partition) back to only the changed file
* groups, so we do not read and discard untouched file groups.
*/
private lazy val affectedFileGroupIds: Set[String] =
affectedFilesInCommits.asScala.map(f => FSUtils.getFileIdFromFilePath(f.getPath)).toSet

private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: String): Seq[FileSlice] = {
val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
Expand Down Expand Up @@ -186,6 +256,12 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation {

protected def startInstantArchived: Boolean = !queryContext.getArchivedInstants.isEmpty

// Some files referenced by the commits in the incremental window no longer exist on storage
// (e.g. they were removed by cleaning). Such a window cannot be served from the incremental
// file listing alone.
protected lazy val hasMissingAffectedFiles: Boolean =
affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath))

// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
Expand All @@ -194,8 +270,7 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation {
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean

fallbackToFullTableScan && (startInstantArchived
|| affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath)))
fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 With fallbackToFullTableScan=false, hasMissingAffectedFiles is now also evaluated in collectFileSplits/listFileSplits to choose between the new and legacy paths, which triggers an exists() per affected file at plan time even when before it was short-circuited. Could be slow for wide windows on object stores (one head request per file). Lazy val avoids repeat eval, so probably fine, but worth confirming this matches the intent.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

}

protected val rangeType: RangeType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.schema.HoodieSchemaUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver}
import org.apache.hudi.common.table.log.InstantRange
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.collection.ClosableIterator
import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableFilterIterator}
import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.exception.HoodieNotSupportedException
import org.apache.hudi.internal.schema.InternalSchema
Expand All @@ -54,12 +55,13 @@ import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils}
import org.apache.spark.util.SerializableConfiguration

import java.io.Closeable
import java.util.function.{Predicate => JPredicate}

import scala.collection.JavaConverters.mapAsJavaMapConverter

Expand Down Expand Up @@ -301,9 +303,38 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(fileGroupName) match {
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
// For incremental queries on a MOR file group that requires base/log merging, the
// commit-time span filter (`requiredFilters`) must NOT be pushed down into the file
// reads. Pushing it down drops the base-file row of a record whose latest base/log
// version is outside the incremental window, before the runtime merge runs. That breaks
// correctness whenever the merged result depends on data outside the window, e.g.
// event-time ordering (the existing higher-ordering version should win) and partial
// updates (unchanged columns must come from the base file) (HUDI #18943). Instead we
// read and merge the full file group, then filter the merged rows by commit time via
// `postMergeCommitTimeFilter`. We only switch to post-merge filtering when the filter
// can be built (commit-time field present); otherwise we keep the original pushdown.
val needsMerge = fileSlice.getLogFiles.findAny().isPresent
val postMergeCommitTimeFilter: Option[JPredicate[InternalRow]] =
if (isIncremental && needsMerge) {
buildCommitTimeRowFilter(requiredFilters, requestedStructType)
} else {
None
}
val readFilters = if (postMergeCommitTimeFilter.isDefined) Seq.empty[Filter] else requiredFilters
val readerContext = new SparkFileFormatInternalRowReaderContext(
fileGroupBaseFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig,
fileGroupBaseFileReader.value, filters, readFilters, storageConf, metaClient.getTableConfig,
sparkRequiredSchema = Some(requiredSchema))
// Bound the merge inputs to the incremental window end: read only base records and log
// blocks committed at or before the window's last commit, so a record updated again
// after the window is not merged with those later log blocks (HUDI #18943). The
// post-merge commit-time filter then selects the records changed within the window.
if (postMergeCommitTimeFilter.isDefined) {
incrementalWindowEnd(requiredFilters).foreach { windowEnd =>
readerContext.setInstantRange(HOption.of(InstantRange.builder()
.rangeType(InstantRange.RangeType.CLOSED_CLOSED).nullableBoundary(true)
.endInstant(windowEnd).build()))
}
}
readerContext.enableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true))
val props = metaClient.getTableConfig.getProps
options.foreach(kv => props.setProperty(kv._1, kv._2))
Expand All @@ -328,9 +359,13 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
.withLength(baseFileLength)
.withShouldUseRecordPosition(shouldUseRecordPosition)
.build()
val mergedIter: ClosableIterator[InternalRow] = postMergeCommitTimeFilter match {
case Some(predicate) => new CloseableFilterIterator[InternalRow](reader.getClosableIterator, predicate)
case None => reader.getClosableIterator
}
// Append partition values to rows and project to output schema
appendPartitionAndProject(
reader.getClosableIterator,
mergedIter,
requestedStructType,
remainingPartitionSchema,
outputSchema,
Expand All @@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}

/**
* The incremental window end (latest commit time in the window), extracted from the `In` filter on
* the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log
* blocks committed after the window are not merged. Returns None when no such filter is present.
*/
private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = {
val commitTimes: Seq[String] = requiredFilters.collect {
case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
values.filter(_ != null).map(_.toString).toSeq
}.flatten
if (commitTimes.isEmpty) None else Some(commitTimes.max)
}

/**
* Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time`
* falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the
* commit-time metadata field, produced by the incremental relations). Returns None when no such
* filter can be built (e.g. the commit-time field is not part of the read schema), in which case
* the caller keeps the original read-time filter pushdown.
*/
private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter],
readSchema: StructType): Option[JPredicate[InternalRow]] = {
if (requiredFilters.isEmpty) {
None
} else {
readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx =>
val allowedCommitTimes: Set[String] = requiredFilters.collect {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => values.filter(_ != null).map(_.toString) pattern is extracted verbatim in both incrementalWindowEnd (line 397) and here. Have you considered pulling it into a small private helper like commitTimesFromFilters(filters: Seq[Filter]): Set[String] to keep the two callers in sync if the filter shape ever changes?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD =>
values.filter(_ != null).map(_.toString).toSet
}.flatten.toSet
if (allowedCommitTimes.isEmpty) {
None
} else {
Some(new JPredicate[InternalRow] {
override def test(row: InternalRow): Boolean =
!row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This row.getUTF8String(idx).toString allocates a Java String for every merged row in the file group. Would it be worth pre-converting allowedCommitTimes to Set[UTF8String] once and comparing UTF8String directly to skip the per-row allocation? Could matter on wide incremental windows.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

})
}
}
}
}

private def buildBaseFileReader(spark: SparkSession,
options: Map[String, String],
configuration: Configuration,
Expand Down
Loading
Loading