From b95efa8b1eba2fcb45cfb74f4fc3e5acf14c0647 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 17:44:08 -0700 Subject: [PATCH 01/18] fix(streamer): Include start commit in S3/GCS IncrSource incremental query S3/GCS cloud-object incremental sources can silently drop records whenever a previous batch persisted a commit#fileKey mid-commit-pagination checkpoint (i.e., the prior batch hit sourceLimit before exhausting the start commit's files). Files in the start commit after the checkpoint key become unreachable, and the persisted checkpoint advances past them as a bare instant. Root cause: QueryRunner.runIncrementalQuery passes queryInfo.getStartInstant() as the Spark START_COMMIT. The Spark incremental relation's findInstantsInRange is (start, end] (start-exclusive), so the start commit is dropped from the scan. The downstream (commit_time || object_key) > 'commit#fileKey' filter then matches nothing in the start commit, and filterAndGenerateCheckpointBasedOnSourceLimit falls through its empty-batch branch, emitting endInstant as bare with no #fileKey suffix. The next batch resumes past the gap. Fix: pass queryInfo.getPreviousInstant() so the resulting scan range (previousInstant, end] includes the start commit (startInstant) while preserving start-exclusive relation semantics. Required for cloud-object sources whose commit#fileKey pagination depends on re-reading the start commit to find files past the persisted key. Adds testRealQueryRunnerResumesMidCommitPagination to both TestS3EventsHoodieIncrSource and TestGcsEventsHoodieIncrSource. The new tests exercise a real QueryRunner against an on-disk Hudi events meta-table, resuming from a mid-commit commit#fileKey checkpoint with sourceLimit smaller than the remaining files. They assert both the next persisted checkpoint and the exact files passed downstream (via captor on loadAsDataset). The existing tests mocked QueryRunner.run() to return inputDs unfiltered for incremental queries and could not catch a START_COMMIT-handling regression. --- .../sources/helpers/QueryRunner.java | 4 +- .../S3EventsHoodieIncrSourceHarness.java | 17 ++- .../TestGcsEventsHoodieIncrSource.java | 105 +++++++++++++++++- .../sources/TestS3EventsHoodieIncrSource.java | 85 ++++++++++++++ 4 files changed, 205 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 792c1e26b98b3..c823b12ba3a03 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -90,10 +90,12 @@ public Pair> runIncrementalQuery(QueryInfo queryInfo) { log.info("Running incremental query"); HoodieTableVersion tableVersion = HoodieTableMetaClient.builder().setConf(getStorageConf()).setBasePath(sourcePath).build().getTableConfig().getTableVersion(); + // Use previousInstant so the start-exclusive incremental scan still includes the commit (startInstant), + // required to resume from checkpoint commit#fileKey for cloud event incremental source. return Pair.of(queryInfo, sparkSession.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) .option(INCREMENTAL_READ_TABLE_VERSION().key(), tableVersion.versionCode()) - .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getStartInstant()) + .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index ecf34920218d7..aa5070463244d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -222,13 +222,22 @@ protected HoodieWriteConfig getWriteConfig() { } protected Pair> writeS3MetadataRecords(String commitTime) throws IOException { + return writeS3MetadataRecords(commitTime, Collections.singletonList(Pair.of("data-file-1.json", 1L))); + } + + /** + * Writes a single commit to the on-disk S3-events meta-table containing one record per + * (objectKey, objectSize) entry. Useful for tests that need a real source table whose + * single commit holds enough records to force mid-commit pagination under sourceLimit. + */ + protected Pair> writeS3MetadataRecords(String commitTime, + List> keysAndSizes) throws IOException { HoodieWriteConfig writeConfig = getWriteConfig(); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { - WriteClientTestUtils.startCommitWithTime(writeClient, commitTime); - List s3MetadataRecords = Arrays.asList( - generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L) - ); + List s3MetadataRecords = keysAndSizes.stream() + .map(p -> generateS3EventMetadata(commitTime, "bucket-1", p.getLeft(), p.getRight())) + .collect(Collectors.toList()); List statusList = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime).collect(); writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 746760a86307d..2f9bed42d955f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -48,6 +48,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSourceHarness.TestSourceProfile; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.QueryInfo; @@ -313,6 +314,84 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); } + /** + * Regression test: when the persisted checkpoint is `commit#fileKey` (mid-commit + * pagination state, e.g., the previous batch hit sourceLimit before exhausting the + * start commit's files), the next batch must re-include the start commit in its + * Spark scan so the remaining files can be discovered. + * + *

Uses a real {@link QueryRunner} against an on-disk Hudi events meta-table so + * the actual Spark V1 incremental read path is exercised. The mocked {@link QueryRunner} + * used by other tests in this file returns its input dataset unfiltered for the + * incremental branch and therefore cannot catch a START_COMMIT-handling regression. + * + *

Without passing previousInstant to START_COMMIT in + * {@code QueryRunner.runIncrementalQuery}, the V1 relation's + * {@code findInstantsInRange} ({@code (start, end]}) excludes the start commit, all + * rows past the checkpoint key are dropped, and the persisted checkpoint advances + * past them as a bare instant (silent data loss). + */ + @Test + void testRealQueryRunnerResumesMidCommitPagination() throws IOException { + String startCommit = "10"; + String laterCommit = "20"; + writeGcsMetadataRecords(startCommit, Arrays.asList( + Pair.of("name/file-01.json", 100L), + Pair.of("name/file-02.json", 100L), + Pair.of("name/file-03.json", 100L), + Pair.of("name/file-04.json", 100L), + Pair.of("name/file-05.json", 100L))); + writeGcsMetadataRecords(laterCommit); + + TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); + props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); + when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), org.mockito.Mockito.anyInt())) + .thenReturn(Option.empty()); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + + // Real QueryRunner (not the @Mock queryRunner) so the actual Spark incremental + // read against the on-disk meta-table runs. + GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource( + props, jsc(), spark(), + new QueryRunner(spark(), props), + new CloudDataFetcher(props, jsc(), spark(), metrics, cloudObjectsSelectorCommon), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + + // Resume from a mid-commit checkpoint: prior batch stopped at file-02 in commit 10. + // sourceLimit=250 means the next batch should consume file-03 plus file-04 (200B + // cumulative) and stop before file-05 (would exceed the limit). + Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#name/file-02.json"); + Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); + + Assertions.assertEquals( + new StreamerCheckpointV1(startCommit + "#name/file-04.json"), + result.getRight(), + "After mid-commit pagination, next batch must continue within the start commit " + + "(file-03 + file-04 = 200B under the 250B sourceLimit), not skip past it to " + + "the next source commit as a bare instant."); + + // Verify the (commit_time||object_key) > 'commit#fileKey' filter selected exactly + // file-03 and file-04: file-01 and file-02 are at or below the resume key, file-05 + // exceeds the sourceLimit budget, and laterCommit's record must not be reached. + // Captures the metadata passed to downstream file loading. If the bug recurs (Spark + // scan excludes the start commit), this would capture only laterCommit's record or + // nothing at all. + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); + verify(cloudObjectsSelectorCommon).loadAsDataset( + any(), captor.capture(), any(), eq(schemaProvider), org.mockito.Mockito.anyInt()); + List selectedPaths = captor.getValue().stream() + .map(CloudObjectMetadata::getPath) + .sorted() + .collect(Collectors.toList()); + Assertions.assertEquals(2, selectedPaths.size(), + "Filter must select exactly 2 files (file-03 and file-04). Got: " + selectedPaths); + Assertions.assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), + "First selected path should be file-03.json (after-key filter + ordering). Got: " + selectedPaths.get(0)); + Assertions.assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), + "Second selected path should be file-04.json (sourceLimit cut). Got: " + selectedPaths.get(1)); + } + @Test public void testUnsupportedCheckpoint() { TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); @@ -387,6 +466,10 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe } private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) { + return getGcsMetadataRecord(commitTime, filename, bucketName, generation, 370L); + } + + private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation, long size) { String partitionPath = bucketName; String id = "id:" + bucketName + "/" + filename + "/" + generation; @@ -412,7 +495,7 @@ private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, St rec.put("metageneration", "1"); rec.put("name", filename); rec.put("selfLink", selfLink); - rec.put("size", "370"); + rec.put("size", Long.toString(size)); rec.put("storageClass", "STANDARD"); rec.put("timeCreated", "2022-08-29T05:52:55.869Z"); rec.put("timeStorageClassUpdated", "2022-08-29T05:52:55.869Z"); @@ -449,6 +532,26 @@ private Pair> writeGcsMetadataRecords(String commitTi } } + /** + * Writes a single commit containing one record per (objectKey, objectSize) entry. Used by + * tests that need a real on-disk source meta-table with multiple records in one commit to + * exercise mid-commit pagination under sourceLimit. + */ + private Pair> writeGcsMetadataRecords(String commitTime, + List> keysAndSizes) throws IOException { + HoodieWriteConfig writeConfig = getWriteConfig(); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) { + WriteClientTestUtils.startCommitWithTime(writeClient, commitTime); + List gcsMetadataRecords = keysAndSizes.stream() + .map(p -> getGcsMetadataRecord(commitTime, p.getLeft(), "bucket-1", "1", p.getRight())) + .collect(Collectors.toList()); + List statusList = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime).collect(); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + assertNoWriteErrors(statusList); + return Pair.of(commitTime, gcsMetadataRecords); + } + } + private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { TypedProperties properties = new TypedProperties(); //String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 4218aaca26d94..6821117e22450 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -20,6 +20,8 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.checkpoint.Checkpoint; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -27,6 +29,8 @@ import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.config.CloudSourceConfig; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; +import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata; +import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceProfile; @@ -347,6 +351,87 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); } + /** + * Regression test: when the persisted checkpoint is `commit#fileKey` (mid-commit + * pagination state, e.g., the previous batch hit sourceLimit before exhausting the + * start commit's files), the next batch must re-include the start commit in its + * Spark scan so the remaining files can be discovered. + * + *

Uses a real {@link QueryRunner} against an on-disk Hudi events meta-table so + * the actual Spark V1 incremental read path is exercised. The mocked {@link QueryRunner} + * used by other tests in this file returns its input dataset unfiltered for the + * incremental branch and therefore cannot catch a START_COMMIT-handling regression. + * + *

