Skip to content

fix(flink): fix the mor small file record size estimation#18991

Merged
danny0405 merged 3 commits into
apache:masterfrom
danny0405:fix-mor-profile
Jun 16, 2026
Merged

fix(flink): fix the mor small file record size estimation#18991
danny0405 merged 3 commits into
apache:masterfrom
danny0405:fix-mor-profile

Conversation

@danny0405

@danny0405 danny0405 commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Describe the issue this Pull Request addresses

Flink MERGE_ON_READ write profiling estimates how many records can fit into small-file and insert buckets based on average record size. For MOR tables, commit metadata may include log-file bytes, which need to be normalized to estimated parquet size using hoodie.logfile.to.parquet.compression.ratio. Without that adjustment, Flink can overestimate record size and under-fill buckets.

The write profile also only refreshed recordsPerBucket from profiled average record size when multi-write-on-same-instant was enabled. That made normal writes continue using the configured split-size fallback instead of the current profile-derived capacity.

Summary and Changelog

This PR updates Flink write profiling so MOR average record size uses the existing log-to-parquet compression ratio, and insert bucket capacity is refreshed from profiled record size consistently.

  • Added a fileSizeParquetCompressionRatio() hook in WriteProfile, defaulting to 1.
  • Overrode the hook in DeltaWriteProfile to use config.getLogFileToParquetCompressionRatio().
  • Applied the ratio when computing average bytes per record from commit metadata.
  • Refreshed recordsPerBucket from parquetMaxFileSize / avgSize unconditionally during profile construction/reload.
  • Updated TestBucketAssigner to cover profile-derived bucket sizing and MOR commit-metadata sizing with compression ratio.

Impact

This affects Flink sink bucket assignment for Hudi write profiles, especially MERGE_ON_READ tables. MOR small-file and insert bucket sizing should better reflect estimated compacted parquet size.

There is no public API change, storage format change, config rename, or compatibility break. Existing configuration is reused.

Risk Level

medium

The change is in Flink write-path sizing logic and can affect how records are assigned to insert buckets and small files. The risk is mitigated by targeted coverage in TestBucketAssigner, including a real MOR write commit and validation that average record size and records-per-bucket use the compression-adjusted metadata estimate.

Validated with:

mvn -pl hudi-flink-datasource/hudi-flink -am -Dtest=TestBucketAssigner -Dsurefire.failIfNoSpecifiedTests=false test

Documentation Update

none

  • No new config, API, or user-facing option is introduced.
  • This PR corrects internal use of the existing MOR sizing configuration.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:S PR with lines of changes in (10, 100] label Jun 13, 2026

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! The change tightens MOR write profiling by normalizing log bytes via the existing log-to-parquet compression ratio and always refreshing recordsPerBucket from profiled record size — both changes line up with what DeltaWriteProfile.getTotalFileSize already does for small-file sizing, so the downstream SmallFileAssignState arithmetic stays consistent. A couple of edge cases worth double-checking in the inline comments, in addition to the points the other reviewers have already raised. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and literal-type nits below; the overall change is clean and well-structured.

@@ -228,10 +232,8 @@ private void cleanMetadataCache(Stream<HoodieInstant> instants) {

private void recordProfile() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Removing the shouldAllowMultiWriteOnSameInstant gate means COPY_ON_WRITE_INSERT_SPLIT_SIZE is now silently ignored on the Flink side, while the Spark UpsertPartitioner still honors it when shouldAutoTuneInsertSplits() is false. Was the intent to fully deprecate that config for Flink, or should this be gated on shouldAutoTuneInsertSplits() (defaulting to true) to mirror Spark? @danny0405

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@hudi-agent hudi-agent left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the updates! The rename to fileSizeCalibrationRatio() and the 1D literal address the prior nit/readability comments, and the new logFileToParquetCompressionRatio() helper plus accompanying test correctly skips the compression ratio when log blocks are already stored as parquet. The two open items from the prior round — wombatu-kun's notes on recordsPerBucket being set unconditionally and on MOR commits that mix base-parquet and log bytes (e.g., bootstrap inserts), plus the shouldAllowMultiWriteOnSameInstant question — are not addressed in this revision and worth a committer's judgment. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.

@wombatu-kun

Copy link
Copy Markdown
Contributor

flaky trino CI should be fixed by #19004

@codecov-commenter

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 88.63636% with 5 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.64%. Comparing base (612e327) to head (5a35831).
⚠️ Report is 18 commits behind head on master.

Files with missing lines Patch % Lines
...he/hudi/sink/partitioner/profile/WriteProfile.java 87.50% 1 Missing and 2 partials ⚠️
...di/sink/partitioner/profile/DeltaWriteProfile.java 90.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18991      +/-   ##
============================================
- Coverage     68.24%   67.64%   -0.61%     
- Complexity    29478    29795     +317     
============================================
  Files          2542     2562      +20     
  Lines        142541   145178    +2637     
  Branches      17798    18337     +539     
============================================
+ Hits          97281    98204     +923     
- Misses        37254    38753    +1499     
- Partials       8006     8221     +215     
Flag Coverage Δ
common-and-other-modules 44.78% <88.63%> (+0.01%) ⬆️
hadoop-mr-java-client 44.72% <ø> (-0.03%) ⬇️
spark-client-hadoop-common 48.31% <ø> (+0.24%) ⬆️
spark-java-tests 48.30% <ø> (-0.46%) ⬇️
spark-scala-tests 44.47% <ø> (-0.34%) ⬇️
utilities 37.22% <ø> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...di/sink/partitioner/profile/DeltaWriteProfile.java 92.15% <90.00%> (-1.60%) ⬇️
...he/hudi/sink/partitioner/profile/WriteProfile.java 95.18% <87.50%> (+3.82%) ⬆️

... and 72 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

}
}
}
log.info("Refresh average bytes per record => " + avgSize);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

log.info("Refresh average bytes per record => {}", avgSize);

log.info("Refresh insert records per bucket => " + recordsPerBucket);
}
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
log.info("Refresh insert records per bucket => " + recordsPerBucket);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

log.info("Refresh insert records per bucket => {}", recordsPerBucket);

@hudi-bot

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@github-actions github-actions Bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Jun 16, 2026
@danny0405 danny0405 merged commit e853440 into apache:master Jun 16, 2026
136 of 137 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants