diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3fe8c6ff62f71..f3e2330309366 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -250,6 +250,16 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental + // Spark planner only adds the user-provided predicates (from `WHERE` clause or `.filter()`) + // to `filters`; the `requiredFilters` from `HoodieBaseHadoopFsRelationFactory#getRequiredFilters` + // are not visible to the planner, thus the `requiredSchema` passed by Spark can miss the + // columns in `requiredFilters`. This happens for incremental query where `requiredFilters` + // is present. To allow correct projection and filtering, the columns from `requiredFilters` + // are added back to the `readRequiredSchema` for reading the file. + val filterOnlyFields = requiredFilters.flatMap(_.references).distinct + .filterNot(name => requiredSchema.fieldNames.contains(name) || partitionSchema.fieldNames.contains(name)) + .flatMap(name => dataStructType.fields.find(_.name == name)) + val readRequiredSchema = StructType(requiredSchema.fields ++ filterOnlyFields) val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline setSchemaEvolutionConfigs(augmentedStorageConf) augmentedStorageConf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasTimestampMillisFieldInTableSchema.toString) @@ -265,7 +275,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val exclusionFields = new java.util.HashSet[String]() exclusionFields.add("op") partitionSchema.fields.foreach(f => exclusionFields.add(f.name)) - val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) + val requestedStructType = StructType(readRequiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields) val dataStructTypeWithMandatoryPartitionFields = StructType(dataStructType.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructTypeWithMandatoryPartitionFields, sanitizedTableName), exclusionFields) @@ -339,7 +349,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, case _ => readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, - requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) + readRequiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } // CDC queries. case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping => @@ -347,7 +357,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, case _ => readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, - requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) + readRequiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } CloseableIteratorListener.addListener(iter) } @@ -493,28 +503,31 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, // executor private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkColumnarFileReader, requestedSchema: StructType, - remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType, + remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], readRequiredSchema: StructType, partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter], storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = { // Detect vector columns and create modified schemas with BinaryType. // Each schema is detected independently because ordinals are relative to the schema being // modified — outputSchema and requestedSchema may have vector columns at different positions - // than requiredSchema (e.g. when partition columns are interleaved). - val (modifiedRequiredSchema, vectorCols) = withVectorRewrite(requiredSchema) + // than readRequiredSchema (e.g. when partition columns are interleaved). + val (modifiedReadRequiredSchema, vectorCols) = withVectorRewrite(readRequiredSchema) val hasVectors = vectorCols.nonEmpty val (modifiedOutputSchema, outputVectorCols) = if (hasVectors) withVectorRewrite(outputSchema) else (outputSchema, Map.empty[Int, HoodieSchema.Vector]) val (modifiedRequestedSchema, _) = if (hasVectors) withVectorRewrite(requestedSchema) else (requestedSchema, Map.empty[Int, HoodieSchema.Vector]) val rawIter = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) { //none of partition fields are read from the file, so the reader will do the appending for us - parquetFileReader.read(file, modifiedRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + val iter = parquetFileReader.read(file, modifiedReadRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + projectIfNeeded(iter, StructType(modifiedReadRequiredSchema.fields ++ partitionSchema.fields), modifiedOutputSchema) } else if (remainingPartitionSchema.fields.length == 0) { //we read all of the partition fields from the file val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils //we need to modify the partitioned file so that the partition values are empty val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length) + val readSchema = StructType(modifiedReadRequiredSchema.fields ++ partitionSchema.fields) //and we pass an empty schema for the partition schema - parquetFileReader.read(modifiedFile, modifiedOutputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + val iter = parquetFileReader.read(modifiedFile, readSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + projectIfNeeded(iter, readSchema, modifiedOutputSchema) } else { //need to do an additional projection here. The case in mind is that partition schema is "a,b,c" mandatoryFields is "a,c", //then we will read (dataSchema + a + c) and append b. So the final schema will be (data schema + a + c +b) @@ -527,13 +540,10 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } if (hasVectors) { - // The raw iterator has BinaryType for vector columns; convert back to ArrayType - val readSchema = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) { - StructType(modifiedRequiredSchema.fields ++ partitionSchema.fields) - } else { - modifiedOutputSchema - } - wrapWithVectorConversion(rawIter, readSchema, outputSchema, outputVectorCols) + // The raw iterator has BinaryType for vector columns; convert back to ArrayType. + // All branches above produce rows in modifiedOutputSchema: filter-only columns from + // readRequiredSchema are projected away by projectIfNeeded/projectIter. + wrapWithVectorConversion(rawIter, modifiedOutputSchema, outputSchema, outputVectorCols) } else { rawIter } @@ -548,6 +558,18 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, }.asInstanceOf[Iterator[InternalRow]] } + /** + * Projects to `to` only when the read schema was augmented with filter-only columns; + * otherwise returns the iterator as is, preserving columnar batches. + */ + private def projectIfNeeded(iter: Iterator[InternalRow], from: StructType, to: StructType): Iterator[InternalRow] = { + if (from.fieldNames.sameElements(to.fieldNames)) { + iter + } else { + projectIter(iter, from, to) + } + } + private def getFixedPartitionValues(allPartitionValues: InternalRow, partitionSchema: StructType, fixedPartitionIndexes: Set[Int]): InternalRow = { InternalRow.fromSeq(allPartitionValues.toSeq(partitionSchema).zipWithIndex.filter(p => fixedPartitionIndexes.contains(p._2)).map(p => p._1)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala new file mode 100644 index 0000000000000..ccb23158ad414 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} +import org.apache.hudi.storage.StoragePath +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource + +import scala.collection.JavaConverters._ + +/** + * Incremental query correctness with the file group reader across COW/MOR, source table + * versions, read versions, and query shapes that prune `_hoodie_commit_time` from the scan + * schema (count(), isEmpty(), narrow projections). Runs without HoodieSparkSessionExtension, + * so the file format alone must keep the span-filter columns readable. + */ +class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHarness { + + val columns: Seq[String] = Seq("ts", "key", "rider", "fare", "pt") + + // c1..c3 insert disjoint key pairs (one file group, base files only via small file handling); + // c4..c6 are update commits (log files on MOR), each updating k1 with a different value so a + // range must surface only the targeted update of k1 + val batches: Seq[(Seq[(Int, String, String, Double, String)], String)] = Seq( + (Seq((1, "k1", "rider-c1", 10.0, "pt1"), (1, "k2", "rider-c1", 10.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((2, "k3", "rider-c2", 20.0, "pt1"), (2, "k4", "rider-c2", 20.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((3, "k5", "rider-c3", 30.0, "pt1"), (3, "k6", "rider-c3", 30.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((4, "k1", "rider-c4", 40.0, "pt1"), (4, "k2", "rider-c4", 40.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), + (Seq((5, "k1", "rider-c5", 50.0, "pt1"), (5, "k3", "rider-c5", 50.0, "pt1"), (5, "k4", "rider-c5", 50.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), + (Seq((6, "k1", "rider-c6", 60.0, "pt1"), (6, "k5", "rider-c6", 60.0, "pt1"), (6, "k6", "rider-c6", 60.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)) + + @ParameterizedTest + @CsvSource(value = Array( + "COPY_ON_WRITE,6,6", + "COPY_ON_WRITE,8,6", + "COPY_ON_WRITE,8,8", + "MERGE_ON_READ,6,6", + "MERGE_ON_READ,8,6", + "MERGE_ON_READ,8,8" + )) + def testIncrementalReadRanges(tableType: String, sourceVersion: Int, readVersion: Int): Unit = { + batches.zipWithIndex.foreach { case ((data, operation), i) => + val mode = if (i == 0) SaveMode.Overwrite else SaveMode.Append + write(data, tableType, sourceVersion, operation, mode) + if (i == 2) { + // small file handling must have kept a single file group with base files only + val (baseFiles, logFiles) = listDataFiles() + assertEquals(3, baseFiles.size, "Expected one base file per insert commit") + assertEquals(1, baseFiles.map(FSUtils.getFileId).distinct.size, "Expected a single file group") + assertTrue(logFiles.isEmpty, "Expected no log files after insert-only commits") + } + } + + val metaClient = HoodieTableMetaClient.builder() + .setConf(storageConf().newInstance()).setBasePath(basePath()).build() + assertEquals(sourceVersion, metaClient.getTableConfig.getTableVersion.versionCode()) + val (baseFiles, logFiles) = listDataFiles() + assertEquals(1, baseFiles.map(FSUtils.getFileId).distinct.size, "Expected a single file group") + if (tableType == "MERGE_ON_READ") { + assertEquals(3, baseFiles.size, "Update commits must not rewrite MOR base files") + assertEquals(3, logFiles.size, "Expected one log file per update commit") + } else { + assertEquals(6, baseFiles.size, "Expected one base file per commit") + assertTrue(logFiles.isEmpty, "Expected no log files on COW") + } + // records merged into the latest base file keep their original commit times + val latestBaseFile = baseFiles.maxBy(name => FSUtils.getCommitTime(name)) + val commitTimesInBaseFile = spark.read.parquet(new Path(new Path(basePath, "pt1"), latestBaseFile).toString) + .select("_hoodie_commit_time").distinct().count() + assertTrue(commitTimesInBaseFile > 1, + s"Expected multiple commit times in the latest base file, got $commitTimesInBaseFile") + + // c1..c6 ordered by requested time + val instants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + .getInstants.asScala.toList + assertEquals(6, instants.size) + + // (000, c2]: base files only + assertIncrementalRange(readVersion, instants, 0, 2, + Set(("k1", 1), ("k2", 1), ("k3", 2), ("k4", 2))) + // (c1, c2]: single base file in range + assertIncrementalRange(readVersion, instants, 1, 2, + Set(("k3", 2), ("k4", 2))) + // (c2, c4]: base file of c3 plus c4's log file on MOR; carried-over c1/c2 rows filtered out + assertIncrementalRange(readVersion, instants, 2, 4, + Set(("k5", 3), ("k6", 3), ("k1", 4), ("k2", 4))) + // (c3, c5]: log files of c4/c5 only on MOR; k1 updated in both c4 and c5 must surface once + // with the latest in-range value + assertIncrementalRange(readVersion, instants, 3, 5, + Set(("k1", 5), ("k2", 4), ("k3", 5), ("k4", 5))) + // (c6, c6]: empty range + assertIncrementalRange(readVersion, instants, 6, 6, Set.empty) + } + + private def assertIncrementalRange(readVersion: Int, + instants: List[HoodieInstant], + startIdx: Int, endIdx: Int, + expected: Set[(String, Int)]): Unit = { + def boundary(idx: Int): String = { + if (idx == 0) { + "000" + } else if (readVersion == 6) { + instants(idx - 1).requestedTime + } else { + instants(idx - 1).getCompletionTime + } + } + val start = boundary(startIdx) + val end = boundary(endIdx) + + // select * + val rows = readIncremental(readVersion, start, end).collect() + .map(r => (r.getAs[String]("key"), r.getAs[Int]("ts"))).toSet + assertEquals(expected, rows) + // projection without _hoodie_commit_time + val keys = readIncremental(readVersion, start, end).select("key").collect().map(_.getString(0)).toSet + assertEquals(expected.map(_._1), keys) + // these query shapes prune `_hoodie_commit_time` out of the scan schema + assertEquals(expected.size.toLong, readIncremental(readVersion, start, end).count()) + assertEquals(expected.isEmpty, readIncremental(readVersion, start, end).isEmpty) + } + + private def write(data: Seq[(Int, String, String, Double, String)], tableType: String, + sourceVersion: Int, operation: String, mode: SaveMode): Unit = { + spark.createDataFrame(data).toDF(columns: _*).write.format("hudi") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "pt") + .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts") + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .option(DataSourceWriteOptions.TABLE_NAME.key, "test_incr_read_fgr") + .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key, sourceVersion.toString) + .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false") + .option(DataSourceWriteOptions.OPERATION.key, operation) + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.upsert.shuffle.parallelism", "2") + .mode(mode) + .save(basePath) + } + + private def readIncremental(readVersion: Int, start: String, end: String): DataFrame = { + val reader = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key(), start) + .option(DataSourceReadOptions.END_COMMIT.key(), end) + val readerWithVersion = if (readVersion == 6) { + // same access pattern as the S3/GCS event incremental sources + reader.option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION.key(), "6") + } else { + reader + } + readerWithVersion.load(basePath) + } + + private def listDataFiles(): (Seq[String], Seq[String]) = { + val names = fs.listStatus(new Path(basePath, "pt1")).map(_.getPath.getName).toSeq + (names.filter(n => FSUtils.isBaseFile(new StoragePath(n))), names.filter(n => FSUtils.isLogFile(n))) + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 792c1e26b98b3..541f90ab6c9c2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -19,9 +19,9 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -42,7 +42,6 @@ import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; -import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; /** * This class is currently used only by s3 and gcs incr sources that supports size based batching @@ -89,15 +88,16 @@ public static Dataset applyOrdering(Dataset dataset, List orde public Pair> runIncrementalQuery(QueryInfo queryInfo) { log.info("Running incremental query"); - HoodieTableVersion tableVersion = HoodieTableMetaClient.builder().setConf(getStorageConf()).setBasePath(sourcePath).build().getTableConfig().getTableVersion(); + // S3/GCS event incremental sources operate with V1 checkpoint (commit#fileKey, requested-time based), + // so force INCREMENTAL_READ_TABLE_VERSION to 6. Use previousInstant so the start-exclusive incremental + // scan still includes the commit (startInstant), required to resume from checkpoint commit#fileKey. return Pair.of(queryInfo, sparkSession.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) - .option(INCREMENTAL_READ_TABLE_VERSION().key(), tableVersion.versionCode()) - .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getStartInstant()) + .option(INCREMENTAL_READ_TABLE_VERSION().key(), HoodieTableVersion.SIX.versionCode()) + .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), - props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), - tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue() : "false")) + props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), "false")) .load(sourcePath)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index ecf34920218d7..acaa7eefba551 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -73,14 +73,12 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.mockito.Mockito.when; @@ -163,7 +161,8 @@ protected String generateS3EventMetadata(Long objectSize, String bucketName, Str } protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { - String partitionPath = bucketName; + // partition path must match the table config, or the incremental read lists no partitions + String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; HoodieSchema schema = S3_METADATA_SCHEMA; GenericRecord rec = new GenericData.Record(schema.toAvroSchema()); HoodieSchemaField s3Field = schema.getField("s3").get(); @@ -222,15 +221,20 @@ protected HoodieWriteConfig getWriteConfig() { } protected Pair> writeS3MetadataRecords(String commitTime) throws IOException { + return writeS3MetadataRecords(commitTime, Collections.singletonList(Pair.of("data-file-1.json", 1L))); + } + + /** Writes one commit with one S3 event record per (objectKey, objectSize) entry. */ + protected Pair> writeS3MetadataRecords(String commitTime, + List> keysAndSizes) throws IOException { HoodieWriteConfig writeConfig = getWriteConfig(); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { - WriteClientTestUtils.startCommitWithTime(writeClient, commitTime); - List s3MetadataRecords = Arrays.asList( - generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L) - ); + List s3MetadataRecords = keysAndSizes.stream() + .map(p -> generateS3EventMetadata(commitTime, "bucket-1", p.getLeft(), p.getRight())) + .collect(Collectors.toList()); List statusList = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime).collect(); - writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); return Pair.of(commitTime, s3MetadataRecords); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 746760a86307d..6dd8b7035d806 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -27,7 +27,9 @@ import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; @@ -48,6 +50,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness.TestSourceProfile; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; @@ -60,11 +63,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -82,14 +85,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; @@ -251,7 +258,7 @@ public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOExce List numPartitions = Arrays.asList(12, 2, 1); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); } @ParameterizedTest @@ -309,8 +316,78 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } else { numPartitions = Arrays.asList(23, sourcePartitions); } - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); - Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + } + + /** + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8, COW and MOR + * source meta-tables since cloud event sources always use V1/requested-time regardless of version. + */ + @ParameterizedTest + @CsvSource({"6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "6,MERGE_ON_READ", "8,MERGE_ON_READ"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, HoodieTableType tableType) throws IOException { + Properties tableProps = new Properties(); + tableProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(true)); + tableProps.put("hoodie.datasource.write.recordkey.field", "_row_key"); + tableProps.put("hoodie.datasource.write.partitionpath.field", ""); + tableProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); + tableProps.put(HoodieTableConfig.PARTITION_FIELDS.key(), ""); + tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); + metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); + + // timestamp-format instants: the incremental read normalizes START_COMMIT/END_COMMIT + // through HoodieSqlCommonUtils.formatIncrementalInstant, which rejects other formats + String startCommit = "20260601000001"; + String laterCommit = "20260601000002"; + writeGcsMetadataRecords(startCommit, Arrays.asList( + Pair.of("name/file-01.json", 100L), + Pair.of("name/file-02.json", 100L), + Pair.of("name/file-03.json", 100L), + Pair.of("name/file-04.json", 100L), + Pair.of("name/file-05.json", 100L))); + // the second commit re-writes an existing key (a re-uploaded object), landing in a log file on MOR + writeGcsMetadataRecords(laterCommit, Arrays.asList(Pair.of("name/file-05.json", 100L))); + if (tableType == HoodieTableType.MERGE_ON_READ) { + boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) + .anyMatch(f -> f.getPath().getName().contains(".log.")); + assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + } + + TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); + props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); + when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())) + .thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + + // Real QueryRunner so the actual Spark incremental read against the on-disk meta-table runs. + GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource( + props, jsc(), spark(), + new CloudDataFetcher(props, jsc(), spark(), metrics, cloudObjectsSelectorCommon), + new QueryRunner(spark(), props), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + + // Resume mid-commit at file-02; sourceLimit=250B fits file-03+file-04, file-05 would exceed. + Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#name/file-02.json"); + Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); + + assertEquals( + new StreamerCheckpointV1(startCommit + "#name/file-04.json"), + result.getRight(), + "Next batch must continue within the start commit, not advance to a bare instant."); + + // Filter must pass exactly file-03 and file-04 to downstream loading. + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); + verify(cloudObjectsSelectorCommon).loadAsDataset( + any(), captor.capture(), any(), eq(schemaProvider), anyInt()); + List selectedPaths = captor.getValue().stream() + .map(CloudObjectMetadata::getPath) + .sorted() + .collect(Collectors.toList()); + assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), selectedPaths.get(0)); + assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), selectedPaths.get(1)); } @Test @@ -375,8 +452,8 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe Option> datasetOpt = dataAndCheckpoint.getLeft(); Checkpoint nextCheckPoint = dataAndCheckpoint.getRight(); - Assertions.assertNotNull(nextCheckPoint); - Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); + assertNotNull(nextCheckPoint); + assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, @@ -387,7 +464,12 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe } private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) { - String partitionPath = bucketName; + return getGcsMetadataRecord(commitTime, filename, bucketName, generation, 370L); + } + + private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation, long size) { + // partition path must match the table config, or the incremental read lists no partitions + String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; String id = "id:" + bucketName + "/" + filename + "/" + generation; String mediaLink = String.format("https://storage.googleapis.com/download/storage/v1/b/%s/o/%s" @@ -412,7 +494,7 @@ private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, St rec.put("metageneration", "1"); rec.put("name", filename); rec.put("selfLink", selfLink); - rec.put("size", "370"); + rec.put("size", Long.toString(size)); rec.put("storageClass", "STANDARD"); rec.put("timeCreated", "2022-08-29T05:52:55.869Z"); rec.put("timeStorageClassUpdated", "2022-08-29T05:52:55.869Z"); @@ -443,7 +525,23 @@ private Pair> writeGcsMetadataRecords(String commitTi getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1") ); List statusList = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime).collect(); - writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); + assertNoWriteErrors(statusList); + return Pair.of(commitTime, gcsMetadataRecords); + } + } + + /** Writes one commit with one GCS event record per (objectKey, objectSize) entry. */ + private Pair> writeGcsMetadataRecords(String commitTime, + List> keysAndSizes) throws IOException { + HoodieWriteConfig writeConfig = getWriteConfig(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { + WriteClientTestUtils.startCommitWithTime(writeClient, commitTime); + List gcsMetadataRecords = keysAndSizes.stream() + .map(p -> getGcsMetadataRecord(commitTime, p.getLeft(), "bucket-1", "1", p.getRight())) + .collect(Collectors.toList()); + List statusList = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime).collect(); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); return Pair.of(commitTime, gcsMetadataRecords); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 4218aaca26d94..4a0bca4f9aaf1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -20,6 +20,10 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -27,12 +31,14 @@ import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.config.CloudSourceConfig; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; +import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceProfile; +import org.apache.hadoop.fs.Path; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -40,18 +46,23 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -91,7 +102,7 @@ public void testOneFileInCommit() throws IOException { Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); @@ -116,7 +127,7 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException { Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); @@ -156,7 +167,7 @@ public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOExce Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, @@ -221,7 +232,7 @@ public void testFilterAnEntireCommit(boolean useSourceProfile) throws IOExceptio setMockQueryRunner(inputDs); SourceProfile sourceProfile = new TestSourceProfile(50L, 0, 10L); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); if (useSourceProfile) { when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); } else { @@ -259,7 +270,7 @@ public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws IOEx Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); SourceProfile sourceProfile = new TestSourceProfile(50L, 0, 10L); if (useSourceProfile) { when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); @@ -305,7 +316,7 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip"); typedProperties.setProperty("hoodie.streamer.source.cloud.data.select.relative.path.regex", "path/to/file[0-9]+"); @@ -335,7 +346,7 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, // Verify the partitions being passed in getCloudObjectDataDF are correct. ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Integer.class); ArgumentCaptor argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class); - verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture()); + verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture()); List numPartitions; if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) { @@ -343,8 +354,79 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } else { numPartitions = Arrays.asList(23, sourcePartitions); } - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); - Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + } + + /** + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8, COW and MOR + * source meta-tables since cloud event sources always use V1/requested-time regardless of version. + */ + @ParameterizedTest + @CsvSource({"6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "6,MERGE_ON_READ", "8,MERGE_ON_READ"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, HoodieTableType tableType) throws IOException { + Properties tableProps = new Properties(); + tableProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(true)); + tableProps.put("hoodie.datasource.write.recordkey.field", "_row_key"); + tableProps.put("hoodie.datasource.write.partitionpath.field", ""); + tableProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); + tableProps.put(HoodieTableConfig.PARTITION_FIELDS.key(), ""); + tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); + metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); + + // timestamp-format instants: the incremental read normalizes START_COMMIT/END_COMMIT + // through HoodieSqlCommonUtils.formatIncrementalInstant, which rejects other formats + String startCommit = "20260601000001"; + String laterCommit = "20260601000002"; + writeS3MetadataRecords(startCommit, Arrays.asList( + Pair.of("path/to/file-01.json", 100L), + Pair.of("path/to/file-02.json", 100L), + Pair.of("path/to/file-03.json", 100L), + Pair.of("path/to/file-04.json", 100L), + Pair.of("path/to/file-05.json", 100L))); + // the second commit re-writes an existing key (an S3 re-upload), landing in a log file on MOR + writeS3MetadataRecords(laterCommit, Arrays.asList(Pair.of("path/to/file-05.json", 100L))); + if (tableType == HoodieTableType.MERGE_ON_READ) { + boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) + .anyMatch(f -> f.getPath().getName().contains(".log.")); + assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + } + + TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); + props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); + when(mockCloudObjectsSelectorCommon.loadAsDataset( + any(), any(), any(), eq(schemaProvider), anyInt())) + .thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + + // Real QueryRunner so the actual Spark incremental read against the on-disk meta-table runs. + S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource( + props, jsc(), spark(), + new QueryRunner(spark(), props), + new CloudDataFetcher(props, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + + // Resume mid-commit at file-02; sourceLimit=250B fits file-03+file-04, file-05 would exceed. + Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#path/to/file-02.json"); + Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); + + assertEquals( + new StreamerCheckpointV1(startCommit + "#path/to/file-04.json"), + result.getRight(), + "Next batch must continue within the start commit, not advance to a bare instant."); + + // Filter must pass exactly file-03 and file-04 to downstream loading. + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); + verify(mockCloudObjectsSelectorCommon).loadAsDataset( + any(), captor.capture(), any(), eq(schemaProvider), anyInt()); + List selectedPaths = captor.getValue().stream() + .map(CloudObjectMetadata::getPath) + .sorted() + .collect(java.util.stream.Collectors.toList()); + assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), selectedPaths.get(0)); + assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), selectedPaths.get(1)); } @Test