Without passing previousInstant to START_COMMIT in + * {@code QueryRunner.runIncrementalQuery}, the V1 relation's + * {@code findInstantsInRange} ({@code (start, end]}) excludes the start commit, all + * rows past the checkpoint key are dropped, and the persisted checkpoint advances + * past them as a bare instant (silent data loss). + */ + @Test + void testRealQueryRunnerResumesMidCommitPagination() throws IOException { + // One source commit with 5 file events (100B each), followed by a later commit to + // ensure the source timeline endInstant moves past the start commit. + String startCommit = "10"; + String laterCommit = "20"; + writeS3MetadataRecords(startCommit, Arrays.asList( + Pair.of("path/to/file-01.json", 100L), + Pair.of("path/to/file-02.json", 100L), + Pair.of("path/to/file-03.json", 100L), + Pair.of("path/to/file-04.json", 100L), + Pair.of("path/to/file-05.json", 100L))); + writeS3MetadataRecords(laterCommit); + + TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); + props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); + Mockito.when(mockCloudObjectsSelectorCommon.loadAsDataset( + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())) + .thenReturn(Option.empty()); + Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + + // Real QueryRunner (not the harness's mock) so the actual Spark incremental read + // against the on-disk meta-table runs. + S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource( + props, jsc(), spark(), + new QueryRunner(spark(), props), + new CloudDataFetcher(props, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon), + new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); + + // Resume from a mid-commit checkpoint: prior batch stopped at file-02 within + // commit 10. sourceLimit=250B means the next batch should consume file-03 plus + // file-04 (200B cumulative) and stop before file-05 (would exceed the limit). + Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#path/to/file-02.json"); + Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); + + Assertions.assertEquals( + new StreamerCheckpointV1(startCommit + "#path/to/file-04.json"), + result.getRight(), + "After mid-commit pagination, next batch must continue within the start commit " + + "(file-03 + file-04 = 200B under the 250B sourceLimit), not skip past it to " + + "the next source commit as a bare instant."); + + // Verify the (commit_time||object_key) > 'commit#fileKey' filter selected exactly + // file-03 and file-04: file-01 and file-02 are at or below the resume key, file-05 + // exceeds the sourceLimit budget, and laterCommit's record must not be reached. + // Captures the metadata passed to downstream file loading. If the bug recurs (Spark + // scan excludes the start commit), this would capture only laterCommit's record or + // nothing at all. + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); + verify(mockCloudObjectsSelectorCommon).loadAsDataset( + Mockito.any(), captor.capture(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt()); + List selectedPaths = captor.getValue().stream() + .map(CloudObjectMetadata::getPath) + .sorted() + .collect(java.util.stream.Collectors.toList()); + Assertions.assertEquals(2, selectedPaths.size(), + "Filter must select exactly 2 files (file-03 and file-04). Got: " + selectedPaths); + Assertions.assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), + "First selected path should be file-03.json (after-key filter + ordering). Got: " + selectedPaths.get(0)); + Assertions.assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), + "Second selected path should be file-04.json (sourceLimit cut). Got: " + selectedPaths.get(1)); + } + @Test public void testUnsupportedCheckpoint() { TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); From ae54e8fa5009b8f8daa3c8be077cb6feda9c335c Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 18:04:14 -0700 Subject: [PATCH 02/18] Remove unused Arrays import in S3EventsHoodieIncrSourceHarness --- .../hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index aa5070463244d..303723aafec8a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -73,7 +73,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; From 40dc1bf5f133d7ea497d9b213268cf3fb55d313b Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 18:09:39 -0700 Subject: [PATCH 03/18] Simplify javadocs for cloud incr source mid-commit pagination tests --- .../S3EventsHoodieIncrSourceHarness.java | 6 +---- .../TestGcsEventsHoodieIncrSource.java | 25 ++++--------------- .../sources/TestS3EventsHoodieIncrSource.java | 19 +++----------- 3 files changed, 10 insertions(+), 40 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index 303723aafec8a..ad49e3227b933 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -224,11 +224,7 @@ protected Pair> writeS3MetadataRecords(String commitT return writeS3MetadataRecords(commitTime, Collections.singletonList(Pair.of("data-file-1.json", 1L))); } - /** - * Writes a single commit to the on-disk S3-events meta-table containing one record per - * (objectKey, objectSize) entry. Useful for tests that need a real source table whose - * single commit holds enough records to force mid-commit pagination under sourceLimit. - */ + /** Writes one commit with one S3 event record per (objectKey, objectSize) entry. */ protected Pair> writeS3MetadataRecords(String commitTime, List> keysAndSizes) throws IOException { HoodieWriteConfig writeConfig = getWriteConfig(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 2f9bed42d955f..282392a0f251e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -315,21 +315,10 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Regression test: when the persisted checkpoint is `commit#fileKey` (mid-commit - * pagination state, e.g., the previous batch hit sourceLimit before exhausting the - * start commit's files), the next batch must re-include the start commit in its - * Spark scan so the remaining files can be discovered. - * - *

Uses a real {@link QueryRunner} against an on-disk Hudi events meta-table so - * the actual Spark V1 incremental read path is exercised. The mocked {@link QueryRunner} - * used by other tests in this file returns its input dataset unfiltered for the - * incremental branch and therefore cannot catch a START_COMMIT-handling regression. - * - *

Without passing previousInstant to START_COMMIT in - * {@code QueryRunner.runIncrementalQuery}, the V1 relation's - * {@code findInstantsInRange} ({@code (start, end]}) excludes the start commit, all - * rows past the checkpoint key are dropped, and the persisted checkpoint advances - * past them as a bare instant (silent data loss). + * Resuming from a {@code commit#fileKey} checkpoint must re-include the start commit so + * files past the key are discovered. Uses a real {@link QueryRunner} against an on-disk + * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would + * hide a START_COMMIT-handling regression. */ @Test void testRealQueryRunnerResumesMidCommitPagination() throws IOException { @@ -532,11 +521,7 @@ private Pair> writeGcsMetadataRecords(String commitTi } } - /** - * Writes a single commit containing one record per (objectKey, objectSize) entry. Used by - * tests that need a real on-disk source meta-table with multiple records in one commit to - * exercise mid-commit pagination under sourceLimit. - */ + /** Writes one commit with one GCS event record per (objectKey, objectSize) entry. */ private Pair> writeGcsMetadataRecords(String commitTime, List> keysAndSizes) throws IOException { HoodieWriteConfig writeConfig = getWriteConfig(); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 6821117e22450..78094e7a99e08 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -352,21 +352,10 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Regression test: when the persisted checkpoint is `commit#fileKey` (mid-commit - * pagination state, e.g., the previous batch hit sourceLimit before exhausting the - * start commit's files), the next batch must re-include the start commit in its - * Spark scan so the remaining files can be discovered. - * - *

Uses a real {@link QueryRunner} against an on-disk Hudi events meta-table so - * the actual Spark V1 incremental read path is exercised. The mocked {@link QueryRunner} - * used by other tests in this file returns its input dataset unfiltered for the - * incremental branch and therefore cannot catch a START_COMMIT-handling regression. - * - *

