From a2d617868b2e3c868fcd183f8bdc3ec001d68ee2 Mon Sep 17 00:00:00 2001 From: Aditya Goenka Date: Thu, 11 Jun 2026 23:45:36 +0530 Subject: [PATCH 1/2] fix(spark): runtime-merge full file groups for MOR incremental queries (#18943) Incremental queries on a MOR table could return incorrect rows because the read resolved each changed record only from the files written within the incremental window, without a correct runtime merge against the rest of the file group: - EVENT_TIME_ORDERING (no partial updates): a record written in the window may carry a lower ordering value than the version already in the base/earlier log, so the existing version should win. A window-only view cannot determine the winner and could surface the losing write. - Partial updates: a window log block holds only the changed columns, so the unchanged columns must be filled in from the base file; otherwise the row comes back partial/garbled. SQL MERGE INTO on MOR writes partial blocks by default (hoodie.spark.sql.merge.into.partial.updates), so this hits the common case. Snapshot and read-optimized queries were already correct; the bug was isolated to the incremental file listing plus commit-time filter pushdown. Fix: for each file group touched in the incremental window, read its base file plus its log files (the full file slice) and run the standard file-group reader merge, then filter the merged output to the incremental window by commit time. Unconditional - no new config and no write-path change. - MergeOnReadIncrementalRelationV2: build the incremental file system view from the (modified) partition listing (metadata-table-aware) so each slice carries its base file, scoped back to the file groups actually touched in the window; bound the view timeline to the window end and close the view after use. - HoodieFileGroupReaderBasedFileFormat: for incremental merging file groups, do not push the commit-time span filter into the file reads; apply it on the merged output instead. Also set an InstantRange bounded to the window end on the reader context so base records and log blocks committed after the window are not merged in (a record updated again after the window must be returned with its value as of the window end, not its latest value). - HoodieReaderContext: add setInstantRange so the read path can bound the merge inputs to the query window. Scope: only the V2 relation listing changed (table version 8+); the format-level gate covers both V1 and V2 reads. Tests in TestPartialUpdateForMergeInto cover partial-update incremental (commit/event-time ordering, avro/parquet, partitioned), non-partial event-time ordering, window bounds, insert+update in one window, multiple partial updates to one key, exclusion of later commits, post-compaction merge, and COW non-regression. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../common/engine/HoodieReaderContext.java | 9 + .../MergeOnReadIncrementalRelationV2.scala | 69 ++- ...HoodieFileGroupReaderBasedFileFormat.scala | 87 ++- .../TestPartialUpdateForMergeInto.scala | 535 ++++++++++++++++++ 4 files changed, 680 insertions(+), 20 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 14e412ffcb184..5f40f802821b9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -278,6 +278,15 @@ public Option getInstantRange() { return instantRangeOpt; } + /** + * Set the {@link InstantRange} filter. Used to bound the instants (base records and log blocks) + * that participate in the merge, e.g. an incremental query reads only up to its window end so that + * log blocks committed after the window are not merged in. + */ + public void setInstantRange(Option instantRange) { + this.instantRangeOpt = instantRange; + } + /** * Apply the {@link InstantRange} filter to the file record iterator. * diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala index aea594d9157ca..32ac9ddff32e9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala @@ -18,13 +18,15 @@ package org.apache.hudi import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.log.InstantRange.RangeType import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline, getCommitMetadata} -import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits @@ -32,6 +34,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths import org.apache.hudi.storage.StoragePathInfo import org.apache.hadoop.fs.GlobPattern +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -104,15 +107,8 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime - - val fsView = new HoodieTableFileSystemView( - metaClient, timeline, affectedFilesInCommits) - val modifiedPartitions = getWritePartitionPaths(commitsMetadata) - - modifiedPartitions.asScala.flatMap { relativePartitionPath => - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala - }.toSeq + collectIncrementalFileSlices(modifiedPartitions.asScala.toSeq, latestCommit) } buildSplits(filterFileSlices(fileSlices, globPattern)) @@ -127,14 +123,10 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime - val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) val modifiedPartitions = getWritePartitionPaths(commitsMetadata) - - fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) + val partitionPaths = fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) .map(p => p.getPath).filter(p => modifiedPartitions.contains(p)) - .flatMap { relativePartitionPath => - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala - } + collectIncrementalFileSlices(partitionPaths, latestCommit) } filterFileSlices(fileSlices, globPattern) } @@ -158,6 +150,53 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, override def shouldIncludeLogFiles(): Boolean = fullTableScan + /** + * Collects the incremental file slices for the given modified partitions. + * + * Each returned file slice carries its base file plus all of its log files so the + * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge. + * Resolving the current value of a record changed in the incremental window requires the full file + * slice, not just the files written within the window: + * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the + * version already in the base/earlier-log, so the existing version wins; a window-only view + * cannot determine the winner. + * - Partial updates: a window log block holds only the changed columns, so the unchanged columns + * must be filled in from the base file. + * (HUDI #18943.) + * + * The view is built from the (modified) partition listing (which includes the latest base file of + * each file group, honoring the metadata table when enabled) and scoped back to the file groups + * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not + * read. The commit-time record filter applied during the scan still restricts the returned records + * to the incremental window. + * + * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit) + * so that base/log files written by later commits are not visible. Without this bound a record + * updated again after the window would be merged with those later log files and its merged commit + * time would fall outside the window, dropping the in-window change from the result. + */ + private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = { + val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)) + val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline( + engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit)) + try { + partitionPaths.flatMap { relativePartitionPath => + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala + .filter(fs => affectedFileGroupIds.contains(fs.getFileId)) + } + } finally { + fsView.close() + } + } + + /** + * File group ids touched within the incremental window. Used to scope the partition-listing based + * view (which surfaces every file group in a modified partition) back to only the changed file + * groups, so we do not read and discard untouched file groups. + */ + private lazy val affectedFileGroupIds: Set[String] = + affectedFilesInCommits.asScala.map(f => FSUtils.getFileIdFromFilePath(f.getPath)).toSet + private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: String): Seq[FileSlice] = { val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3fe8c6ff62f71..7ed6eb0effc78 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -23,13 +23,14 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.schema.HoodieSchemaUtils import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver} +import org.apache.hudi.common.table.log.InstantRange import org.apache.hudi.common.table.read.HoodieFileGroupReader import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.common.util.collection.ClosableIterator +import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableFilterIterator} import org.apache.hudi.data.CloseableIteratorListener import org.apache.hudi.exception.HoodieNotSupportedException import org.apache.hudi.internal.schema.InternalSchema @@ -54,12 +55,13 @@ import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.{Filter, In} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} import org.apache.spark.util.SerializableConfiguration import java.io.Closeable +import java.util.function.{Predicate => JPredicate} import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -301,9 +303,38 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)) fileSliceMapping.getSlice(fileGroupName) match { case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) => + // For incremental queries on a MOR file group that requires base/log merging, the + // commit-time span filter (`requiredFilters`) must NOT be pushed down into the file + // reads. Pushing it down drops the base-file row of a record whose latest base/log + // version is outside the incremental window, before the runtime merge runs. That breaks + // correctness whenever the merged result depends on data outside the window, e.g. + // event-time ordering (the existing higher-ordering version should win) and partial + // updates (unchanged columns must come from the base file) (HUDI #18943). Instead we + // read and merge the full file group, then filter the merged rows by commit time via + // `postMergeCommitTimeFilter`. We only switch to post-merge filtering when the filter + // can be built (commit-time field present); otherwise we keep the original pushdown. + val needsMerge = fileSlice.getLogFiles.findAny().isPresent + val postMergeCommitTimeFilter: Option[JPredicate[InternalRow]] = + if (isIncremental && needsMerge) { + buildCommitTimeRowFilter(requiredFilters, requestedStructType) + } else { + None + } + val readFilters = if (postMergeCommitTimeFilter.isDefined) Seq.empty[Filter] else requiredFilters val readerContext = new SparkFileFormatInternalRowReaderContext( - fileGroupBaseFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig, + fileGroupBaseFileReader.value, filters, readFilters, storageConf, metaClient.getTableConfig, sparkRequiredSchema = Some(requiredSchema)) + // Bound the merge inputs to the incremental window end: read only base records and log + // blocks committed at or before the window's last commit, so a record updated again + // after the window is not merged with those later log blocks (HUDI #18943). The + // post-merge commit-time filter then selects the records changed within the window. + if (postMergeCommitTimeFilter.isDefined) { + incrementalWindowEnd(requiredFilters).foreach { windowEnd => + readerContext.setInstantRange(HOption.of(InstantRange.builder() + .rangeType(InstantRange.RangeType.CLOSED_CLOSED).nullableBoundary(true) + .endInstant(windowEnd).build())) + } + } readerContext.enableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)) val props = metaClient.getTableConfig.getProps options.foreach(kv => props.setProperty(kv._1, kv._2)) @@ -328,9 +359,13 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, .withLength(baseFileLength) .withShouldUseRecordPosition(shouldUseRecordPosition) .build() + val mergedIter: ClosableIterator[InternalRow] = postMergeCommitTimeFilter match { + case Some(predicate) => new CloseableFilterIterator[InternalRow](reader.getClosableIterator, predicate) + case None => reader.getClosableIterator + } // Append partition values to rows and project to output schema appendPartitionAndProject( - reader.getClosableIterator, + mergedIter, requestedStructType, remainingPartitionSchema, outputSchema, @@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } } + /** + * The incremental window end (latest commit time in the window), extracted from the `In` filter on + * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log + * blocks committed after the window are not merged. Returns None when no such filter is present. + */ + private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = { + val commitTimes: Seq[String] = requiredFilters.collect { + case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => + values.filter(_ != null).map(_.toString).toSeq + }.flatten + if (commitTimes.isEmpty) None else Some(commitTimes.max) + } + + /** + * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time` + * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the + * commit-time metadata field, produced by the incremental relations). Returns None when no such + * filter can be built (e.g. the commit-time field is not part of the read schema), in which case + * the caller keeps the original read-time filter pushdown. + */ + private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter], + readSchema: StructType): Option[JPredicate[InternalRow]] = { + if (requiredFilters.isEmpty) { + None + } else { + readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx => + val allowedCommitTimes: Set[String] = requiredFilters.collect { + case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => + values.filter(_ != null).map(_.toString).toSet + }.flatten.toSet + if (allowedCommitTimes.isEmpty) { + None + } else { + Some(new JPredicate[InternalRow] { + override def test(row: InternalRow): Boolean = + !row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString) + }) + } + } + } + } + private def buildBaseFileReader(spark: SparkSession, options: Map[String, String], configuration: Configuration, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala index 668f1d44fc32f..b7449ed81609a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala @@ -32,6 +32,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.CompactionUtils import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieNotSupportedException +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView @@ -68,6 +70,539 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { testPartialUpdate("mor", "parquet", commitTimeOrdering = true) } + test("Test partial update incremental query returns full row with MOR and Avro log format") { + testPartialUpdateIncrementalQuery("avro", commitTimeOrdering = true) + } + + test("Test partial update incremental query returns full row with MOR and Parquet log format") { + testPartialUpdateIncrementalQuery("parquet", commitTimeOrdering = true) + } + + test("Test partial update incremental query returns full row with MOR and event time ordering") { + testPartialUpdateIncrementalQuery("avro", commitTimeOrdering = false) + } + + test("Test partial update incremental query returns full row with MOR on a partitioned table") { + testPartialUpdateIncrementalQueryPartitioned("avro") + } + + test("Test incremental query reflects event time ordering on a MOR table (no partial updates)") { + testIncrementalEventTimeOrdering("avro") + } + + test("Test partial update incremental query bounds the window and excludes untouched records") { + testPartialUpdateIncrementalWindowBounds("avro") + } + + test("Test partial update incremental query returns inserts and updates in the same window") { + testPartialUpdateIncrementalInsertAndUpdate("avro") + } + + test("Test partial update incremental query merges multiple partial updates to the same key") { + testPartialUpdateIncrementalMultipleUpdatesSameKey("avro") + } + + test("Test partial update incremental query excludes log files committed after the window end") { + testPartialUpdateIncrementalExcludesLaterCommits("avro") + } + + test("Test partial update incremental query merges a log file on a compacted base file") { + testPartialUpdateIncrementalAfterCompaction("avro") + } + + test("Test incremental query on a COW table is unaffected by the MOR incremental change") { + testIncrementalCowUnaffected("avro") + } + + /** + * HUDI #18943: for a MOR table with partial updates, an incremental query must return ALL columns + * of a changed record (unchanged columns filled in from the base file), not only the columns + * altered by the partial update. + */ + def testPartialUpdateIncrementalQuery(logDataBlockFormat: String, commitTimeOrdering: Boolean): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + val mergeMode = if (commitTimeOrdering) { + RecordMergeMode.COMMIT_TIME_ORDERING.name() + } else { + RecordMergeMode.EVENT_TIME_ORDERING.name() + } + val preCombineString = if (commitTimeOrdering) "" else "preCombineField = '_ts'," + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int, + | description string + |) using hudi + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | $preCombineString + | recordMergeMode = '$mergeMode' + |) + |location '$basePath' + """.stripMargin) + + // Commit 1: full rows land in the base file. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," + + "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: partial update changing only "price" and "_ts" -> written as a partial log block. + // The new "_ts" values are larger than the existing ones so the update wins under event time + // ordering as well. + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 12.0 as price, 1001 as ts + |union select 3 as id, 25.0 as price, 1260 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + // Snapshot query returns the merged full rows (unchanged columns preserved). + checkAnswer(s"select id, name, price, _ts, description from $tableName order by id")( + Seq(1, "a1", 12.0, 1001, "a1: desc1"), + Seq(2, "a2", 20.0, 1200, "a2: desc2"), + Seq(3, "a3", 25.0, 1260, "a3: desc3") + ) + + // Incremental query over just commit 2 must return the FULL updated rows, including the + // columns ("name", "description") not part of the partial update. Before the fix these come + // back null/garbled because the base-file row is dropped before the runtime merge. + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 12.0, 1001, "a1: desc1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3") + ) + } + } + } + + /** + * Same as [[testPartialUpdateIncrementalQuery]] but on a partitioned table where only some file + * groups in the modified partitions are touched. Exercises the file-group scoping in the + * incremental file listing. + */ + def testPartialUpdateIncrementalQueryPartitioned(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int, + | description string, + | part string + |) using hudi + |partitioned by (part) + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}' + |) + |location '$basePath' + """.stripMargin) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1', 'p1')," + + "(2, 'a2', 20, 1200, 'a2: desc2', 'p1'), (3, 'a3', 30, 1250, 'a3: desc3', 'p2')") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 12.0 as price, 1001 as ts, 'p1' as part + |union select 3 as id, 25.0 as price, 1260 as ts, 'p2' as part) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + checkAnswer(s"select id, name, price, _ts, description, part from $tableName order by id")( + Seq(1, "a1", 12.0, 1001, "a1: desc1", "p1"), + Seq(2, "a2", 20.0, 1200, "a2: desc2", "p1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3", "p2") + ) + + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts", "description", "part"))( + Seq(1, "a1", 12.0, 1001, "a1: desc1", "p1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3", "p2") + ) + } + } + } + + /** + * HUDI #18943: even without partial updates, an incremental query on an EVENT_TIME_ORDERING MOR + * table must resolve the winning record version via a runtime merge against the base/earlier log, + * not just surface the value written within the window. A window write with a lower ordering value + * loses, so it must not appear in the incremental result; a window write with a higher ordering + * value wins and is returned. + */ + def testIncrementalEventTimeOrdering(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int + |) using hudi + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | preCombineField = '_ts', + | recordMergeMode = '${RecordMergeMode.EVENT_TIME_ORDERING.name()}' + |) + |location '$basePath' + """.stripMargin) + + // Commit 1: base file with high ordering values. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000)") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: id 1 updated with a HIGHER _ts (wins); id 2 updated with a LOWER _ts (loses). + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 'a1' as name, 99.0 as price, 2000 as ts + |union select 2 as id, 'a2' as name, 88.0 as price, 500 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + // Snapshot: id 1 took the higher-ts update; id 2 kept the base value (lower-ts update lost). + checkAnswer(s"select id, name, price, _ts from $tableName order by id")( + Seq(1, "a1", 99.0, 2000), + Seq(2, "a2", 20.0, 1000) + ) + + // Incremental over commit 2: only id 1 actually changed its winning version; id 2's losing + // write must not surface (its merged commit time stays outside the window). + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts"))( + Seq(1, "a1", 99.0, 2000) + ) + } + } + } + + /** + * Verifies the incremental window is honored across multiple commits: a sub-window returns only + * the records changed within it (fully merged), and a record never touched in the window does not + * appear. Commit-time ordering keeps the latest write winning. + */ + def testPartialUpdateIncrementalWindowBounds(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + // Commit 1: base file with three records. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," + + "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: partial update of id 1. + spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + val t2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 3: partial update of id 2. + spark.sql(s"merge into $tableName t0 using (select 2 as id, 22.0 as price, 1201 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + + // Window (t1, t2] = commit 2 only -> just id 1 (full row); id 2 and id 3 excluded. + checkAnswer(readIncrementalBounded(basePath, t1, t2, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1001, "a1: desc1") + ) + + // Window (t2, latest] = commit 3 only -> just id 2 (full row). + checkAnswer(readIncremental(basePath, t2, "id", "name", "price", "_ts", "description"))( + Seq(2, "a2", 22.0, 1201, "a2: desc2") + ) + + // Window (t1, latest] = commits 2 and 3 -> id 1 and id 2 (full rows); id 3 never touched. + checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1001, "a1: desc1"), + Seq(2, "a2", 22.0, 1201, "a2: desc2") + ) + } + } + } + + /** + * Verifies a single incremental window that contains both a partial update (existing key) and an + * insert (new key) returns both with all columns. + */ + def testPartialUpdateIncrementalInsertAndUpdate(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1'), (2, 'a2', 20, 1200, 'a2: desc2')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: partial update of id 1 + insert of new id 3. + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as _ts, 'a1: desc1' as description + |union select 3 as id, 'a3' as name, 30.0 as price, 1300 as _ts, 'a3: desc3' as description) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0._ts + |when not matched then insert * + |""".stripMargin) + + // Window over commit 2 -> id 1 (partial update, full row) and id 3 (insert, full row); id 2 excluded. + checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 12.0, 1001, "a1: desc1"), + Seq(3, "a3", 30.0, 1300, "a3: desc3") + ) + } + } + } + + /** + * Verifies multiple partial updates to the same key across commits merge into a single full row + * in the incremental result (later partial columns win, earlier untouched columns retained). + */ + def testPartialUpdateIncrementalMultipleUpdatesSameKey(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: update "price". Commit 3: update "description". Both partial, same key. + spark.sql(s"merge into $tableName t0 using (select 1 as id, 12.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + spark.sql(s"merge into $tableName t0 using (select 1 as id, 'a1: desc1-v2' as descr, 1002 as ts) s0 " + + "on t0.id = s0.id when matched then update set description = s0.descr, _ts = s0.ts") + + // Window (t1, latest] -> a single fully-merged row: name from base, price from commit 2, + // description from commit 3, _ts the latest. + checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 12.0, 1002, "a1: desc1-v2") + ) + } + } + } + + /** + * Verifies that a bounded incremental query (with an END_COMMIT) does not merge in log files + * committed after the window end. A record updated inside the window and again afterwards must be + * returned with its value as of the window end, not its latest value (and must not be dropped). + */ + def testPartialUpdateIncrementalExcludesLaterCommits(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2 (inside the window): price -> 11. + spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + val t2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 3 (after the window): price -> 99. + spark.sql(s"merge into $tableName t0 using (select 1 as id, 99.0 as price, 9999 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + + // Snapshot reflects the latest value. + checkAnswer(s"select id, name, price, _ts, description from $tableName")( + Seq(1, "a1", 99.0, 9999, "a1: desc1") + ) + + // Incremental over (t1, t2] must return id 1 as of commit 2 (price 11), not the later value + // and not an empty result. + checkAnswer(readIncrementalBounded(basePath, t1, t2, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1001, "a1: desc1") + ) + } + } + } + + /** + * Verifies a partial update written as a log file on top of a compacted base file is merged + * correctly in an incremental query: the value carried into the compacted base file is retained + * and the later partial update is applied on top. + */ + def testPartialUpdateIncrementalAfterCompaction(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT.key() -> "true", + HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + // Delta commit 1 (base) + delta commit 2 (partial update) -> triggers inline compaction, so + // the compacted base file now carries price = 11. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'd1')") + spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + val afterCompaction = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // A further partial update lands as a log file on top of the compacted base file. + spark.sql(s"merge into $tableName t0 using (select 1 as id, 'd1v2' as descr, 1002 as ts) s0 " + + "on t0.id = s0.id when matched then update set description = s0.descr, _ts = s0.ts") + + // Incremental over the last commit merges the log file with the compacted base: price 11 + // (from the compacted base), description d1v2 (from the log), name retained. + checkAnswer(readIncremental(basePath, afterCompaction, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1002, "d1v2") + ) + } + } + } + + /** + * Confirms the MOR incremental change does not regress COW incremental queries (COW file groups + * have no log files, so they keep the original read path). + */ + def testIncrementalCowUnaffected(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='cow', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'd1'), (2, 'a2', 20, 1200, 'd2')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + spark.sql(s"merge into $tableName t0 using (select 1 as id, 12.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + + // Incremental over the update returns only id 1 with its full updated row; id 2 untouched. + checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 12.0, 1001, "d1") + ) + } + } + } + + private def readIncrementalBounded(basePath: String, startCompletionTime: String, endCompletionTime: String, + cols: String*): Array[org.apache.spark.sql.Row] = { + spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime) + .option(DataSourceReadOptions.END_COMMIT.key, endCompletionTime) + .load(basePath) + .select(cols.head, cols.tail: _*) + .orderBy("id") + .collect() + } + + private def readIncremental(basePath: String, startCompletionTime: String, cols: String*): Array[org.apache.spark.sql.Row] = { + spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime) + .load(basePath) + .select(cols.head, cols.tail: _*) + .orderBy("id") + .collect() + } + test("Test partial update and insert with COW and Avro log format") { testPartialUpdateWithInserts("cow", "avro") } From e54305f3df52198f45ef8a06b79fc8f137c0a622 Mon Sep 17 00:00:00 2001 From: Aditya Goenka Date: Mon, 15 Jun 2026 10:49:42 +0530 Subject: [PATCH 2/2] fix(spark): preserve fail-early on missing files for MOR incremental queries The incremental file listing introduced in #18943 builds a fresh, metadata-aware file-system view scoped to the touched file groups. This fixed runtime-merge correctness but silently dropped the fail-early contract verified by TestIncrementalReadWithFullTableScan: when the full-table-scan fallback is disabled and files referenced by the window have been removed (e.g. by cleaning), the query used to throw a read-time FileNotFoundException; with the fresh listing it instead returned an empty/partial result because that listing no longer sees the missing files. Restore the prior behavior: extract the missing-files check into hasMissingAffectedFiles (reused by fullTableScan), and when the fallback is off but files are missing, list slices directly from the recorded affected files (the pre-#18943 path) so the scan surfaces the file-not-found error pointing at hoodie.datasource.read.incr.fallback.fulltablescan.enable. The merge-correct listing is still used whenever the window's files are present, so the partial-update incremental tests are unaffected. Co-Authored-By: Claude Opus 4.8 --- .../MergeOnReadIncrementalRelationV2.scala | 46 +++++++++++++++++-- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala index 32ac9ddff32e9..7734c122c6522 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala @@ -107,8 +107,12 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime - val modifiedPartitions = getWritePartitionPaths(commitsMetadata) - collectIncrementalFileSlices(modifiedPartitions.asScala.toSeq, latestCommit) + val modifiedPartitions = getWritePartitionPaths(commitsMetadata).asScala.toSeq + if (hasMissingAffectedFiles) { + legacyAffectedFileSlices(modifiedPartitions, latestCommit) + } else { + collectIncrementalFileSlices(modifiedPartitions, latestCommit) + } } buildSplits(filterFileSlices(fileSlices, globPattern)) @@ -126,7 +130,11 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, val modifiedPartitions = getWritePartitionPaths(commitsMetadata) val partitionPaths = fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) .map(p => p.getPath).filter(p => modifiedPartitions.contains(p)) - collectIncrementalFileSlices(partitionPaths, latestCommit) + if (hasMissingAffectedFiles) { + legacyAffectedFileSlices(partitionPaths, latestCommit) + } else { + collectIncrementalFileSlices(partitionPaths, latestCommit) + } } filterFileSlices(fileSlices, globPattern) } @@ -189,6 +197,29 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, } } + /** + * Builds the incremental file slices directly from the files recorded in the window's commits + * (`affectedFilesInCommits`), as the read path did before HUDI #18943. + * + * This is used only when [[hasMissingAffectedFiles]] is true and the full-table-scan fallback is + * disabled: some files referenced by the window have been removed (e.g. by cleaning), so there is + * no correct incremental result to produce. Listing from the recorded files (which include the + * missing paths) preserves the prior fail-early contract - the scan surfaces a file-not-found + * error pointing the user at `hoodie.datasource.read.incr.fallback.fulltablescan.enable` - instead + * of silently returning an empty/partial result from a fresh listing that no longer sees those + * files. + */ + private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = { + val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) + try { + partitionPaths.flatMap { relativePartitionPath => + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala + } + } finally { + fsView.close() + } + } + /** * File group ids touched within the incremental window. Used to scope the partition-listing based * view (which surfaces every file group in a modified partition) back to only the changed file @@ -225,6 +256,12 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation { protected def startInstantArchived: Boolean = !queryContext.getArchivedInstants.isEmpty + // Some files referenced by the commits in the incremental window no longer exist on storage + // (e.g. they were removed by cleaning). Such a window cannot be served from the incremental + // file listing alone. + protected lazy val hasMissingAffectedFiles: Boolean = + affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath)) + // Fallback to full table scan if any of the following conditions matches: // 1. the start commit is archived // 2. the end commit is archived @@ -233,8 +270,7 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation { val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key, DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean - fallbackToFullTableScan && (startInstantArchived - || affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath))) + fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles) } protected val rangeType: RangeType