diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 14e412ffcb184..5f40f802821b9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -278,6 +278,15 @@ public Option getInstantRange() { return instantRangeOpt; } + /** + * Set the {@link InstantRange} filter. Used to bound the instants (base records and log blocks) + * that participate in the merge, e.g. an incremental query reads only up to its window end so that + * log blocks committed after the window are not merged in. + */ + public void setInstantRange(Option instantRange) { + this.instantRangeOpt = instantRange; + } + /** * Apply the {@link InstantRange} filter to the file record iterator. * diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala index aea594d9157ca..7734c122c6522 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala @@ -18,13 +18,15 @@ package org.apache.hudi import org.apache.hudi.HoodieConversionUtils.toScalaOption +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{FileSlice, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.log.InstantRange.RangeType import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import 
org.apache.hudi.common.table.timeline.TimelineUtils.{concatTimeline, getCommitMetadata} -import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.view.{FileSystemViewManager, HoodieTableFileSystemView} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits @@ -32,6 +34,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil.getWritePartitionPaths import org.apache.hudi.storage.StoragePathInfo import org.apache.hadoop.fs.GlobPattern +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -104,15 +107,12 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime - - val fsView = new HoodieTableFileSystemView( - metaClient, timeline, affectedFilesInCommits) - - val modifiedPartitions = getWritePartitionPaths(commitsMetadata) - - modifiedPartitions.asScala.flatMap { relativePartitionPath => - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala - }.toSeq + val modifiedPartitions = getWritePartitionPaths(commitsMetadata).asScala.toSeq + if (hasMissingAffectedFiles) { + legacyAffectedFileSlices(modifiedPartitions, latestCommit) + } else { + collectIncrementalFileSlices(modifiedPartitions, latestCommit) + } } buildSplits(filterFileSlices(fileSlices, globPattern)) @@ -127,14 +127,14 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, listLatestFileSlices(partitionFilters, dataFilters) } else { val latestCommit = includedCommits.last.requestedTime - val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) val modifiedPartitions = 
getWritePartitionPaths(commitsMetadata) - - fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) + val partitionPaths = fileIndex.listMatchingPartitionPaths(HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)) .map(p => p.getPath).filter(p => modifiedPartitions.contains(p)) - .flatMap { relativePartitionPath => - fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala - } + if (hasMissingAffectedFiles) { + legacyAffectedFileSlices(partitionPaths, latestCommit) + } else { + collectIncrementalFileSlices(partitionPaths, latestCommit) + } } filterFileSlices(fileSlices, globPattern) } @@ -158,6 +158,76 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext, override def shouldIncludeLogFiles(): Boolean = fullTableScan + /** + * Collects the incremental file slices for the given modified partitions. + * + * Each returned file slice carries its base file plus all of its log files so the + * [[org.apache.hudi.common.table.read.HoodieFileGroupReader]] can perform a correct runtime merge. + * Resolving the current value of a record changed in the incremental window requires the full file + * slice, not just the files written within the window: + * - EVENT_TIME_ORDERING: a record written in the window may carry a lower ordering value than the + * version already in the base/earlier-log, so the existing version wins; a window-only view + * cannot determine the winner. + * - Partial updates: a window log block holds only the changed columns, so the unchanged columns + * must be filled in from the base file. + * (HUDI #18943.) 
+ * + * The view is built from the (modified) partition listing (which includes the latest base file of + * each file group, honoring the metadata table when enabled) and scoped back to the file groups + * actually touched in the window (see [[affectedFileGroupIds]]) so untouched file groups are not + * read. The commit-time record filter applied during the scan still restricts the returned records + * to the incremental window. + * + * The view timeline is bounded to instants at or before `latestCommit` (the window's last commit) + * so that base/log files written by later commits are not visible. Without this bound a record + * updated again after the window would be merged with those later log files and its merged commit + * time would fall outside the window, dropping the in-window change from the result. + */ + private def collectIncrementalFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = { + val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)) + val fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline( + engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit)) + try { + partitionPaths.flatMap { relativePartitionPath => + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala + .filter(fs => affectedFileGroupIds.contains(fs.getFileId)) + } + } finally { + fsView.close() + } + } + + /** + * Builds the incremental file slices directly from the files recorded in the window's commits + * (`affectedFilesInCommits`), as the read path did before HUDI #18943. + * + * This is used only when [[hasMissingAffectedFiles]] is true and the full-table-scan fallback is + * disabled: some files referenced by the window have been removed (e.g. by cleaning), so there is + * no correct incremental result to produce. 
Listing from the recorded files (which include the + * missing paths) preserves the prior fail-early contract - the scan surfaces a file-not-found + * error pointing the user at `hoodie.datasource.read.incr.fallback.fulltablescan.enable` - instead + * of silently returning an empty/partial result from a fresh listing that no longer sees those + * files. + */ + private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = { + val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits) + try { + partitionPaths.flatMap { relativePartitionPath => + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala + } + } finally { + fsView.close() + } + } + + /** + * File group ids touched within the incremental window. Used to scope the partition-listing based + * view (which surfaces every file group in a modified partition) back to only the changed file + * groups, so we do not read and discard untouched file groups. + */ + private lazy val affectedFileGroupIds: Set[String] = + affectedFilesInCommits.asScala.map(f => FSUtils.getFileIdFromFilePath(f.getPath)).toSet + private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: String): Seq[FileSlice] = { val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) @@ -186,6 +256,12 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation { protected def startInstantArchived: Boolean = !queryContext.getArchivedInstants.isEmpty + // Some files referenced by the commits in the incremental window no longer exist on storage + // (e.g. they were removed by cleaning). Such a window cannot be served from the incremental + // file listing alone. 
+ protected lazy val hasMissingAffectedFiles: Boolean = + affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath)) + // Fallback to full table scan if any of the following conditions matches: // 1. the start commit is archived // 2. the end commit is archived @@ -194,8 +270,7 @@ trait HoodieIncrementalRelationV2Trait extends HoodieBaseRelation { val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key, DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.defaultValue).toBoolean - fallbackToFullTableScan && (startInstantArchived - || affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath))) + fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles) } protected val rangeType: RangeType diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3fe8c6ff62f71..7ed6eb0effc78 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -23,13 +23,14 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.schema.HoodieSchema import 
org.apache.hudi.common.schema.HoodieSchemaUtils import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, ParquetTableSchemaResolver} +import org.apache.hudi.common.table.log.InstantRange import org.apache.hudi.common.table.read.HoodieFileGroupReader import org.apache.hudi.common.util.{Option => HOption} -import org.apache.hudi.common.util.collection.ClosableIterator +import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableFilterIterator} import org.apache.hudi.data.CloseableIteratorListener import org.apache.hudi.exception.HoodieNotSupportedException import org.apache.hudi.internal.schema.InternalSchema @@ -54,12 +55,13 @@ import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.hudi.MultipleColumnarFileFormatReader import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.{Filter, In} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} import org.apache.spark.util.SerializableConfiguration import java.io.Closeable +import java.util.function.{Predicate => JPredicate} import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -301,9 +303,38 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)) fileSliceMapping.getSlice(fileGroupName) match { case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) => + // For incremental queries on a MOR file group that requires base/log merging, the + // commit-time span filter (`requiredFilters`) must NOT be pushed down into the file + // reads. Pushing it down drops the base-file row of a record whose latest base/log + // version is outside the incremental window, before the runtime merge runs. 
That breaks + // correctness whenever the merged result depends on data outside the window, e.g. + // event-time ordering (the existing higher-ordering version should win) and partial + // updates (unchanged columns must come from the base file) (HUDI #18943). Instead we + // read and merge the full file group, then filter the merged rows by commit time via + // `postMergeCommitTimeFilter`. We only switch to post-merge filtering when the filter + // can be built (commit-time field present); otherwise we keep the original pushdown. + val needsMerge = fileSlice.getLogFiles.findAny().isPresent + val postMergeCommitTimeFilter: Option[JPredicate[InternalRow]] = + if (isIncremental && needsMerge) { + buildCommitTimeRowFilter(requiredFilters, requestedStructType) + } else { + None + } + val readFilters = if (postMergeCommitTimeFilter.isDefined) Seq.empty[Filter] else requiredFilters val readerContext = new SparkFileFormatInternalRowReaderContext( - fileGroupBaseFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig, + fileGroupBaseFileReader.value, filters, readFilters, storageConf, metaClient.getTableConfig, sparkRequiredSchema = Some(requiredSchema)) + // Bound the merge inputs to the incremental window end: read only base records and log + // blocks committed at or before the window's last commit, so a record updated again + // after the window is not merged with those later log blocks (HUDI #18943). The + // post-merge commit-time filter then selects the records changed within the window. 
+ if (postMergeCommitTimeFilter.isDefined) { + incrementalWindowEnd(requiredFilters).foreach { windowEnd => + readerContext.setInstantRange(HOption.of(InstantRange.builder() + .rangeType(InstantRange.RangeType.CLOSED_CLOSED).nullableBoundary(true) + .endInstant(windowEnd).build())) + } + } readerContext.enableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)) val props = metaClient.getTableConfig.getProps options.foreach(kv => props.setProperty(kv._1, kv._2)) @@ -328,9 +359,13 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, .withLength(baseFileLength) .withShouldUseRecordPosition(shouldUseRecordPosition) .build() + val mergedIter: ClosableIterator[InternalRow] = postMergeCommitTimeFilter match { + case Some(predicate) => new CloseableFilterIterator[InternalRow](reader.getClosableIterator, predicate) + case None => reader.getClosableIterator + } // Append partition values to rows and project to output schema appendPartitionAndProject( - reader.getClosableIterator, + mergedIter, requestedStructType, remainingPartitionSchema, outputSchema, @@ -353,6 +388,48 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } } + /** + * The incremental window end (latest commit time in the window), extracted from the `In` filter on + * the commit-time metadata field in `requiredFilters`. Used to bound the merge inputs so that log + * blocks committed after the window are not merged. Returns None when no such filter is present. 
+ */ + private def incrementalWindowEnd(requiredFilters: Seq[Filter]): Option[String] = { + val commitTimes: Seq[String] = requiredFilters.collect { + case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => + values.filter(_ != null).map(_.toString).toSeq + }.flatten + if (commitTimes.isEmpty) None else Some(commitTimes.max) + } + + /** + * Builds a predicate over merged rows that keeps only the records whose `_hoodie_commit_time` + * falls within the incremental query window encoded by `requiredFilters` (an `In` filter on the + * commit-time metadata field, produced by the incremental relations). Returns None when no such + * filter can be built (e.g. the commit-time field is not part of the read schema), in which case + * the caller keeps the original read-time filter pushdown. + */ + private def buildCommitTimeRowFilter(requiredFilters: Seq[Filter], + readSchema: StructType): Option[JPredicate[InternalRow]] = { + if (requiredFilters.isEmpty) { + None + } else { + readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx => + val allowedCommitTimes: Set[String] = requiredFilters.collect { + case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => + values.filter(_ != null).map(_.toString).toSet + }.flatten.toSet + if (allowedCommitTimes.isEmpty) { + None + } else { + Some(new JPredicate[InternalRow] { + override def test(row: InternalRow): Boolean = + !row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString) + }) + } + } + } + } + private def buildBaseFileReader(spark: SparkSession, options: Map[String, String], configuration: Configuration, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala index 668f1d44fc32f..b7449ed81609a 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala @@ -32,6 +32,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.CompactionUtils import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieNotSupportedException +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getMetaClientAndFileSystemView @@ -68,6 +70,539 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { testPartialUpdate("mor", "parquet", commitTimeOrdering = true) } + test("Test partial update incremental query returns full row with MOR and Avro log format") { + testPartialUpdateIncrementalQuery("avro", commitTimeOrdering = true) + } + + test("Test partial update incremental query returns full row with MOR and Parquet log format") { + testPartialUpdateIncrementalQuery("parquet", commitTimeOrdering = true) + } + + test("Test partial update incremental query returns full row with MOR and event time ordering") { + testPartialUpdateIncrementalQuery("avro", commitTimeOrdering = false) + } + + test("Test partial update incremental query returns full row with MOR on a partitioned table") { + testPartialUpdateIncrementalQueryPartitioned("avro") + } + + test("Test incremental query reflects event time ordering on a MOR table (no partial updates)") { + testIncrementalEventTimeOrdering("avro") + } + + test("Test partial update incremental query bounds the window and excludes untouched records") { + testPartialUpdateIncrementalWindowBounds("avro") + } + + test("Test partial update incremental 
query returns inserts and updates in the same window") { + testPartialUpdateIncrementalInsertAndUpdate("avro") + } + + test("Test partial update incremental query merges multiple partial updates to the same key") { + testPartialUpdateIncrementalMultipleUpdatesSameKey("avro") + } + + test("Test partial update incremental query excludes log files committed after the window end") { + testPartialUpdateIncrementalExcludesLaterCommits("avro") + } + + test("Test partial update incremental query merges a log file on a compacted base file") { + testPartialUpdateIncrementalAfterCompaction("avro") + } + + test("Test incremental query on a COW table is unaffected by the MOR incremental change") { + testIncrementalCowUnaffected("avro") + } + + /** + * HUDI #18943: for a MOR table with partial updates, an incremental query must return ALL columns + * of a changed record (unchanged columns filled in from the base file), not only the columns + * altered by the partial update. + */ + def testPartialUpdateIncrementalQuery(logDataBlockFormat: String, commitTimeOrdering: Boolean): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + val mergeMode = if (commitTimeOrdering) { + RecordMergeMode.COMMIT_TIME_ORDERING.name() + } else { + RecordMergeMode.EVENT_TIME_ORDERING.name() + } + val preCombineString = if (commitTimeOrdering) "" else "preCombineField = '_ts'," + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int, + | description string + |) using hudi + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | $preCombineString + | recordMergeMode = 
'$mergeMode' + |) + |location '$basePath' + """.stripMargin) + + // Commit 1: full rows land in the base file. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," + + "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: partial update changing only "price" and "_ts" -> written as a partial log block. + // The new "_ts" values are larger than the existing ones so the update wins under event time + // ordering as well. + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 12.0 as price, 1001 as ts + |union select 3 as id, 25.0 as price, 1260 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + // Snapshot query returns the merged full rows (unchanged columns preserved). + checkAnswer(s"select id, name, price, _ts, description from $tableName order by id")( + Seq(1, "a1", 12.0, 1001, "a1: desc1"), + Seq(2, "a2", 20.0, 1200, "a2: desc2"), + Seq(3, "a3", 25.0, 1260, "a3: desc3") + ) + + // Incremental query over just commit 2 must return the FULL updated rows, including the + // columns ("name", "description") not part of the partial update. Before the fix these come + // back null/garbled because the base-file row is dropped before the runtime merge. + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 12.0, 1001, "a1: desc1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3") + ) + } + } + } + + /** + * Same as [[testPartialUpdateIncrementalQuery]] but on a partitioned table where only some file + * groups in the modified partitions are touched. Exercises the file-group scoping in the + * incremental file listing. 
+ */ + def testPartialUpdateIncrementalQueryPartitioned(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int, + | description string, + | part string + |) using hudi + |partitioned by (part) + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}' + |) + |location '$basePath' + """.stripMargin) + + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1', 'p1')," + + "(2, 'a2', 20, 1200, 'a2: desc2', 'p1'), (3, 'a3', 30, 1250, 'a3: desc3', 'p2')") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 12.0 as price, 1001 as ts, 'p1' as part + |union select 3 as id, 25.0 as price, 1260 as ts, 'p2' as part) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + checkAnswer(s"select id, name, price, _ts, description, part from $tableName order by id")( + Seq(1, "a1", 12.0, 1001, "a1: desc1", "p1"), + Seq(2, "a2", 20.0, 1200, "a2: desc2", "p1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3", "p2") + ) + + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts", "description", "part"))( + Seq(1, "a1", 12.0, 1001, "a1: desc1", "p1"), + Seq(3, "a3", 25.0, 1260, "a3: desc3", "p2") + ) + } + } + } + + /** + * HUDI 
#18943: even without partial updates, an incremental query on an EVENT_TIME_ORDERING MOR + * table must resolve the winning record version via a runtime merge against the base/earlier log, + * not just surface the value written within the window. A window write with a lower ordering value + * loses, so it must not appear in the incremental result; a window write with a higher ordering + * value wins and is returned. + */ + def testIncrementalEventTimeOrdering(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | _ts int + |) using hudi + |tblproperties( + | type ='mor', + | primaryKey = 'id', + | preCombineField = '_ts', + | recordMergeMode = '${RecordMergeMode.EVENT_TIME_ORDERING.name()}' + |) + |location '$basePath' + """.stripMargin) + + // Commit 1: base file with high ordering values. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000)") + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + val firstCompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: id 1 updated with a HIGHER _ts (wins); id 2 updated with a LOWER _ts (loses). + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 1 as id, 'a1' as name, 99.0 as price, 2000 as ts + |union select 2 as id, 'a2' as name, 88.0 as price, 500 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + + // Snapshot: id 1 took the higher-ts update; id 2 kept the base value (lower-ts update lost). 
+ checkAnswer(s"select id, name, price, _ts from $tableName order by id")( + Seq(1, "a1", 99.0, 2000), + Seq(2, "a2", 20.0, 1000) + ) + + // Incremental over commit 2: only id 1 actually changed its winning version; id 2's losing + // write must not surface (its merged commit time stays outside the window). + checkAnswer(readIncremental(basePath, firstCompletionTime, "id", "name", "price", "_ts"))( + Seq(1, "a1", 99.0, 2000) + ) + } + } + } + + /** + * Verifies the incremental window is honored across multiple commits: a sub-window returns only + * the records changed within it (fully merged), and a record never touched in the window does not + * appear. Commit-time ordering keeps the latest write winning. + */ + def testPartialUpdateIncrementalWindowBounds(logDataBlockFormat: String): Unit = { + withTempDir { tmp => + val tableName = generateTableName + val basePath = tmp.getCanonicalPath + "/" + tableName + withSQLConf( + HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0", + DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true", + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat, + HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") { + spark.sql( + s""" + |create table $tableName (id int, name string, price double, _ts int, description string) + |using hudi + |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}') + |location '$basePath' + """.stripMargin) + val storage = HoodieTestUtils.getStorage(new StoragePath(basePath)) + + // Commit 1: base file with three records. + spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," + + "(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30, 1250, 'a3: desc3')") + val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 2: partial update of id 1. 
+ spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + val t2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath) + + // Commit 3: partial update of id 2. + spark.sql(s"merge into $tableName t0 using (select 2 as id, 22.0 as price, 1201 as ts) s0 " + + "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts") + + // Window (t1, t2] = commit 2 only -> just id 1 (full row); id 2 and id 3 excluded. + checkAnswer(readIncrementalBounded(basePath, t1, t2, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1001, "a1: desc1") + ) + + // Window (t2, latest] = commit 3 only -> just id 2 (full row). + checkAnswer(readIncremental(basePath, t2, "id", "name", "price", "_ts", "description"))( + Seq(2, "a2", 22.0, 1201, "a2: desc2") + ) + + // Window (t1, latest] = commits 2 and 3 -> id 1 and id 2 (full rows); id 3 never touched. + checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))( + Seq(1, "a1", 11.0, 1001, "a1: desc1"), + Seq(2, "a2", 22.0, 1201, "a2: desc2") + ) + } + } + } + + /** + * Verifies a single incremental window that contains both a partial update (existing key) and an + * insert (new key) returns both with all columns. 
+   *
+   * @param logDataBlockFormat value applied to `HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT` for the test table
+   */
+  def testPartialUpdateIncrementalInsertAndUpdate(logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      withSQLConf(
+        HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true",
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat,
+        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") {
+        spark.sql(
+          s"""
+             |create table $tableName (id int, name string, price double, _ts int, description string)
+             |using hudi
+             |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}')
+             |location '$basePath'
+        """.stripMargin)
+        val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
+
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1'), (2, 'a2', 20, 1200, 'a2: desc2')")
+        val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        // Commit 2: partial update of id 1 + insert of new id 3.
+        spark.sql(
+          s"""
+             |merge into $tableName t0
+             |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as _ts, 'a1: desc1' as description
+             |union select 3 as id, 'a3' as name, 30.0 as price, 1300 as _ts, 'a3: desc3' as description) s0
+             |on t0.id = s0.id
+             |when matched then update set price = s0.price, _ts = s0._ts
+             |when not matched then insert *
+             |""".stripMargin)
+
+        // Window over commit 2 -> id 1 (partial update, full row) and id 3 (insert, full row); id 2 excluded.
+        checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))(
+          Seq(1, "a1", 12.0, 1001, "a1: desc1"),
+          Seq(3, "a3", 30.0, 1300, "a3: desc3")
+        )
+      }
+    }
+  }
+
+  /**
+   * Verifies multiple partial updates to the same key across commits merge into a single full row
+   * in the incremental result (later partial columns win, earlier untouched columns retained).
+   *
+   * @param logDataBlockFormat value applied to `HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT` for the test table
+   */
+  def testPartialUpdateIncrementalMultipleUpdatesSameKey(logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      withSQLConf(
+        HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true",
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat,
+        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") {
+        spark.sql(
+          s"""
+             |create table $tableName (id int, name string, price double, _ts int, description string)
+             |using hudi
+             |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}')
+             |location '$basePath'
+        """.stripMargin)
+        val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
+
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')")
+        val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        // Commit 2: update "price". Commit 3: update "description". Both partial, same key.
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 12.0 as price, 1001 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts")
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 'a1: desc1-v2' as descr, 1002 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set description = s0.descr, _ts = s0.ts")
+
+        // Window (t1, latest] -> a single fully-merged row: name from base, price from commit 2,
+        // description from commit 3, _ts the latest.
+        checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))(
+          Seq(1, "a1", 12.0, 1002, "a1: desc1-v2")
+        )
+      }
+    }
+  }
+
+  /**
+   * Verifies that a bounded incremental query (with an END_COMMIT) does not merge in log files
+   * committed after the window end. A record updated inside the window and again afterwards must be
+   * returned with its value as of the window end, not its latest value (and must not be dropped).
+   *
+   * @param logDataBlockFormat value applied to `HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT` for the test table
+   */
+  def testPartialUpdateIncrementalExcludesLaterCommits(logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      withSQLConf(
+        HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true",
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat,
+        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") {
+        spark.sql(
+          s"""
+             |create table $tableName (id int, name string, price double, _ts int, description string)
+             |using hudi
+             |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}')
+             |location '$basePath'
+        """.stripMargin)
+        val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
+
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')")
+        val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        // Commit 2 (inside the window): price -> 11.
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts")
+        val t2 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        // Commit 3 (after the window): price -> 99.
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 99.0 as price, 9999 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts")
+
+        // Snapshot reflects the latest value.
+        checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+          Seq(1, "a1", 99.0, 9999, "a1: desc1")
+        )
+
+        // Incremental over (t1, t2] must return id 1 as of commit 2 (price 11), not the later value
+        // and not an empty result.
+        checkAnswer(readIncrementalBounded(basePath, t1, t2, "id", "name", "price", "_ts", "description"))(
+          Seq(1, "a1", 11.0, 1001, "a1: desc1")
+        )
+      }
+    }
+  }
+
+  /**
+   * Verifies a partial update written as a log file on top of a compacted base file is merged
+   * correctly in an incremental query: the value carried into the compacted base file is retained
+   * and the later partial update is applied on top.
+   *
+   * NOTE(review): the test enables inline compaction after 2 delta commits but never asserts that
+   * a compaction instant exists; it relies on the configuration to trigger it - confirm.
+   *
+   * @param logDataBlockFormat value applied to `HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT` for the test table
+   */
+  def testPartialUpdateIncrementalAfterCompaction(logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      withSQLConf(
+        HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true",
+        HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key() -> logDataBlockFormat,
+        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true",
+        HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
+        HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "2") {
+        spark.sql(
+          s"""
+             |create table $tableName (id int, name string, price double, _ts int, description string)
+             |using hudi
+             |tblproperties(type ='mor', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}')
+             |location '$basePath'
+        """.stripMargin)
+        val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
+
+        // Delta commit 1 (base) + delta commit 2 (partial update) -> triggers inline compaction, so
+        // the compacted base file now carries price = 11.
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'd1')")
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 11.0 as price, 1001 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts")
+        val afterCompaction = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        // A further partial update lands as a log file on top of the compacted base file.
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 'd1v2' as descr, 1002 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set description = s0.descr, _ts = s0.ts")
+
+        // Incremental over the last commit merges the log file with the compacted base: price 11
+        // (from the compacted base), description d1v2 (from the log), name retained.
+        checkAnswer(readIncremental(basePath, afterCompaction, "id", "name", "price", "_ts", "description"))(
+          Seq(1, "a1", 11.0, 1002, "d1v2")
+        )
+      }
+    }
+  }
+
+  /**
+   * Confirms the MOR incremental change does not regress COW incremental queries (COW file groups
+   * have no log files, so they keep the original read path).
+   *
+   * @param logDataBlockFormat not referenced by this test's body (no log block format is
+   *                           configured here); presumably kept so the signature matches the
+   *                           sibling incremental tests - confirm
+   */
+  def testIncrementalCowUnaffected(logDataBlockFormat: String): Unit = {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = tmp.getCanonicalPath + "/" + tableName
+      withSQLConf(
+        HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0",
+        DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key() -> "true",
+        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> "true") {
+        spark.sql(
+          s"""
+             |create table $tableName (id int, name string, price double, _ts int, description string)
+             |using hudi
+             |tblproperties(type ='cow', primaryKey = 'id', recordMergeMode = '${RecordMergeMode.COMMIT_TIME_ORDERING.name()}')
+             |location '$basePath'
+        """.stripMargin)
+        val storage = HoodieTestUtils.getStorage(new StoragePath(basePath))
+
+        spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'd1'), (2, 'a2', 20, 1200, 'd2')")
+        val t1 = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+
+        spark.sql(s"merge into $tableName t0 using (select 1 as id, 12.0 as price, 1001 as ts) s0 " +
+          "on t0.id = s0.id when matched then update set price = s0.price, _ts = s0.ts")
+
+        // Incremental over the update returns only id 1 with its full updated row; id 2 untouched.
+        checkAnswer(readIncremental(basePath, t1, "id", "name", "price", "_ts", "description"))(
+          Seq(1, "a1", 12.0, 1001, "d1")
+        )
+      }
+    }
+  }
+
+  /**
+   * Runs an incremental query with both START_COMMIT and END_COMMIT set and returns the selected
+   * columns ordered by "id".
+   *
+   * @param basePath            table base path passed to `load`
+   * @param startCompletionTime value for `DataSourceReadOptions.START_COMMIT`
+   * @param endCompletionTime   value for `DataSourceReadOptions.END_COMMIT`
+   * @param cols                columns to select; must be non-empty
+   * @return the collected rows
+   */
+  private def readIncrementalBounded(basePath: String, startCompletionTime: String, endCompletionTime: String,
+                                     cols: String*): Array[org.apache.spark.sql.Row] =
+    readIncrementalWindow(basePath, startCompletionTime, Some(endCompletionTime), cols)
+
+  /**
+   * Runs an incremental query with only START_COMMIT set (no END_COMMIT option) and returns the
+   * selected columns ordered by "id".
+   *
+   * @param basePath            table base path passed to `load`
+   * @param startCompletionTime value for `DataSourceReadOptions.START_COMMIT`
+   * @param cols                columns to select; must be non-empty
+   * @return the collected rows
+   */
+  private def readIncremental(basePath: String, startCompletionTime: String, cols: String*): Array[org.apache.spark.sql.Row] =
+    readIncrementalWindow(basePath, startCompletionTime, None, cols)
+
+  /**
+   * Shared implementation of the incremental read helpers: builds the incremental reader, sets
+   * END_COMMIT only when an end time is supplied, then selects `cols`, orders by "id" and collects.
+   *
+   * @param endCompletionTime optional value for `DataSourceReadOptions.END_COMMIT`; `None` leaves
+   *                          the option unset
+   * @throws IllegalArgumentException if `cols` is empty
+   */
+  private def readIncrementalWindow(basePath: String, startCompletionTime: String, endCompletionTime: Option[String],
+                                    cols: Seq[String]): Array[org.apache.spark.sql.Row] = {
+    // Fail with a clear message instead of the NoSuchElementException that cols.head would raise.
+    require(cols.nonEmpty, "at least one column must be selected for an incremental read")
+    val startBoundedReader = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime)
+    val reader = endCompletionTime.fold(startBoundedReader) { end =>
+      startBoundedReader.option(DataSourceReadOptions.END_COMMIT.key, end)
+    }
+    reader
+      .load(basePath)
+      .select(cols.head, cols.tail: _*)
+      .orderBy("id")
+      .collect()
+  }
+
   test("Test partial update and insert with COW and Avro log format") {
     testPartialUpdateWithInserts("cow", "avro")
   }