Without passing previousInstant to START_COMMIT in - * {@code QueryRunner.runIncrementalQuery}, the V1 relation's - * {@code findInstantsInRange} ({@code (start, end]}) excludes the start commit, all - * rows past the checkpoint key are dropped, and the persisted checkpoint advances - * past them as a bare instant (silent data loss). + * Resuming from a {@code commit#fileKey} checkpoint must re-include the start commit so + * files past the key are discovered. Uses a real {@link QueryRunner} against an on-disk + * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would + * hide a START_COMMIT-handling regression. */ @Test void testRealQueryRunnerResumesMidCommitPagination() throws IOException { From 6bd672574ba808075ad58cabd61362109c045af4 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 18:21:37 -0700 Subject: [PATCH 04/18] Fix GcsEventsHoodieIncrSource constructor arg order in test --- .../hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 282392a0f251e..8e89a87f837ce 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -342,8 +342,8 @@ void testRealQueryRunnerResumesMidCommitPagination() throws IOException { // read against the on-disk meta-table runs. GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource( props, jsc(), spark(), - new QueryRunner(spark(), props), new CloudDataFetcher(props, jsc(), spark(), metrics, cloudObjectsSelectorCommon), + new QueryRunner(spark(), props), new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); // Resume from a mid-commit checkpoint: prior batch stopped at file-02 in commit 10. From b343c5a3b2bd25a42f6471d9d926c9045c4fe3b6 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 20:02:04 -0700 Subject: [PATCH 05/18] Use single-digit commit times in mid-commit pagination tests Long.parseLong(startCommit) - 1 was producing a previousInstant string of shorter length than the real timeline instants, so findInstantsInRange's lexicographic compare excluded the start commit and the empty-batch path silently advanced the checkpoint past it. --- .../hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java | 4 ++-- .../hudi/utilities/sources/TestS3EventsHoodieIncrSource.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 8e89a87f837ce..babbbb658b34f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -322,8 +322,8 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, */ @Test void testRealQueryRunnerResumesMidCommitPagination() throws IOException { - String startCommit = "10"; - String laterCommit = "20"; + String startCommit = "1"; + String laterCommit = "2"; writeGcsMetadataRecords(startCommit, Arrays.asList( Pair.of("name/file-01.json", 100L), Pair.of("name/file-02.json", 100L), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 78094e7a99e08..6793d2df468fc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -361,8 +361,8 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, void testRealQueryRunnerResumesMidCommitPagination() throws IOException { // One source commit with 5 file events (100B each), followed by a later commit to // ensure the source timeline endInstant moves past the start commit. - String startCommit = "10"; - String laterCommit = "20"; + String startCommit = "1"; + String laterCommit = "2"; writeS3MetadataRecords(startCommit, Arrays.asList( Pair.of("path/to/file-01.json", 100L), Pair.of("path/to/file-02.json", 100L), From 92d67e54e7ef47687acc54c0b5f78cba978cd522 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 22:35:18 -0700 Subject: [PATCH 06/18] Force source meta-table to v6 in mid-commit pagination tests The V1 incremental relation (where the QueryRunner fix actually takes effect) is only chosen when the source table version is < 8. The test harness defaults to v8, which routed the test through the V2 relation and broke the assertion. --- .../utilities/sources/TestGcsEventsHoodieIncrSource.java | 5 +++++ .../hudi/utilities/sources/TestS3EventsHoodieIncrSource.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index babbbb658b34f..9861c0e593740 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -322,6 +322,11 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, */ @Test void testRealQueryRunnerResumesMidCommitPagination() throws IOException { + // Force the source meta-table to be at table version 6 so the V1 incremental relation + // (which interprets START_COMMIT as requested time and applies the start-exclusive + // findInstantsInRange filter) is the read path exercised. + metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), "6"); + String startCommit = "1"; String laterCommit = "2"; writeGcsMetadataRecords(startCommit, Arrays.asList( diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 6793d2df468fc..5098621c17786 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -359,6 +359,11 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, */ @Test void testRealQueryRunnerResumesMidCommitPagination() throws IOException { + // Force the source meta-table to be at table version 6 so the V1 incremental relation + // (which interprets START_COMMIT as requested time and applies the start-exclusive + // findInstantsInRange filter) is the read path exercised. + metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), "6"); + // One source commit with 5 file events (100B each), followed by a later commit to // ensure the source timeline endInstant moves past the start commit. String startCommit = "1"; From 4d8c8af29fbe4c27ebb308644ee7c83af2ea7882 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 22:49:30 -0700 Subject: [PATCH 07/18] Force INCREMENTAL_READ_TABLE_VERSION=6 in cloud event incr source QueryRunner Cloud event incremental sources (S3/GCS) always use V1 checkpoint (commit#fileKey, requested-time). They should always route through the V1 incremental relation regardless of the source meta-table's actual version. Parameterizes the regression test on source version {6, 8} to cover both. --- .../utilities/sources/helpers/QueryRunner.java | 12 ++++++------ .../sources/TestGcsEventsHoodieIncrSource.java | 16 ++++++++++------ .../sources/TestS3EventsHoodieIncrSource.java | 16 ++++++++++------ 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index c823b12ba3a03..d8cb663acdb4f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -89,17 +89,17 @@ public static Dataset applyOrdering(Dataset dataset, List orde public Pair> runIncrementalQuery(QueryInfo queryInfo) { log.info("Running incremental query"); - HoodieTableVersion tableVersion = HoodieTableMetaClient.builder().setConf(getStorageConf()).setBasePath(sourcePath).build().getTableConfig().getTableVersion(); - // Use previousInstant so the start-exclusive incremental scan still includes the commit (startInstant), - // required to resume from checkpoint commit#fileKey for cloud event incremental source. + // S3/GCS event incremental sources operate with V1 checkpoint (commit#fileKey, requested-time based). + // Force INCREMENTAL_READ_TABLE_VERSION to 6 so the V1 incremental relation is always chosen, regardless + // of the source meta-table's actual version. Use previousInstant so the start-exclusive incremental scan + // still includes the commit (startInstant), required to resume from checkpoint commit#fileKey. return Pair.of(queryInfo, sparkSession.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) - .option(INCREMENTAL_READ_TABLE_VERSION().key(), tableVersion.versionCode()) + .option(INCREMENTAL_READ_TABLE_VERSION().key(), HoodieTableVersion.SIX.versionCode()) .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), - props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), - tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue() : "false")) + props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), "false")) .load(sourcePath)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 9861c0e593740..24f2856ebcbba 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -320,12 +320,16 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would * hide a START_COMMIT-handling regression. */ - @Test - void testRealQueryRunnerResumesMidCommitPagination() throws IOException { - // Force the source meta-table to be at table version 6 so the V1 incremental relation - // (which interprets START_COMMIT as requested time and applies the start-exclusive - // findInstantsInRange filter) is the read path exercised. - metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), "6"); + /** + * Verified across both v6 and v8 source meta-tables: cloud event incremental sources + * stick to V1 checkpoint (commit#fileKey, requested-time) regardless of source version, + * so QueryRunner forces INCREMENTAL_READ_TABLE_VERSION=6 to always route through the V1 + * relation. + */ + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) throws IOException { + metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), sourceTableVersion); String startCommit = "1"; String laterCommit = "2"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 5098621c17786..d3b8581de43ce 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -357,12 +357,16 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would * hide a START_COMMIT-handling regression. */ - @Test - void testRealQueryRunnerResumesMidCommitPagination() throws IOException { - // Force the source meta-table to be at table version 6 so the V1 incremental relation - // (which interprets START_COMMIT as requested time and applies the start-exclusive - // findInstantsInRange filter) is the read path exercised. - metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), "6"); + /** + * Verified across both v6 and v8 source meta-tables: cloud event incremental sources + * stick to V1 checkpoint (commit#fileKey, requested-time) regardless of source version, + * so QueryRunner forces INCREMENTAL_READ_TABLE_VERSION=6 to always route through the V1 + * relation. + */ + @ParameterizedTest + @ValueSource(strings = {"6", "8"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) throws IOException { + metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), sourceTableVersion); // One source commit with 5 file events (100B each), followed by a later commit to // ensure the source timeline endInstant moves past the start commit. From 1367365a8b986b69575275c1266fb108c6d22ad7 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 23:16:19 -0700 Subject: [PATCH 08/18] Add partition_path field to S3 test schema; simplify test comments The S3 metadata test schema lacked a top-level partition_path field, so the V1 incremental relation's partition-schema lookup failed when the test ran a real read against the on-disk meta-table. Mirroring the GCS test schema. --- .../S3EventsHoodieIncrSourceHarness.java | 1 + .../TestGcsEventsHoodieIncrSource.java | 41 +++++------------- .../sources/TestS3EventsHoodieIncrSource.java | 43 +++++-------------- .../streamer-config/s3-metadata.avsc | 5 +++ 4 files changed, 26 insertions(+), 64 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index ad49e3227b933..bc3e522ccd0a3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -185,6 +185,7 @@ protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketN s3Record.put("bucket", s3BucketRec); s3Record.put("object", s3ObjectRec); rec.put("s3", s3Record); + rec.put("partition_path", bucketName); rec.put("_hoodie_commit_time", commitTime); HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec)); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 24f2856ebcbba..e23670e7f0889 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -315,16 +315,8 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Resuming from a {@code commit#fileKey} checkpoint must re-include the start commit so - * files past the key are discovered. Uses a real {@link QueryRunner} against an on-disk - * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would - * hide a START_COMMIT-handling regression. - */ - /** - * Verified across both v6 and v8 source meta-tables: cloud event incremental sources - * stick to V1 checkpoint (commit#fileKey, requested-time) regardless of source version, - * so QueryRunner forces INCREMENTAL_READ_TABLE_VERSION=6 to always route through the V1 - * relation. + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8 source + * meta-tables since cloud event sources always use V1/requested-time regardless of version. */ @ParameterizedTest @ValueSource(strings = {"6", "8"}) @@ -347,33 +339,23 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) th .thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); - // Real QueryRunner (not the @Mock queryRunner) so the actual Spark incremental - // read against the on-disk meta-table runs. + // Real QueryRunner so the actual Spark incremental read against the on-disk meta-table runs. GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource( props, jsc(), spark(), new CloudDataFetcher(props, jsc(), spark(), metrics, cloudObjectsSelectorCommon), new QueryRunner(spark(), props), new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); - // Resume from a mid-commit checkpoint: prior batch stopped at file-02 in commit 10. - // sourceLimit=250 means the next batch should consume file-03 plus file-04 (200B - // cumulative) and stop before file-05 (would exceed the limit). + // Resume mid-commit at file-02; sourceLimit=250B fits file-03+file-04, file-05 would exceed. Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#name/file-02.json"); Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); Assertions.assertEquals( new StreamerCheckpointV1(startCommit + "#name/file-04.json"), result.getRight(), - "After mid-commit pagination, next batch must continue within the start commit " - + "(file-03 + file-04 = 200B under the 250B sourceLimit), not skip past it to " - + "the next source commit as a bare instant."); - - // Verify the (commit_time||object_key) > 'commit#fileKey' filter selected exactly - // file-03 and file-04: file-01 and file-02 are at or below the resume key, file-05 - // exceeds the sourceLimit budget, and laterCommit's record must not be reached. - // Captures the metadata passed to downstream file loading. If the bug recurs (Spark - // scan excludes the start commit), this would capture only laterCommit's record or - // nothing at all. + "Next batch must continue within the start commit, not advance to a bare instant."); + + // Filter must pass exactly file-03 and file-04 to downstream loading. @SuppressWarnings("unchecked") ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); verify(cloudObjectsSelectorCommon).loadAsDataset( @@ -382,12 +364,9 @@ props, jsc(), spark(), .map(CloudObjectMetadata::getPath) .sorted() .collect(Collectors.toList()); - Assertions.assertEquals(2, selectedPaths.size(), - "Filter must select exactly 2 files (file-03 and file-04). Got: " + selectedPaths); - Assertions.assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), - "First selected path should be file-03.json (after-key filter + ordering). Got: " + selectedPaths.get(0)); - Assertions.assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), - "Second selected path should be file-04.json (sourceLimit cut). Got: " + selectedPaths.get(1)); + Assertions.assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + Assertions.assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), selectedPaths.get(0)); + Assertions.assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), selectedPaths.get(1)); } @Test diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index d3b8581de43ce..3f406631fef18 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -352,24 +352,14 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Resuming from a {@code commit#fileKey} checkpoint must re-include the start commit so - * files past the key are discovered. Uses a real {@link QueryRunner} against an on-disk - * meta-table; the mocked QueryRunner in other tests returns inputDs unfiltered and would - * hide a START_COMMIT-handling regression. - */ - /** - * Verified across both v6 and v8 source meta-tables: cloud event incremental sources - * stick to V1 checkpoint (commit#fileKey, requested-time) regardless of source version, - * so QueryRunner forces INCREMENTAL_READ_TABLE_VERSION=6 to always route through the V1 - * relation. + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8 source + * meta-tables since cloud event sources always use V1/requested-time regardless of version. */ @ParameterizedTest @ValueSource(strings = {"6", "8"}) void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) throws IOException { metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), sourceTableVersion); - // One source commit with 5 file events (100B each), followed by a later commit to - // ensure the source timeline endInstant moves past the start commit. String startCommit = "1"; String laterCommit = "2"; writeS3MetadataRecords(startCommit, Arrays.asList( @@ -387,33 +377,23 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) th .thenReturn(Option.empty()); Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); - // Real QueryRunner (not the harness's mock) so the actual Spark incremental read - // against the on-disk meta-table runs. + // Real QueryRunner so the actual Spark incremental read against the on-disk meta-table runs. S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource( props, jsc(), spark(), new QueryRunner(spark(), props), new CloudDataFetcher(props, jsc(), spark(), metrics, mockCloudObjectsSelectorCommon), new DefaultStreamContext(schemaProvider.orElse(null), Option.of(sourceProfileSupplier))); - // Resume from a mid-commit checkpoint: prior batch stopped at file-02 within - // commit 10. sourceLimit=250B means the next batch should consume file-03 plus - // file-04 (200B cumulative) and stop before file-05 (would exceed the limit). + // Resume mid-commit at file-02; sourceLimit=250B fits file-03+file-04, file-05 would exceed. Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#path/to/file-02.json"); Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); Assertions.assertEquals( new StreamerCheckpointV1(startCommit + "#path/to/file-04.json"), result.getRight(), - "After mid-commit pagination, next batch must continue within the start commit " - + "(file-03 + file-04 = 200B under the 250B sourceLimit), not skip past it to " - + "the next source commit as a bare instant."); - - // Verify the (commit_time||object_key) > 'commit#fileKey' filter selected exactly - // file-03 and file-04: file-01 and file-02 are at or below the resume key, file-05 - // exceeds the sourceLimit budget, and laterCommit's record must not be reached. - // Captures the metadata passed to downstream file loading. If the bug recurs (Spark - // scan excludes the start commit), this would capture only laterCommit's record or - // nothing at all. + "Next batch must continue within the start commit, not advance to a bare instant."); + + // Filter must pass exactly file-03 and file-04 to downstream loading. @SuppressWarnings("unchecked") ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); verify(mockCloudObjectsSelectorCommon).loadAsDataset( @@ -422,12 +402,9 @@ props, jsc(), spark(), .map(CloudObjectMetadata::getPath) .sorted() .collect(java.util.stream.Collectors.toList()); - Assertions.assertEquals(2, selectedPaths.size(), - "Filter must select exactly 2 files (file-03 and file-04). Got: " + selectedPaths); - Assertions.assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), - "First selected path should be file-03.json (after-key filter + ordering). Got: " + selectedPaths.get(0)); - Assertions.assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), - "Second selected path should be file-04.json (sourceLimit cut). Got: " + selectedPaths.get(1)); + Assertions.assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + Assertions.assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), selectedPaths.get(0)); + Assertions.assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), selectedPaths.get(1)); } @Test diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc index 64b169c1373ec..cce14a472453e 100644 --- a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc +++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc @@ -20,6 +20,11 @@ "name": "hoodie_source", "namespace": "hoodie.source", "fields": [ + { + "name": "partition_path", + "type": ["null", "string"], + "default": null + }, { "name": "awsRegion", "type": ["null", "string"], From 983c4992928dc46c4c7e46a3cf91bd214158b3f8 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 10:33:02 -0700 Subject: [PATCH 09/18] Fix build --- .../org/apache/hudi/utilities/sources/helpers/QueryRunner.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index d8cb663acdb4f..81ac929458f43 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -21,7 +21,6 @@ import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -42,7 +41,6 @@ import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; -import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf; /** * This class is currently used only by s3 and gcs incr sources that supports size based batching From db636ba2a1f28a86874439ef5014e7a27816d9bc Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 10:33:13 -0700 Subject: [PATCH 10/18] Fix test --- .../hudi/utilities/sources/helpers/QueryRunner.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 81ac929458f43..26b1fa867efca 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableVersion; @@ -87,13 +88,15 @@ public static Dataset applyOrdering(Dataset dataset, List orde public Pair> runIncrementalQuery(QueryInfo queryInfo) { log.info("Running incremental query"); - // S3/GCS event incremental sources operate with V1 checkpoint (commit#fileKey, requested-time based). - // Force INCREMENTAL_READ_TABLE_VERSION to 6 so the V1 incremental relation is always chosen, regardless - // of the source meta-table's actual version. Use previousInstant so the start-exclusive incremental scan - // still includes the commit (startInstant), required to resume from checkpoint commit#fileKey. + // S3/GCS event incremental sources operate with V1 checkpoint (commit#fileKey, requested-time based), + // so force INCREMENTAL_READ_TABLE_VERSION to 6. Use previousInstant so the start-exclusive incremental + // scan still includes the commit (startInstant), required to resume from checkpoint commit#fileKey. + // Disable the file-group reader so the read goes through IncrementalRelationV1 (requested-time based), + // which handles V9 tables correctly when INCREMENTAL_READ_TABLE_VERSION is forced to 6. return Pair.of(queryInfo, sparkSession.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) .option(INCREMENTAL_READ_TABLE_VERSION().key(), HoodieTableVersion.SIX.versionCode()) + .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false) .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), From 45f4a55792105928e4cca9fa8d5df060d6e3fd07 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 15:26:30 -0700 Subject: [PATCH 11/18] Fix file group reader incremental reads dropping rows when filter columns are pruned Incremental span filters on _hoodie_commit_time are enforced via parquet push-down, which drops all rows when Spark prunes the column from the scan schema (count(), isEmpty()). Read filter-referenced columns regardless of Spark's pruning and project them away after filtering. Re-enable the file group reader in QueryRunner. Document the getMandatoryFields contract and prune declarations not backed by required filters. Cover COW and MOR (non-partitioned, unchanged schema) source meta-tables in the S3/GCS mid-commit pagination tests. --- .../hudi/HoodieHadoopFsRelationFactory.scala | 34 +++++- ...HoodieFileGroupReaderBasedFileFormat.scala | 49 +++++--- ...stIncrementalReadWithFileGroupReader.scala | 105 ++++++++++++++++++ .../sources/helpers/QueryRunner.java | 3 - .../S3EventsHoodieIncrSourceHarness.java | 8 +- .../TestGcsEventsHoodieIncrSource.java | 37 ++++-- .../sources/TestS3EventsHoodieIncrSource.java | 28 ++++- .../streamer-config/s3-metadata.avsc | 5 - 8 files changed, 224 insertions(+), 45 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala index a11e3075a6fb7..45bd0cda2279e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala @@ -227,6 +227,22 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext, protected lazy val fileStatusCache: FileStatusCache = FileStatusCache.getOrCreate(sparkSession) + /** + * Fields that must stay readable from the data files even when the query does not project + * them, passed to [[HoodieFileGroupReaderBasedFileFormat]]: + *

    + *
  • Partition fields listed here are read from the data files instead of being appended + * from the partition-path values, since the path encodes a transformed value (timestamp and + * custom key generators) or merging needs the original value (precombine as partition);
  • + *
  • Data fields listed here document a read dependency the query plan cannot see and must + * be backed by a filter from [[getRequiredFilters]]: the file format keeps filter-referenced + * columns readable when Spark's column pruning drops them and projects them away after + * filtering (e.g., the commit time meta field for the incremental span filters).
  • + *
+ * Fields needed only for log-file merging (record key, ordering fields) must NOT be declared + * here: the file group reader appends them to its read schema internally when a file slice + * actually requires merging. + */ protected def getMandatoryFields: Seq[String] = partitionColumnsToRead protected def isMOR: Boolean @@ -293,6 +309,8 @@ abstract class HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(override isBootstrap: Boolean) extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) { + // the incremental span filters reference the commit time meta field, so it must stay + // readable even when the query does not project it override protected def getMandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead override protected def isMOR: Boolean = true @@ -353,6 +371,10 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLCo override def buildPartitionSchema(): StructType = StructType(Nil) + // CDC reads go through CDCFileGroupIterator with the CDC result schema, which does not + // contain the table fields, so no mandatory fields can or need to be declared + override protected def getMandatoryFields: Seq[String] = Seq.empty + override protected def getRequiredFilters: Seq[Filter] = Seq.empty } @@ -391,8 +413,10 @@ abstract class HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(override isBootstrap: Boolean) extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) { - override protected def getMandatoryFields(): Seq[String] = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ - orderingFields ++ partitionColumnsToRead + // the incremental span filters reference the commit time meta field, so it must stay + // readable even when the query does not project it; record key and ordering fields are + // not declared since the relation factory path never merges at the relation level + override protected def getMandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead override protected def isMOR: Boolean = false @@ -453,7 +477,9 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: SQLCo override def buildPartitionSchema(): StructType = StructType(Nil) + // CDC reads go through CDCFileGroupIterator with the CDC result schema, which does not + // contain the table fields, so no mandatory fields can or need to be declared + override protected def getMandatoryFields: Seq[String] = Seq.empty + override protected def getRequiredFilters: Seq[Filter] = Seq.empty } - - diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3fe8c6ff62f71..e182abef56c73 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -250,6 +250,13 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental + // Required filters are enforced via parquet push-down, which evaluates predicates on columns + // absent from the read schema as all-null and would drop every row (e.g., count() prunes all + // columns), so filter-referenced columns must be read and projected away after filtering + val filterOnlyFields = requiredFilters.flatMap(_.references).distinct + .filterNot(name => requiredSchema.fieldNames.contains(name) || partitionSchema.fieldNames.contains(name)) + .flatMap(name => dataStructType.fields.find(_.name == name)) + val readRequiredSchema = StructType(requiredSchema.fields ++ filterOnlyFields) val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline setSchemaEvolutionConfigs(augmentedStorageConf) augmentedStorageConf.set(ENABLE_LOGICAL_TIMESTAMP_REPAIR, hasTimestampMillisFieldInTableSchema.toString) @@ -265,7 +272,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val exclusionFields = new java.util.HashSet[String]() exclusionFields.add("op") partitionSchema.fields.foreach(f => exclusionFields.add(f.name)) - val requestedStructType = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) + val requestedStructType = StructType(readRequiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val requestedSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(requestedStructType, sanitizedTableName), exclusionFields) val dataStructTypeWithMandatoryPartitionFields = StructType(dataStructType.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) val dataSchema = HoodieSchemaUtils.pruneDataSchema(schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(dataStructTypeWithMandatoryPartitionFields, sanitizedTableName), exclusionFields) @@ -339,7 +346,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, case _ => readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, - requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) + readRequiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } // CDC queries. case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping => @@ -347,7 +354,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, case _ => readBaseFile(file, baseFileReader.value, requestedStructType, remainingPartitionSchema, fixedPartitionIndexes, - requiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) + readRequiredSchema, partitionSchema, outputSchema, filters ++ requiredFilters, storageConf) } CloseableIteratorListener.addListener(iter) } @@ -493,28 +500,31 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, // executor private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkColumnarFileReader, requestedSchema: StructType, - remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType, + remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], readRequiredSchema: StructType, partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter], storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = { // Detect vector columns and create modified schemas with BinaryType. // Each schema is detected independently because ordinals are relative to the schema being // modified — outputSchema and requestedSchema may have vector columns at different positions - // than requiredSchema (e.g. when partition columns are interleaved). - val (modifiedRequiredSchema, vectorCols) = withVectorRewrite(requiredSchema) + // than readRequiredSchema (e.g. when partition columns are interleaved). + val (modifiedReadRequiredSchema, vectorCols) = withVectorRewrite(readRequiredSchema) val hasVectors = vectorCols.nonEmpty val (modifiedOutputSchema, outputVectorCols) = if (hasVectors) withVectorRewrite(outputSchema) else (outputSchema, Map.empty[Int, HoodieSchema.Vector]) val (modifiedRequestedSchema, _) = if (hasVectors) withVectorRewrite(requestedSchema) else (requestedSchema, Map.empty[Int, HoodieSchema.Vector]) val rawIter = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) { //none of partition fields are read from the file, so the reader will do the appending for us - parquetFileReader.read(file, modifiedRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + val iter = parquetFileReader.read(file, modifiedReadRequiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + projectIfNeeded(iter, StructType(modifiedReadRequiredSchema.fields ++ partitionSchema.fields), modifiedOutputSchema) } else if (remainingPartitionSchema.fields.length == 0) { //we read all of the partition fields from the file val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils //we need to modify the partitioned file so that the partition values are empty val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length) + val readSchema = StructType(modifiedReadRequiredSchema.fields ++ partitionSchema.fields) //and we pass an empty schema for the partition schema - parquetFileReader.read(modifiedFile, modifiedOutputSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + val iter = parquetFileReader.read(modifiedFile, readSchema, new StructType(), internalSchemaOpt, filters, storageConf, tableSchemaAsMessageType) + projectIfNeeded(iter, readSchema, modifiedOutputSchema) } else { //need to do an additional projection here. The case in mind is that partition schema is "a,b,c" mandatoryFields is "a,c", //then we will read (dataSchema + a + c) and append b. So the final schema will be (data schema + a + c +b) @@ -527,13 +537,10 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } if (hasVectors) { - // The raw iterator has BinaryType for vector columns; convert back to ArrayType - val readSchema = if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) { - StructType(modifiedRequiredSchema.fields ++ partitionSchema.fields) - } else { - modifiedOutputSchema - } - wrapWithVectorConversion(rawIter, readSchema, outputSchema, outputVectorCols) + // The raw iterator has BinaryType for vector columns; convert back to ArrayType. + // All branches above produce rows in modifiedOutputSchema: filter-only columns from + // readRequiredSchema are projected away by projectIfNeeded/projectIter. + wrapWithVectorConversion(rawIter, modifiedOutputSchema, outputSchema, outputVectorCols) } else { rawIter } @@ -548,6 +555,18 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, }.asInstanceOf[Iterator[InternalRow]] } + /** + * Projects to `to` only when the read schema was augmented with filter-only columns; + * otherwise returns the iterator as is, preserving columnar batches. + */ + private def projectIfNeeded(iter: Iterator[InternalRow], from: StructType, to: StructType): Iterator[InternalRow] = { + if (from.fieldNames.sameElements(to.fieldNames)) { + iter + } else { + projectIter(iter, from, to) + } + } + private def getFixedPartitionValues(allPartitionValues: InternalRow, partitionSchema: StructType, fixedPartitionIndexes: Set[Int]): InternalRow = { InternalRow.fromSeq(allPartitionValues.toSeq(partitionSchema).zipWithIndex.filter(p => fixedPartitionIndexes.contains(p._2)).map(p => p._1)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala new file mode 100644 index 0000000000000..e83f18e593e06 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion} +import org.apache.hudi.config.HoodieCompactionConfig +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource + +/** + * Asserts incremental queries with the file group reader return correct results when the query + * does not project `_hoodie_commit_time` (count(), isEmpty(), narrow projections), since the + * incremental span filters are enforced via parquet push-down on that column. Runs on a session + * without HoodieSparkSessionExtension, so the filters are not injected into the logical plan + * and the file format alone must keep the filter columns readable. + */ +class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHarness { + + val inserts: Seq[(Int, String, String, Double, String)] = Seq( + (10, "1", "rider-A", 19.10, "pt1"), + (10, "2", "rider-B", 27.70, "pt1"), + (10, "3", "rider-C", 33.90, "pt1"), + (10, "4", "rider-D", 34.15, "pt1"), + (10, "5", "rider-E", 17.85, "pt1"), + (10, "6", "rider-F", 41.06, "pt1")) + val updates: Seq[(Int, String, String, Double, String)] = Seq( + (11, "1", "rider-A", 1.10, "pt1"), + (11, "2", "rider-B", 2.20, "pt1"), + (11, "3", "rider-C", 3.30, "pt1")) + val columns: Seq[String] = Seq("ts", "key", "rider", "fare", "pt") + + @ParameterizedTest + @CsvSource(value = Array( + "COPY_ON_WRITE,true", + "COPY_ON_WRITE,false", + "MERGE_ON_READ,true", + "MERGE_ON_READ,false" + )) + def testCountAndPrunedProjections(tableType: String, forceV1Read: Boolean): Unit = { + write(inserts, tableType, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, SaveMode.Overwrite) + write(updates, tableType, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append) + if (tableType == "MERGE_ON_READ") { + // updates must land in log files so the incremental read merges logs + Assertions.assertTrue(fs.listStatus(new Path(basePath, "pt1")).exists(_.getPath.getName.contains(".log.")), + "Expected log files in the MOR table") + } + + Assertions.assertEquals(6, readIncremental(forceV1Read).collect().length) + // these query shapes prune `_hoodie_commit_time` out of the scan schema + Assertions.assertEquals(6L, readIncremental(forceV1Read).count()) + Assertions.assertFalse(readIncremental(forceV1Read).isEmpty) + Assertions.assertEquals(6L, readIncremental(forceV1Read).select("key").count()) + } + + private def write(data: Seq[(Int, String, String, Double, String)], + tableType: String, operation: String, mode: SaveMode): Unit = { + spark.createDataFrame(data).toDF(columns: _*).write.format("hudi") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "pt") + .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts") + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .option(DataSourceWriteOptions.TABLE_NAME.key, "test_incr_read_fgr") + .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false") + .option(DataSourceWriteOptions.OPERATION.key, operation) + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.upsert.shuffle.parallelism", "2") + .mode(mode) + .save(basePath) + } + + private def readIncremental(forceV1Read: Boolean): DataFrame = { + val reader = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.START_COMMIT.key(), "000") + val readerWithVersion = if (forceV1Read) { + // same access pattern as the S3/GCS event incremental sources + reader.option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION.key(), + HoodieTableVersion.SIX.versionCode().toString) + } else { + reader + } + readerWithVersion.load(basePath) + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 26b1fa867efca..541f90ab6c9c2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -91,12 +91,9 @@ public Pair> runIncrementalQuery(QueryInfo queryInfo) { // S3/GCS event incremental sources operate with V1 checkpoint (commit#fileKey, requested-time based), // so force INCREMENTAL_READ_TABLE_VERSION to 6. Use previousInstant so the start-exclusive incremental // scan still includes the commit (startInstant), required to resume from checkpoint commit#fileKey. - // Disable the file-group reader so the read goes through IncrementalRelationV1 (requested-time based), - // which handles V9 tables correctly when INCREMENTAL_READ_TABLE_VERSION is forced to 6. return Pair.of(queryInfo, sparkSession.read().format("hudi") .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) .option(INCREMENTAL_READ_TABLE_VERSION().key(), HoodieTableVersion.SIX.versionCode()) - .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), false) .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getPreviousInstant()) .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index bc3e522ccd0a3..7419c3d6e919d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -79,7 +79,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.mockito.Mockito.when; @@ -162,7 +161,9 @@ protected String generateS3EventMetadata(Long objectSize, String bucketName, Str } protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { - String partitionPath = bucketName; + // records must be written to a partition path consistent with the table config; otherwise + // the incremental read finds no partitions matching the commit metadata + String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; HoodieSchema schema = S3_METADATA_SCHEMA; GenericRecord rec = new GenericData.Record(schema.toAvroSchema()); HoodieSchemaField s3Field = schema.getField("s3").get(); @@ -185,7 +186,6 @@ protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketN s3Record.put("bucket", s3BucketRec); s3Record.put("object", s3ObjectRec); rec.put("s3", s3Record); - rec.put("partition_path", bucketName); rec.put("_hoodie_commit_time", commitTime); HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec)); @@ -235,7 +235,7 @@ protected Pair> writeS3MetadataRecords(String commitT .map(p -> generateS3EventMetadata(commitTime, "bucket-1", p.getLeft(), p.getRight())) .collect(Collectors.toList()); List statusList = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime).collect(); - writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); return Pair.of(commitTime, s3MetadataRecords); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index e23670e7f0889..400cb59cd89f8 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -27,7 +27,9 @@ import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; @@ -61,6 +63,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -83,9 +86,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -315,13 +319,20 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8 source - * meta-tables since cloud event sources always use V1/requested-time regardless of version. + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8, COW and MOR + * source meta-tables since cloud event sources always use V1/requested-time regardless of version. */ @ParameterizedTest - @ValueSource(strings = {"6", "8"}) - void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) throws IOException { - metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), sourceTableVersion); + @CsvSource({"6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "6,MERGE_ON_READ", "8,MERGE_ON_READ"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, HoodieTableType tableType) throws IOException { + Properties tableProps = new Properties(); + tableProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(true)); + tableProps.put("hoodie.datasource.write.recordkey.field", "_row_key"); + tableProps.put("hoodie.datasource.write.partitionpath.field", ""); + tableProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); + tableProps.put(HoodieTableConfig.PARTITION_FIELDS.key(), ""); + tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); + metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); String startCommit = "1"; String laterCommit = "2"; @@ -332,6 +343,12 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) th Pair.of("name/file-04.json", 100L), Pair.of("name/file-05.json", 100L))); writeGcsMetadataRecords(laterCommit); + if (tableType == HoodieTableType.MERGE_ON_READ) { + // the second commit's records must land in log files so the incremental read merges logs + boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) + .anyMatch(f -> f.getPath().getName().contains(".log.")); + Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + } TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); @@ -447,7 +464,9 @@ private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, St } private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation, long size) { - String partitionPath = bucketName; + // records must be written to a partition path consistent with the table config; otherwise + // the incremental read finds no partitions matching the commit metadata + String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; String id = "id:" + bucketName + "/" + filename + "/" + generation; String mediaLink = String.format("https://storage.googleapis.com/download/storage/v1/b/%s/o/%s" @@ -503,7 +522,7 @@ private Pair> writeGcsMetadataRecords(String commitTi getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1") ); List statusList = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime).collect(); - writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); return Pair.of(commitTime, gcsMetadataRecords); } @@ -519,7 +538,7 @@ private Pair> writeGcsMetadataRecords(String commitTi .map(p -> getGcsMetadataRecord(commitTime, p.getLeft(), "bucket-1", "1", p.getRight())) .collect(Collectors.toList()); List statusList = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime).collect(); - writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty()); + writeClient.commit(commitTime, jsc.parallelize(statusList), Option.empty(), metaClient.getCommitActionType(), Collections.emptyMap(), Option.empty()); assertNoWriteErrors(statusList); return Pair.of(commitTime, gcsMetadataRecords); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 3f406631fef18..9d40a9e2ac2a7 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -20,6 +20,8 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.checkpoint.Checkpoint; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2; @@ -34,6 +36,7 @@ import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceProfile; +import org.apache.hadoop.fs.Path; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; @@ -51,7 +54,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; +import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -352,13 +357,20 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } /** - * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8 source - * meta-tables since cloud event sources always use V1/requested-time regardless of version. + * Resume from `commit#fileKey` must re-include the start commit; runs on v6 and v8, COW and MOR + * source meta-tables since cloud event sources always use V1/requested-time regardless of version. */ @ParameterizedTest - @ValueSource(strings = {"6", "8"}) - void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) throws IOException { - metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), sourceTableVersion); + @CsvSource({"6,COPY_ON_WRITE", "8,COPY_ON_WRITE", "6,MERGE_ON_READ", "8,MERGE_ON_READ"}) + void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, HoodieTableType tableType) throws IOException { + Properties tableProps = new Properties(); + tableProps.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(true)); + tableProps.put("hoodie.datasource.write.recordkey.field", "_row_key"); + tableProps.put("hoodie.datasource.write.partitionpath.field", ""); + tableProps.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); + tableProps.put(HoodieTableConfig.PARTITION_FIELDS.key(), ""); + tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); + metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); String startCommit = "1"; String laterCommit = "2"; @@ -369,6 +381,12 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion) th Pair.of("path/to/file-04.json", 100L), Pair.of("path/to/file-05.json", 100L))); writeS3MetadataRecords(laterCommit); + if (tableType == HoodieTableType.MERGE_ON_READ) { + // the second commit's record must land in a log file so the incremental read merges logs + boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) + .anyMatch(f -> f.getPath().getName().contains(".log.")); + Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + } TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc index cce14a472453e..64b169c1373ec 100644 --- a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc +++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc @@ -20,11 +20,6 @@ "name": "hoodie_source", "namespace": "hoodie.source", "fields": [ - { - "name": "partition_path", - "type": ["null", "string"], - "default": null - }, { "name": "awsRegion", "type": ["null", "string"], From af04f7b9ebf5922966ec0d64033f2eb28ea80757 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 17:33:24 -0700 Subject: [PATCH 12/18] Keep getMandatoryFields declarations unchanged The fix derives the extra read columns from the required filters only, so the declaration cleanup and contract docs can land separately. --- .../hudi/HoodieHadoopFsRelationFactory.scala | 34 +++---------------- 1 file changed, 4 insertions(+), 30 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala index 45bd0cda2279e..a11e3075a6fb7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala @@ -227,22 +227,6 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext, protected lazy val fileStatusCache: FileStatusCache = FileStatusCache.getOrCreate(sparkSession) - /** - * Fields that must stay readable from the data files even when the query does not project - * them, passed to [[HoodieFileGroupReaderBasedFileFormat]]: - *
    - *
  • Partition fields listed here are read from the data files instead of being appended - * from the partition-path values, since the path encodes a transformed value (timestamp and - * custom key generators) or merging needs the original value (precombine as partition);
  • - *
  • Data fields listed here document a read dependency the query plan cannot see and must - * be backed by a filter from [[getRequiredFilters]]: the file format keeps filter-referenced - * columns readable when Spark's column pruning drops them and projects them away after - * filtering (e.g., the commit time meta field for the incremental span filters).
  • - *
- * Fields needed only for log-file merging (record key, ordering fields) must NOT be declared - * here: the file group reader appends them to its read schema internally when a file slice - * actually requires merging. - */ protected def getMandatoryFields: Seq[String] = partitionColumnsToRead protected def isMOR: Boolean @@ -309,8 +293,6 @@ abstract class HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(override isBootstrap: Boolean) extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) { - // the incremental span filters reference the commit time meta field, so it must stay - // readable even when the query does not project it override protected def getMandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead override protected def isMOR: Boolean = true @@ -371,10 +353,6 @@ class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLCo override def buildPartitionSchema(): StructType = StructType(Nil) - // CDC reads go through CDCFileGroupIterator with the CDC result schema, which does not - // contain the table fields, so no mandatory fields can or need to be declared - override protected def getMandatoryFields: Seq[String] = Seq.empty - override protected def getRequiredFilters: Seq[Filter] = Seq.empty } @@ -413,10 +391,8 @@ abstract class HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(override isBootstrap: Boolean) extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) { - // the incremental span filters reference the commit time meta field, so it must stay - // readable even when the query does not project it; record key and ordering fields are - // not declared since the relation factory path never merges at the relation level - override protected def getMandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead + override protected def getMandatoryFields(): Seq[String] = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ + orderingFields ++ partitionColumnsToRead override protected def isMOR: Boolean = false @@ -477,9 +453,7 @@ class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: SQLCo override def buildPartitionSchema(): StructType = StructType(Nil) - // CDC reads go through CDCFileGroupIterator with the CDC result schema, which does not - // contain the table fields, so no mandatory fields can or need to be declared - override protected def getMandatoryFields: Seq[String] = Seq.empty - override protected def getRequiredFilters: Seq[Filter] = Seq.empty } + + From 3ca99e9a4614784f2f4fdd80cd081d645ffe350b Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 17:50:51 -0700 Subject: [PATCH 13/18] Expand incremental query coverage and fix MOR test data Write records to a partition path consistent with the table config so the non-partitioned meta-tables list correctly, and re-write an existing key in the second commit so MOR runs produce log files. Expand the file group reader incremental test to cover COW/MOR x source table v6/v8 x v6/v8 reads, with preconditions on small-file handling and base/log file layout, incremental ranges over base-only, base-plus-log, log-only, and empty windows, and select *, narrow projection, count(), and isEmpty() per range. --- ...stIncrementalReadWithFileGroupReader.scala | 155 +++++++++++++----- .../TestGcsEventsHoodieIncrSource.java | 5 +- .../sources/TestS3EventsHoodieIncrSource.java | 5 +- 3 files changed, 122 insertions(+), 43 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala index e83f18e593e06..263d3a26857f8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala @@ -18,8 +18,9 @@ package org.apache.hudi.functional import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion} -import org.apache.hudi.config.HoodieCompactionConfig +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hadoop.fs.Path @@ -28,59 +29,127 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource +import scala.collection.JavaConverters._ + /** - * Asserts incremental queries with the file group reader return correct results when the query - * does not project `_hoodie_commit_time` (count(), isEmpty(), narrow projections), since the - * incremental span filters are enforced via parquet push-down on that column. Runs on a session - * without HoodieSparkSessionExtension, so the filters are not injected into the logical plan - * and the file format alone must keep the filter columns readable. + * Incremental query correctness with the file group reader across source table versions and + * read versions, COW and MOR (file slices with and without log files), and query shapes that + * prune `_hoodie_commit_time` out of the scan schema (count(), isEmpty(), narrow projections) + * where the incremental span filters must stay effective. Runs on a session without + * HoodieSparkSessionExtension, so the filters are not injected into the logical plan and the + * file format alone must keep the filter columns readable. */ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHarness { - val inserts: Seq[(Int, String, String, Double, String)] = Seq( - (10, "1", "rider-A", 19.10, "pt1"), - (10, "2", "rider-B", 27.70, "pt1"), - (10, "3", "rider-C", 33.90, "pt1"), - (10, "4", "rider-D", 34.15, "pt1"), - (10, "5", "rider-E", 17.85, "pt1"), - (10, "6", "rider-F", 41.06, "pt1")) - val updates: Seq[(Int, String, String, Double, String)] = Seq( - (11, "1", "rider-A", 1.10, "pt1"), - (11, "2", "rider-B", 2.20, "pt1"), - (11, "3", "rider-C", 3.30, "pt1")) val columns: Seq[String] = Seq("ts", "key", "rider", "fare", "pt") + // commits c1..c3 insert disjoint key pairs (small file handling keeps one file group with + // base files only); commits c4..c6 update those pairs (log files on MOR) + val batches: Seq[(Seq[(Int, String, String, Double, String)], String)] = Seq( + (Seq((1, "k1", "rider-c1", 10.0, "pt1"), (1, "k2", "rider-c1", 10.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((2, "k3", "rider-c2", 20.0, "pt1"), (2, "k4", "rider-c2", 20.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((3, "k5", "rider-c3", 30.0, "pt1"), (3, "k6", "rider-c3", 30.0, "pt1")), + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), + (Seq((4, "k1", "rider-c4", 40.0, "pt1"), (4, "k2", "rider-c4", 40.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), + (Seq((5, "k3", "rider-c5", 50.0, "pt1"), (5, "k4", "rider-c5", 50.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), + (Seq((6, "k5", "rider-c6", 60.0, "pt1"), (6, "k6", "rider-c6", 60.0, "pt1")), + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)) + @ParameterizedTest @CsvSource(value = Array( - "COPY_ON_WRITE,true", - "COPY_ON_WRITE,false", - "MERGE_ON_READ,true", - "MERGE_ON_READ,false" + "COPY_ON_WRITE,6,6", + "COPY_ON_WRITE,8,6", + "COPY_ON_WRITE,8,8", + "MERGE_ON_READ,6,6", + "MERGE_ON_READ,8,6", + "MERGE_ON_READ,8,8" )) - def testCountAndPrunedProjections(tableType: String, forceV1Read: Boolean): Unit = { - write(inserts, tableType, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, SaveMode.Overwrite) - write(updates, tableType, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append) + def testIncrementalReadRanges(tableType: String, sourceVersion: Int, readVersion: Int): Unit = { + batches.zipWithIndex.foreach { case ((data, operation), i) => + val mode = if (i == 0) SaveMode.Overwrite else SaveMode.Append + write(data, tableType, sourceVersion, operation, mode) + if (i == 2) { + // small file handling must have kept a single file group with base files only + val (baseFiles, logFiles) = listDataFiles() + Assertions.assertEquals(3, baseFiles.size, "Expected one base file per insert commit") + Assertions.assertEquals(1, baseFiles.map(fileId).distinct.size, "Expected a single file group") + Assertions.assertTrue(logFiles.isEmpty, "Expected no log files after insert-only commits") + } + } + + val metaClient = HoodieTableMetaClient.builder() + .setConf(storageConf().newInstance()).setBasePath(basePath()).build() + Assertions.assertEquals(sourceVersion, metaClient.getTableConfig.getTableVersion.versionCode()) + val (baseFiles, logFiles) = listDataFiles() + Assertions.assertEquals(1, baseFiles.map(fileId).distinct.size, "Expected a single file group") if (tableType == "MERGE_ON_READ") { - // updates must land in log files so the incremental read merges logs - Assertions.assertTrue(fs.listStatus(new Path(basePath, "pt1")).exists(_.getPath.getName.contains(".log.")), - "Expected log files in the MOR table") + Assertions.assertEquals(3, baseFiles.size, "Update commits must not rewrite MOR base files") + Assertions.assertEquals(3, logFiles.size, "Expected one log file per update commit") + } else { + Assertions.assertEquals(6, baseFiles.size, "Expected one base file per commit") + Assertions.assertTrue(logFiles.isEmpty, "Expected no log files on COW") } - Assertions.assertEquals(6, readIncremental(forceV1Read).collect().length) + // c1..c6 ordered by requested time + val instants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + .getInstants.asScala.toList + Assertions.assertEquals(6, instants.size) + + // (000, c2]: base files only + assertIncrementalRange(readVersion, instants, 0, 2, + Set(("k1", 1), ("k2", 1), ("k3", 2), ("k4", 2))) + // (c2, c4]: base file commit c3 plus, on MOR, the log file of c4; rows carried over into + // the c3 base file from c1/c2 must be filtered out + assertIncrementalRange(readVersion, instants, 2, 4, + Set(("k5", 3), ("k6", 3), ("k1", 4), ("k2", 4))) + // (c3, c5]: on MOR only the log files of c4/c5 are in range + assertIncrementalRange(readVersion, instants, 3, 5, + Set(("k1", 4), ("k2", 4), ("k3", 5), ("k4", 5))) + // (c6, c6]: empty range + assertIncrementalRange(readVersion, instants, 6, 6, Set.empty) + } + + private def assertIncrementalRange(readVersion: Int, + instants: List[HoodieInstant], + startIdx: Int, endIdx: Int, + expected: Set[(String, Int)]): Unit = { + def boundary(idx: Int): String = { + if (idx == 0) { + "000" + } else if (readVersion == 6) { + instants(idx - 1).requestedTime + } else { + instants(idx - 1).getCompletionTime + } + } + val start = boundary(startIdx) + val end = boundary(endIdx) + + // select * + val rows = readIncremental(readVersion, start, end).collect() + .map(r => (r.getAs[String]("key"), r.getAs[Int]("ts"))).toSet + Assertions.assertEquals(expected, rows) + // projection without _hoodie_commit_time + val keys = readIncremental(readVersion, start, end).select("key").collect().map(_.getString(0)).toSet + Assertions.assertEquals(expected.map(_._1), keys) // these query shapes prune `_hoodie_commit_time` out of the scan schema - Assertions.assertEquals(6L, readIncremental(forceV1Read).count()) - Assertions.assertFalse(readIncremental(forceV1Read).isEmpty) - Assertions.assertEquals(6L, readIncremental(forceV1Read).select("key").count()) + Assertions.assertEquals(expected.size.toLong, readIncremental(readVersion, start, end).count()) + Assertions.assertEquals(expected.isEmpty, readIncremental(readVersion, start, end).isEmpty) } - private def write(data: Seq[(Int, String, String, Double, String)], - tableType: String, operation: String, mode: SaveMode): Unit = { + private def write(data: Seq[(Int, String, String, Double, String)], tableType: String, + sourceVersion: Int, operation: String, mode: SaveMode): Unit = { spark.createDataFrame(data).toDF(columns: _*).write.format("hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "pt") .option(HoodieTableConfig.ORDERING_FIELDS.key, "ts") .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) .option(DataSourceWriteOptions.TABLE_NAME.key, "test_incr_read_fgr") + .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key, sourceVersion.toString) .option(HoodieCompactionConfig.INLINE_COMPACT.key, "false") .option(DataSourceWriteOptions.OPERATION.key, operation) .option("hoodie.insert.shuffle.parallelism", "2") @@ -89,17 +158,25 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa .save(basePath) } - private def readIncremental(forceV1Read: Boolean): DataFrame = { + private def readIncremental(readVersion: Int, start: String, end: String): DataFrame = { val reader = spark.read.format("hudi") .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.START_COMMIT.key(), "000") - val readerWithVersion = if (forceV1Read) { + .option(DataSourceReadOptions.START_COMMIT.key(), start) + .option(DataSourceReadOptions.END_COMMIT.key(), end) + val readerWithVersion = if (readVersion == 6) { // same access pattern as the S3/GCS event incremental sources - reader.option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION.key(), - HoodieTableVersion.SIX.versionCode().toString) + reader.option(DataSourceReadOptions.INCREMENTAL_READ_TABLE_VERSION.key(), "6") } else { reader } readerWithVersion.load(basePath) } + + private def listDataFiles(): (Seq[String], Seq[String]) = { + val names = fs.listStatus(new Path(basePath, "pt1")).map(_.getPath.getName).toSeq + // log files are dot-prefixed: .{fileId}_{baseCommit}.log.{version}_{writeToken} + (names.filter(n => !n.startsWith(".") && n.endsWith(".parquet")), names.filter(_.contains(".log."))) + } + + private def fileId(baseFileName: String): String = baseFileName.substring(0, baseFileName.indexOf("_")) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 400cb59cd89f8..b934653f813bf 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -342,9 +342,10 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho Pair.of("name/file-03.json", 100L), Pair.of("name/file-04.json", 100L), Pair.of("name/file-05.json", 100L))); - writeGcsMetadataRecords(laterCommit); + // the second commit re-writes an existing object key (a re-uploaded object), which MOR + // routes to a log file so the incremental read merges logs + writeGcsMetadataRecords(laterCommit, Arrays.asList(Pair.of("name/file-05.json", 100L))); if (tableType == HoodieTableType.MERGE_ON_READ) { - // the second commit's records must land in log files so the incremental read merges logs boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) .anyMatch(f -> f.getPath().getName().contains(".log.")); Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 9d40a9e2ac2a7..8087c0ba657f6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -380,9 +380,10 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho Pair.of("path/to/file-03.json", 100L), Pair.of("path/to/file-04.json", 100L), Pair.of("path/to/file-05.json", 100L))); - writeS3MetadataRecords(laterCommit); + // the second commit re-writes an existing object key (an S3 re-upload event), which MOR + // routes to a log file so the incremental read merges logs + writeS3MetadataRecords(laterCommit, Arrays.asList(Pair.of("path/to/file-05.json", 100L))); if (tableType == HoodieTableType.MERGE_ON_READ) { - // the second commit's record must land in a log file so the incremental read merges logs boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) .anyMatch(f -> f.getPath().getName().contains(".log.")); Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); From cd7a6a305575539c76d4418d2e422337ade1875d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 18:11:02 -0700 Subject: [PATCH 14/18] Tighten test comments --- ...stIncrementalReadWithFileGroupReader.scala | 19 ++++++++----------- .../S3EventsHoodieIncrSourceHarness.java | 3 +-- .../TestGcsEventsHoodieIncrSource.java | 6 ++---- .../sources/TestS3EventsHoodieIncrSource.java | 3 +-- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala index 263d3a26857f8..0b4e0a31f3b69 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala @@ -32,19 +32,17 @@ import org.junit.jupiter.params.provider.CsvSource import scala.collection.JavaConverters._ /** - * Incremental query correctness with the file group reader across source table versions and - * read versions, COW and MOR (file slices with and without log files), and query shapes that - * prune `_hoodie_commit_time` out of the scan schema (count(), isEmpty(), narrow projections) - * where the incremental span filters must stay effective. Runs on a session without - * HoodieSparkSessionExtension, so the filters are not injected into the logical plan and the - * file format alone must keep the filter columns readable. + * Incremental query correctness with the file group reader across COW/MOR, source table + * versions, read versions, and query shapes that prune `_hoodie_commit_time` from the scan + * schema (count(), isEmpty(), narrow projections). Runs without HoodieSparkSessionExtension, + * so the file format alone must keep the span-filter columns readable. */ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHarness { val columns: Seq[String] = Seq("ts", "key", "rider", "fare", "pt") - // commits c1..c3 insert disjoint key pairs (small file handling keeps one file group with - // base files only); commits c4..c6 update those pairs (log files on MOR) + // c1..c3 insert disjoint key pairs (one file group, base files only via small file handling); + // c4..c6 update those pairs (log files on MOR) val batches: Seq[(Seq[(Int, String, String, Double, String)], String)] = Seq( (Seq((1, "k1", "rider-c1", 10.0, "pt1"), (1, "k2", "rider-c1", 10.0, "pt1")), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), @@ -102,11 +100,10 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa // (000, c2]: base files only assertIncrementalRange(readVersion, instants, 0, 2, Set(("k1", 1), ("k2", 1), ("k3", 2), ("k4", 2))) - // (c2, c4]: base file commit c3 plus, on MOR, the log file of c4; rows carried over into - // the c3 base file from c1/c2 must be filtered out + // (c2, c4]: base file of c3 plus c4's log file on MOR; carried-over c1/c2 rows filtered out assertIncrementalRange(readVersion, instants, 2, 4, Set(("k5", 3), ("k6", 3), ("k1", 4), ("k2", 4))) - // (c3, c5]: on MOR only the log files of c4/c5 are in range + // (c3, c5]: log files of c4/c5 only on MOR assertIncrementalRange(readVersion, instants, 3, 5, Set(("k1", 4), ("k2", 4), ("k3", 5), ("k4", 5))) // (c6, c6]: empty range diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java index 7419c3d6e919d..acaa7eefba551 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSourceHarness.java @@ -161,8 +161,7 @@ protected String generateS3EventMetadata(Long objectSize, String bucketName, Str } protected HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { - // records must be written to a partition path consistent with the table config; otherwise - // the incremental read finds no partitions matching the commit metadata + // partition path must match the table config, or the incremental read lists no partitions String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; HoodieSchema schema = S3_METADATA_SCHEMA; GenericRecord rec = new GenericData.Record(schema.toAvroSchema()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index b934653f813bf..25375231145ff 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -342,8 +342,7 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho Pair.of("name/file-03.json", 100L), Pair.of("name/file-04.json", 100L), Pair.of("name/file-05.json", 100L))); - // the second commit re-writes an existing object key (a re-uploaded object), which MOR - // routes to a log file so the incremental read merges logs + // the second commit re-writes an existing key (a re-uploaded object), landing in a log file on MOR writeGcsMetadataRecords(laterCommit, Arrays.asList(Pair.of("name/file-05.json", 100L))); if (tableType == HoodieTableType.MERGE_ON_READ) { boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) @@ -465,8 +464,7 @@ private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, St } private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation, long size) { - // records must be written to a partition path consistent with the table config; otherwise - // the incremental read finds no partitions matching the commit metadata + // partition path must match the table config, or the incremental read lists no partitions String partitionPath = metaClient.getTableConfig().isTablePartitioned() ? bucketName : ""; String id = "id:" + bucketName + "/" + filename + "/" + generation; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 8087c0ba657f6..691f4e44376dd 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -380,8 +380,7 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho Pair.of("path/to/file-03.json", 100L), Pair.of("path/to/file-04.json", 100L), Pair.of("path/to/file-05.json", 100L))); - // the second commit re-writes an existing object key (an S3 re-upload event), which MOR - // routes to a log file so the incremental read merges logs + // the second commit re-writes an existing key (an S3 re-upload), landing in a log file on MOR writeS3MetadataRecords(laterCommit, Arrays.asList(Pair.of("path/to/file-05.json", 100L))); if (tableType == HoodieTableType.MERGE_ON_READ) { boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) From 1c9c63876e3a18c3deb75818df8c8062b2157534 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 19:10:34 -0700 Subject: [PATCH 15/18] Address review comments --- ...HoodieFileGroupReaderBasedFileFormat.scala | 9 ++- ...stIncrementalReadWithFileGroupReader.scala | 58 +++++++++++-------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index e182abef56c73..f3e2330309366 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -250,9 +250,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental - // Required filters are enforced via parquet push-down, which evaluates predicates on columns - // absent from the read schema as all-null and would drop every row (e.g., count() prunes all - // columns), so filter-referenced columns must be read and projected away after filtering + // Spark planner only adds the user-provided predicates (from `WHERE` clause or `.filter()`) + // to `filters`; the `requiredFilters` from `HoodieBaseHadoopFsRelationFactory#getRequiredFilters` + // are not visible to the planner, thus the `requiredSchema` passed by Spark can miss the + // columns in `requiredFilters`. This happens for incremental query where `requiredFilters` + // is present. To allow correct projection and filtering, the columns from `requiredFilters` + // are added back to the `readRequiredSchema` for reading the file. val filterOnlyFields = requiredFilters.flatMap(_.references).distinct .filterNot(name => requiredSchema.fieldNames.contains(name) || partitionSchema.fieldNames.contains(name)) .flatMap(name => dataStructType.fields.find(_.name == name)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala index 0b4e0a31f3b69..ccb23158ad414 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFileGroupReader.scala @@ -18,14 +18,16 @@ package org.apache.hudi.functional import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} +import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SaveMode} -import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.CsvSource @@ -42,7 +44,8 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa val columns: Seq[String] = Seq("ts", "key", "rider", "fare", "pt") // c1..c3 insert disjoint key pairs (one file group, base files only via small file handling); - // c4..c6 update those pairs (log files on MOR) + // c4..c6 are update commits (log files on MOR), each updating k1 with a different value so a + // range must surface only the targeted update of k1 val batches: Seq[(Seq[(Int, String, String, Double, String)], String)] = Seq( (Seq((1, "k1", "rider-c1", 10.0, "pt1"), (1, "k2", "rider-c1", 10.0, "pt1")), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), @@ -52,9 +55,9 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL), (Seq((4, "k1", "rider-c4", 40.0, "pt1"), (4, "k2", "rider-c4", 40.0, "pt1")), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), - (Seq((5, "k3", "rider-c5", 50.0, "pt1"), (5, "k4", "rider-c5", 50.0, "pt1")), + (Seq((5, "k1", "rider-c5", 50.0, "pt1"), (5, "k3", "rider-c5", 50.0, "pt1"), (5, "k4", "rider-c5", 50.0, "pt1")), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL), - (Seq((6, "k5", "rider-c6", 60.0, "pt1"), (6, "k6", "rider-c6", 60.0, "pt1")), + (Seq((6, "k1", "rider-c6", 60.0, "pt1"), (6, "k5", "rider-c6", 60.0, "pt1"), (6, "k6", "rider-c6", 60.0, "pt1")), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)) @ParameterizedTest @@ -73,39 +76,49 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa if (i == 2) { // small file handling must have kept a single file group with base files only val (baseFiles, logFiles) = listDataFiles() - Assertions.assertEquals(3, baseFiles.size, "Expected one base file per insert commit") - Assertions.assertEquals(1, baseFiles.map(fileId).distinct.size, "Expected a single file group") - Assertions.assertTrue(logFiles.isEmpty, "Expected no log files after insert-only commits") + assertEquals(3, baseFiles.size, "Expected one base file per insert commit") + assertEquals(1, baseFiles.map(FSUtils.getFileId).distinct.size, "Expected a single file group") + assertTrue(logFiles.isEmpty, "Expected no log files after insert-only commits") } } val metaClient = HoodieTableMetaClient.builder() .setConf(storageConf().newInstance()).setBasePath(basePath()).build() - Assertions.assertEquals(sourceVersion, metaClient.getTableConfig.getTableVersion.versionCode()) + assertEquals(sourceVersion, metaClient.getTableConfig.getTableVersion.versionCode()) val (baseFiles, logFiles) = listDataFiles() - Assertions.assertEquals(1, baseFiles.map(fileId).distinct.size, "Expected a single file group") + assertEquals(1, baseFiles.map(FSUtils.getFileId).distinct.size, "Expected a single file group") if (tableType == "MERGE_ON_READ") { - Assertions.assertEquals(3, baseFiles.size, "Update commits must not rewrite MOR base files") - Assertions.assertEquals(3, logFiles.size, "Expected one log file per update commit") + assertEquals(3, baseFiles.size, "Update commits must not rewrite MOR base files") + assertEquals(3, logFiles.size, "Expected one log file per update commit") } else { - Assertions.assertEquals(6, baseFiles.size, "Expected one base file per commit") - Assertions.assertTrue(logFiles.isEmpty, "Expected no log files on COW") + assertEquals(6, baseFiles.size, "Expected one base file per commit") + assertTrue(logFiles.isEmpty, "Expected no log files on COW") } + // records merged into the latest base file keep their original commit times + val latestBaseFile = baseFiles.maxBy(name => FSUtils.getCommitTime(name)) + val commitTimesInBaseFile = spark.read.parquet(new Path(new Path(basePath, "pt1"), latestBaseFile).toString) + .select("_hoodie_commit_time").distinct().count() + assertTrue(commitTimesInBaseFile > 1, + s"Expected multiple commit times in the latest base file, got $commitTimesInBaseFile") // c1..c6 ordered by requested time val instants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants .getInstants.asScala.toList - Assertions.assertEquals(6, instants.size) + assertEquals(6, instants.size) // (000, c2]: base files only assertIncrementalRange(readVersion, instants, 0, 2, Set(("k1", 1), ("k2", 1), ("k3", 2), ("k4", 2))) + // (c1, c2]: single base file in range + assertIncrementalRange(readVersion, instants, 1, 2, + Set(("k3", 2), ("k4", 2))) // (c2, c4]: base file of c3 plus c4's log file on MOR; carried-over c1/c2 rows filtered out assertIncrementalRange(readVersion, instants, 2, 4, Set(("k5", 3), ("k6", 3), ("k1", 4), ("k2", 4))) - // (c3, c5]: log files of c4/c5 only on MOR + // (c3, c5]: log files of c4/c5 only on MOR; k1 updated in both c4 and c5 must surface once + // with the latest in-range value assertIncrementalRange(readVersion, instants, 3, 5, - Set(("k1", 4), ("k2", 4), ("k3", 5), ("k4", 5))) + Set(("k1", 5), ("k2", 4), ("k3", 5), ("k4", 5))) // (c6, c6]: empty range assertIncrementalRange(readVersion, instants, 6, 6, Set.empty) } @@ -129,13 +142,13 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa // select * val rows = readIncremental(readVersion, start, end).collect() .map(r => (r.getAs[String]("key"), r.getAs[Int]("ts"))).toSet - Assertions.assertEquals(expected, rows) + assertEquals(expected, rows) // projection without _hoodie_commit_time val keys = readIncremental(readVersion, start, end).select("key").collect().map(_.getString(0)).toSet - Assertions.assertEquals(expected.map(_._1), keys) + assertEquals(expected.map(_._1), keys) // these query shapes prune `_hoodie_commit_time` out of the scan schema - Assertions.assertEquals(expected.size.toLong, readIncremental(readVersion, start, end).count()) - Assertions.assertEquals(expected.isEmpty, readIncremental(readVersion, start, end).isEmpty) + assertEquals(expected.size.toLong, readIncremental(readVersion, start, end).count()) + assertEquals(expected.isEmpty, readIncremental(readVersion, start, end).isEmpty) } private def write(data: Seq[(Int, String, String, Double, String)], tableType: String, @@ -171,9 +184,6 @@ class TestIncrementalReadWithFileGroupReader extends SparkClientFunctionalTestHa private def listDataFiles(): (Seq[String], Seq[String]) = { val names = fs.listStatus(new Path(basePath, "pt1")).map(_.getPath.getName).toSeq - // log files are dot-prefixed: .{fileId}_{baseCommit}.log.{version}_{writeToken} - (names.filter(n => !n.startsWith(".") && n.endsWith(".parquet")), names.filter(_.contains(".log."))) + (names.filter(n => FSUtils.isBaseFile(new StoragePath(n))), names.filter(n => FSUtils.isLogFile(n))) } - - private def fileId(baseFileName: String): String = baseFileName.substring(0, baseFileName.indexOf("_")) } From 22267a0f9273f286e6da847e8ab54cbfa2ea74c8 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 19:16:04 -0700 Subject: [PATCH 16/18] Use individual static imports for Assertions and Mockito --- .../TestGcsEventsHoodieIncrSource.java | 28 +++++++------ .../sources/TestS3EventsHoodieIncrSource.java | 42 ++++++++++--------- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 25375231145ff..b843eaf3113a6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -68,7 +68,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -93,8 +92,11 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; @@ -256,7 +258,7 @@ public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOExce List numPartitions = Arrays.asList(12, 2, 1); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Integer.class); verify(cloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); } @ParameterizedTest @@ -314,8 +316,8 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } else { numPartitions = Arrays.asList(23, sourcePartitions); } - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); - Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); } /** @@ -347,12 +349,12 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho if (tableType == HoodieTableType.MERGE_ON_READ) { boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) .anyMatch(f -> f.getPath().getName().contains(".log.")); - Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); } TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); - when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), org.mockito.Mockito.anyInt())) + when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), org.mockito.anyInt())) .thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); @@ -367,7 +369,7 @@ props, jsc(), spark(), Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#name/file-02.json"); Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); - Assertions.assertEquals( + assertEquals( new StreamerCheckpointV1(startCommit + "#name/file-04.json"), result.getRight(), "Next batch must continue within the start commit, not advance to a bare instant."); @@ -376,14 +378,14 @@ props, jsc(), spark(), @SuppressWarnings("unchecked") ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); verify(cloudObjectsSelectorCommon).loadAsDataset( - any(), captor.capture(), any(), eq(schemaProvider), org.mockito.Mockito.anyInt()); + any(), captor.capture(), any(), eq(schemaProvider), org.mockito.anyInt()); List selectedPaths = captor.getValue().stream() .map(CloudObjectMetadata::getPath) .sorted() .collect(Collectors.toList()); - Assertions.assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); - Assertions.assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), selectedPaths.get(0)); - Assertions.assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), selectedPaths.get(1)); + assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + assertTrue(selectedPaths.get(0).endsWith("/name/file-03.json"), selectedPaths.get(0)); + assertTrue(selectedPaths.get(1).endsWith("/name/file-04.json"), selectedPaths.get(1)); } @Test @@ -448,8 +450,8 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe Option> datasetOpt = dataAndCheckpoint.getLeft(); Checkpoint nextCheckPoint = dataAndCheckpoint.getRight(); - Assertions.assertNotNull(nextCheckPoint); - Assertions.assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); + assertNotNull(nextCheckPoint); + assertEquals(new StreamerCheckpointV1(expectedCheckpoint), nextCheckPoint); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index 691f4e44376dd..b074329e28e97 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -47,7 +46,6 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; @@ -59,8 +57,12 @@ import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -100,7 +102,7 @@ public void testOneFileInCommit() throws IOException { Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json"); @@ -125,7 +127,7 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException { Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json"); @@ -165,7 +167,7 @@ public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOExce Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, @@ -230,7 +232,7 @@ public void testFilterAnEntireCommit(boolean useSourceProfile) throws IOExceptio setMockQueryRunner(inputDs); SourceProfile sourceProfile = new TestSourceProfile(50L, 0, 10L); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); if (useSourceProfile) { when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); } else { @@ -268,7 +270,7 @@ public void testFilterAnEntireMiddleCommit(boolean useSourceProfile) throws IOEx Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); SourceProfile sourceProfile = new TestSourceProfile(50L, 0, 10L); if (useSourceProfile) { when(sourceProfileSupplier.getSourceProfile()).thenReturn(sourceProfile); @@ -314,7 +316,7 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, Dataset inputDs = generateDataset(filePathSizeAndCommitTime); setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint)); - when(mockCloudObjectsSelectorCommon.loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())).thenReturn(Option.empty()); + when(mockCloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())).thenReturn(Option.empty()); TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT); typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip"); typedProperties.setProperty("hoodie.streamer.source.cloud.data.select.relative.path.regex", "path/to/file[0-9]+"); @@ -344,7 +346,7 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, // Verify the partitions being passed in getCloudObjectDataDF are correct. ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Integer.class); ArgumentCaptor argumentCaptorForMetrics = ArgumentCaptor.forClass(Integer.class); - verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), argumentCaptor.capture()); + verify(mockCloudObjectsSelectorCommon, atLeastOnce()).loadAsDataset(any(), any(), any(), eq(schemaProvider), argumentCaptor.capture()); verify(metrics, atLeastOnce()).updateStreamerSourceParallelism(argumentCaptorForMetrics.capture()); List numPartitions; if (snapshotCheckPoint.equals("1") || snapshotCheckPoint.equals("2")) { @@ -352,8 +354,8 @@ public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, } else { numPartitions = Arrays.asList(23, sourcePartitions); } - Assertions.assertEquals(numPartitions, argumentCaptor.getAllValues()); - Assertions.assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); + assertEquals(numPartitions, argumentCaptor.getAllValues()); + assertEquals(numPartitions, argumentCaptorForMetrics.getAllValues()); } /** @@ -385,15 +387,15 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho if (tableType == HoodieTableType.MERGE_ON_READ) { boolean hasLogFiles = Arrays.stream(fs().listStatus(new Path(basePath()))) .anyMatch(f -> f.getPath().getName().contains(".log.")); - Assertions.assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); + assertTrue(hasLogFiles, "Expected log files in the MOR source meta-table"); } TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); - Mockito.when(mockCloudObjectsSelectorCommon.loadAsDataset( - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt())) + when(mockCloudObjectsSelectorCommon.loadAsDataset( + any(), any(), any(), eq(schemaProvider), anyInt())) .thenReturn(Option.empty()); - Mockito.when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); + when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); // Real QueryRunner so the actual Spark incremental read against the on-disk meta-table runs. S3EventsHoodieIncrSource incrSource = new S3EventsHoodieIncrSource( @@ -406,7 +408,7 @@ props, jsc(), spark(), Checkpoint resumeFrom = new StreamerCheckpointV1(startCommit + "#path/to/file-02.json"); Pair>, Checkpoint> result = incrSource.fetchNextBatch(Option.of(resumeFrom), 250L); - Assertions.assertEquals( + assertEquals( new StreamerCheckpointV1(startCommit + "#path/to/file-04.json"), result.getRight(), "Next batch must continue within the start commit, not advance to a bare instant."); @@ -415,14 +417,14 @@ props, jsc(), spark(), @SuppressWarnings("unchecked") ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); verify(mockCloudObjectsSelectorCommon).loadAsDataset( - Mockito.any(), captor.capture(), Mockito.any(), Mockito.eq(schemaProvider), Mockito.anyInt()); + any(), captor.capture(), any(), eq(schemaProvider), anyInt()); List selectedPaths = captor.getValue().stream() .map(CloudObjectMetadata::getPath) .sorted() .collect(java.util.stream.Collectors.toList()); - Assertions.assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); - Assertions.assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), selectedPaths.get(0)); - Assertions.assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), selectedPaths.get(1)); + assertEquals(2, selectedPaths.size(), "Expected file-03 and file-04, got: " + selectedPaths); + assertTrue(selectedPaths.get(0).endsWith("/path/to/file-03.json"), selectedPaths.get(0)); + assertTrue(selectedPaths.get(1).endsWith("/path/to/file-04.json"), selectedPaths.get(1)); } @Test From 4a9434032143adafb593e3cbb62041bdb37e00c9 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 19:17:19 -0700 Subject: [PATCH 17/18] Fix broken fully-qualified Mockito.anyInt call --- .../hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index b843eaf3113a6..c9b5286f9c778 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -354,7 +354,7 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho TypedProperties props = setProps(READ_UPTO_LATEST_COMMIT); props.setProperty(CloudSourceConfig.ENABLE_EXISTS_CHECK.key(), "false"); - when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), org.mockito.anyInt())) + when(cloudObjectsSelectorCommon.loadAsDataset(any(), any(), any(), eq(schemaProvider), anyInt())) .thenReturn(Option.empty()); when(sourceProfileSupplier.getSourceProfile()).thenReturn(null); @@ -378,7 +378,7 @@ props, jsc(), spark(), @SuppressWarnings("unchecked") ArgumentCaptor> captor = ArgumentCaptor.forClass((Class) List.class); verify(cloudObjectsSelectorCommon).loadAsDataset( - any(), captor.capture(), any(), eq(schemaProvider), org.mockito.anyInt()); + any(), captor.capture(), any(), eq(schemaProvider), anyInt()); List selectedPaths = captor.getValue().stream() .map(CloudObjectMetadata::getPath) .sorted() From bb7b1afdc798d0ada825dfe2db9fbf356c8bb42e Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 10 Jun 2026 22:05:53 -0700 Subject: [PATCH 18/18] Use timestamp-format commit times in mid-commit pagination tests The incremental read path now normalizes START_COMMIT and END_COMMIT via HoodieSqlCommonUtils.formatIncrementalInstant, which rejects single-digit instant times. --- .../utilities/sources/TestGcsEventsHoodieIncrSource.java | 6 ++++-- .../utilities/sources/TestS3EventsHoodieIncrSource.java | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index c9b5286f9c778..6dd8b7035d806 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -336,8 +336,10 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); - String startCommit = "1"; - String laterCommit = "2"; + // timestamp-format instants: the incremental read normalizes START_COMMIT/END_COMMIT + // through HoodieSqlCommonUtils.formatIncrementalInstant, which rejects other formats + String startCommit = "20260601000001"; + String laterCommit = "20260601000002"; writeGcsMetadataRecords(startCommit, Arrays.asList( Pair.of("name/file-01.json", 100L), Pair.of("name/file-02.json", 100L), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index b074329e28e97..4a0bca4f9aaf1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -374,8 +374,10 @@ void testRealQueryRunnerResumesMidCommitPagination(String sourceTableVersion, Ho tableProps.put(WRITE_TABLE_VERSION.key(), sourceTableVersion); metaClient = getHoodieMetaClient(storageConf(), basePath(), tableProps, tableType); - String startCommit = "1"; - String laterCommit = "2"; + // timestamp-format instants: the incremental read normalizes START_COMMIT/END_COMMIT + // through HoodieSqlCommonUtils.formatIncrementalInstant, which rejects other formats + String startCommit = "20260601000001"; + String laterCommit = "20260601000002"; writeS3MetadataRecords(startCommit, Arrays.asList( Pair.of("path/to/file-01.json", 100L), Pair.of("path/to/file-02.json", 100L),