fix(streamer): Include start commit in S3/GCS IncrSource incremental query#18949
Draft
yihua wants to merge 18 commits into
Draft
fix(streamer): Include start commit in S3/GCS IncrSource incremental query#18949yihua wants to merge 18 commits into
yihua wants to merge 18 commits into
Conversation
0f4d233 to
2917bb5
Compare
2917bb5 to
199e19c
Compare
…query S3/GCS cloud-object incremental sources can silently drop records whenever a previous batch persisted a commit#fileKey mid-commit-pagination checkpoint (i.e., the prior batch hit sourceLimit before exhausting the start commit's files). Files in the start commit after the checkpoint key become unreachable, and the persisted checkpoint advances past them as a bare instant. Root cause: QueryRunner.runIncrementalQuery passes queryInfo.getStartInstant() as the Spark START_COMMIT. The Spark incremental relation's findInstantsInRange is (start, end] (start-exclusive), so the start commit is dropped from the scan. The downstream (commit_time || object_key) > 'commit#fileKey' filter then matches nothing in the start commit, and filterAndGenerateCheckpointBasedOnSourceLimit falls through its empty-batch branch, emitting endInstant as bare with no #fileKey suffix. The next batch resumes past the gap. Fix: pass queryInfo.getPreviousInstant() so the resulting scan range (previousInstant, end] includes the start commit (startInstant) while preserving start-exclusive relation semantics. Required for cloud-object sources whose commit#fileKey pagination depends on re-reading the start commit to find files past the persisted key. Adds testRealQueryRunnerResumesMidCommitPagination to both TestS3EventsHoodieIncrSource and TestGcsEventsHoodieIncrSource. The new tests exercise a real QueryRunner against an on-disk Hudi events meta-table, resuming from a mid-commit commit#fileKey checkpoint with sourceLimit smaller than the remaining files. They assert both the next persisted checkpoint and the exact files passed downstream (via captor on loadAsDataset). The existing tests mocked QueryRunner.run() to return inputDs unfiltered for incremental queries and could not catch a START_COMMIT-handling regression.
199e19c to
b95efa8
Compare
Long.parseLong(startCommit) - 1 was producing a previousInstant string of shorter length than the real timeline instants, so findInstantsInRange's lexicographic compare excluded the start commit and the empty-batch path silently advanced the checkpoint past it.
The V1 incremental relation (where the QueryRunner fix actually takes effect) is only chosen when the source table version is < 8. The test harness defaults to v8, which routed the test through the V2 relation and broke the assertion.
…ryRunner
Cloud event incremental sources (S3/GCS) always use V1 checkpoint
(commit#fileKey, requested-time). They should always route through the V1
incremental relation regardless of the source meta-table's actual version.
Parameterizes the regression test on source version {6, 8} to cover both.
The S3 metadata test schema lacked a top-level partition_path field, so the V1 incremental relation's partition-schema lookup failed when the test ran a real read against the on-disk meta-table. Mirroring the GCS test schema.
…umns are pruned Incremental span filters on _hoodie_commit_time are enforced via parquet push-down, which drops all rows when Spark prunes the column from the scan schema (count(), isEmpty()). Read filter-referenced columns regardless of Spark's pruning and project them away after filtering. Re-enable the file group reader in QueryRunner. Document the getMandatoryFields contract and prune declarations not backed by required filters. Cover COW and MOR (non-partitioned, unchanged schema) source meta-tables in the S3/GCS mid-commit pagination tests.
The fix derives the extra read columns from the required filters only, so the declaration cleanup and contract docs can land separately.
Write records to a partition path consistent with the table config so the non-partitioned meta-tables list correctly, and re-write an existing key in the second commit so MOR runs produce log files. Expand the file group reader incremental test to cover COW/MOR x source table v6/v8 x v6/v8 reads, with preconditions on small-file handling and base/log file layout, incremental ranges over base-only, base-plus-log, log-only, and empty windows, and select *, narrow projection, count(), and isEmpty() per range.
The incremental read path now normalizes START_COMMIT and END_COMMIT via HoodieSqlCommonUtils.formatIncrementalInstant, which rejects single-digit instant times.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18949 +/- ##
=============================================
- Coverage 68.26% 53.66% -14.60%
+ Complexity 29500 21865 -7635
=============================================
Files 2542 2452 -90
Lines 142618 132467 -10151
Branches 17790 15525 -2265
=============================================
- Hits 97352 71084 -26268
- Misses 37261 55745 +18484
+ Partials 8005 5638 -2367
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
nsivabalan
added a commit
to prashantwason/incubator-hudi
that referenced
this pull request
Jun 17, 2026
…cremental happy-path test The NOTE previously claimed the underlying cause was all-null record-key column statistics; investigation showed the actual cause is Spark column pruning on the incremental relation: HoodieFileGroupReaderBasedFileFormat hides its requiredFilters (the commit-time span predicate) from Catalyst, so Spark drops _hoodie_commit_time from the scan schema for count()/isEmpty()/projections-without-meta. Parquet then evaluates the predicate against missing columns as all-null and returns 0 rows. The bug is pre-existing and reproduces without selective meta-field exclusion - this test just happens to be the first one to trip it. Tracking fix: apache#18949 (augments the read schema with filter-only columns and projects them away after filtering). Locally validated: with that PR's HoodieFileGroupReaderBasedFileFormat diff applied, both CoW (v6/v9) and MoR (v6/v9) happy-path tests pass with the count() assertion uncommented. Reverted the production change here since PR apache#18949 owns it; left the assertEquals(2, count()) commented out with a reference so we re-enable it once apache#18949 lands. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Describe the issue this Pull Request addresses
S3/GCS cloud-object incremental sources can silently drop records whenever a previous batch persisted a
commit#fileKeymid-commit-pagination checkpoint (i.e., the prior batch hitsourceLimitbefore exhausting the start commit's files). Files in the start commit after the checkpoint key become unreachable, and the persisted checkpoint advances past them as a bare instant.Affected sources and conditions:
S3EventsHoodieIncrSource/GcsEventsHoodieIncrSourcehoodie.deltastreamer.read.source.limitCommon triggers: cold-start backfills against a source table with a big initial commit, bursty event writers, low
sourceLimitoverrides. Steady-state streams whose upstream commits fit withinsourceLimitare unaffected.HoodieIncrSource(non-cloud) does not go throughQueryRunnerand is unaffected.Summary and Changelog
Root cause 1: start commit dropped from the scan.
QueryRunner.runIncrementalQuerypassesqueryInfo.getStartInstant()as the SparkSTART_COMMIT. The Spark V1 incremental relation'sfindInstantsInRangeis(start, end](start-exclusive), so the start commit is dropped from the scan. The downstream(commit_time || object_key) > 'commit#fileKey'filter inIncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimitthen matches nothing in the start commit, the empty-batch branch fires, and the new checkpoint is emitted asendInstantwith no#fileKeysuffix. The next batch resumes past the gap. Regression introduced byab4c7774a6 [HUDI-8141] Support incremental query with completion time (#11947).Root cause 2: file-group-reader incremental scans return no rows when
_hoodie_commit_timeis pruned. The incremental relations enforce the commit-time span filters only via parquet filter push-down insideHoodieFileGroupReaderBasedFileFormat. When Spark's column pruning drops_hoodie_commit_timefrom the scan schema, which happens forcount(),isEmpty()(the first operation the cloud sources run on the batch), and any projection without meta fields, parquet evaluates the predicate against a missing column as all-null and filters out every row, so the batch looks empty and the checkpoint again advances as a bare instant. Sessions withHoodieSparkSessionExtensionare shielded becauseAdaptIngestionTargetLogicalRelationsinjects the same filters into the logical plan; sessions without the extension relied solely on the broken push-down path. Affects both V1 and V2 incremental relations through the file group reader.Fix.
QueryRunner): setINCREMENTAL_READ_TABLE_VERSION = 6unconditionally. Cloud event sources always use V1 checkpoints (commit#fileKey, requested-time) externally, so they always route through the V1 incremental relation internally.previousInstant(notstartInstant) toSTART_COMMIT(QueryRunner): with start-exclusive semantics, the scan range(previousInstant, end]still includes the start commit needed forcommit#fileKeypagination.HoodieFileGroupReaderBasedFileFormat): columns referenced by the relation's required filters but pruned from the scan schema are now read from the file and projected away after filtering. This restores format-level enforcement that HUDI-6658 moved to theHoodieSparkSessionExtension-gated plan-injection rule (leaving sessions without the extension unprotected) and HUDI-7567 removed the last remnants of. Snapshot/CDC reads have no required filters, so their read path is unchanged.Tests.
testRealQueryRunnerResumesMidCommitPaginationin bothTestS3EventsHoodieIncrSourceandTestGcsEventsHoodieIncrSource(@ParameterizedTestover source meta-table versions 6/8 x COW/MOR) exercises a realQueryRunneragainst an on-disk events meta-table with the file group reader enabled, resumes from a mid-commitcommit#fileKeycheckpoint withsourceLimitsmaller than the remaining files, and asserts both the next persisted checkpoint and the exact files passed downstream. The source meta-table is written non-partitioned with the production S3 events schema (no test-only schema changes); the MOR runs assert the second commit's records land in log files so the read exercises log merging. SupportingS3EventsHoodieIncrSourceHarnesschanges: non-partitioned meta-table writes, multi-record commits, and using the table's commit action type so MOR writes commit as delta commits.TestIncrementalReadWithFileGroupReadercovers {COW, MOR} x {source v6 + v6 read, source v8 + v6 read, source v8 + v8 read} on a session withoutHoodieSparkSessionExtension(no plan-injected filters; the file format alone enforces the span filters). Each run writes 3 insert commits (small-file handling keeps a single file group with base files only, validated as a precondition) and 3 update commits (one log file each on MOR, base rewrites on COW, also validated), then queries incremental ranges over base-only (multi-file and single-file), base-plus-some-logs, logs-only, and empty windows, validating the exact row set forselect *plus a narrow projection,count(), andisEmptyper range. One key is updated in every update commit, so a range spanning several updates must surface it exactly once with the latest in-range value; a precondition also asserts the latest base file retains rows from multiple commits: the carried-over rows the span filters must exclude. Without the fix, the pruned-schema shapes fail: COWcount()returns 0; MORcount()returns only log-merged records with base-file rows silently dropped.Impact
Behavior change is contained to incremental reads:
commit#fileKeycheckpoint contract regardless of source table version.commit#fileKeymid-commit-pagination resumes now re-include the start commit in the scan (fixes the silent data drop)._hoodie_commit_time(e.g.,count(),isEmpty()), for both V1 and V2 relations.No API, config, or on-disk format changes;
HoodieIncrSource(non-cloud) is untouched.Risk Level
low
Documentation Update
none
Contributor's checklist