fix(flink): reuse the preceding avg size if there is no eligible estimation #19022
    }
  }
- log.info("Refresh average bytes per record => " + avgSize);
  return avgSize;
With the log.info above removed, DeltaWriteProfile has no remaining log usage, so its class-level @Slf4j annotation and the import lombok.extern.slf4j.Slf4j (both added in #18991 for that line) are now dead. The inherited recordProfile() still logs through WriteProfile's own logger. Drop the annotation and the import from DeltaWriteProfile.
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR preserves the previously learned average record size in WriteProfile and DeltaWriteProfile when a reload finds no eligible commit/delta-commit metadata, avoiding regressions in recordsPerBucket after the profile has already learned a better value. No correctness issues found. One minor readability nit on the double getter call in DeltaWriteProfile is noted inline; otherwise the changes are clean. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here.
cc @yihua
  @Override
  protected long averageBytesPerRecord() {
-   long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+   long avgSize = getAvgSize() > 0 ? getAvgSize() : config.getCopyOnWriteRecordSizeEstimate();
🤖 nit: getAvgSize() is invoked twice in the ternary — could you cache it in a local first (e.g. long prevAvgSize = getAvgSize();) so the intent reads in one pass? The sibling WriteProfile.averageBytesPerRecord() avoids the double-touch by referencing this.avgSize directly, which makes the fallback logic a bit easier to scan.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
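The nit above could be addressed along the lines of the following minimal sketch. The class CachedGetterSketch, its setter, and the constant standing in for config.getCopyOnWriteRecordSizeEstimate() are hypothetical stand-ins rather than Hudi's actual code; only the pattern of reading the getter once into a local is the point.

```java
// Hypothetical stand-in for the profile class; not Hudi's actual implementation.
public class CachedGetterSketch {
  // Stand-in for config.getCopyOnWriteRecordSizeEstimate(); the value is illustrative.
  public static final long CONFIGURED_ESTIMATE = 1024L;

  // A non-positive value means no average size has been profiled yet.
  private long avgSize = -1L;

  public long getAvgSize() {
    return avgSize;
  }

  public void setAvgSize(long avgSize) {
    this.avgSize = avgSize;
  }

  // Reads the getter once, then chooses between the previous value and the configured default.
  public long initialEstimate() {
    long prevAvgSize = getAvgSize();
    return prevAvgSize > 0 ? prevAvgSize : CONFIGURED_ESTIMATE;
  }

  public static void main(String[] args) {
    CachedGetterSketch profile = new CachedGetterSketch();
    System.out.println("before profiling: " + profile.initialEstimate());
    profile.setAvgSize(200L);
    System.out.println("after profiling: " + profile.initialEstimate());
  }
}
```

Besides reading in one pass, a single read would also avoid the two getter calls observing different values if the field were ever updated concurrently; whether that matters here depends on how the profile is accessed.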
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR makes Flink's WriteProfile/DeltaWriteProfile preserve the last successfully profiled average record size when a later refresh has no eligible commit metadata, instead of reverting to the configured default. The change is well-scoped and the new tests cover both COW and MOR reload paths. No correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One small suggestion to deduplicate the fallback expression.
cc @yihua
  @Override
  protected long averageBytesPerRecord() {
-   long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+   long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate();
🤖 nit: the this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate() initializer is now identical in both WriteProfile.averageBytesPerRecord() and here — could you pull it into a small protected helper on WriteProfile (e.g. initialAvgSizeEstimate()) so the two overrides stay in sync if the fallback logic ever changes?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
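A rough sketch of the helper suggested above, under stated assumptions: BaseProfile and DeltaProfile are simplified stand-ins for WriteProfile and DeltaWriteProfile, the long parameter on averageBytesPerRecord (non-positive meaning "no eligible metadata") is a hypothetical simplification of the real metadata scan, and initialAvgSizeEstimate() is the name proposed in the comment, not an existing Hudi method.

```java
// Simplified stand-ins for WriteProfile / DeltaWriteProfile; not Hudi's actual classes.
public class SharedHelperSketch {

  public static class BaseProfile {
    // A non-positive value means nothing has been profiled yet.
    protected long avgSize = -1L;
    // Stand-in for config.getCopyOnWriteRecordSizeEstimate().
    protected final long configuredEstimate;

    public BaseProfile(long configuredEstimate) {
      this.configuredEstimate = configuredEstimate;
    }

    // Single place that encodes the fallback rule shared by both overrides.
    protected long initialAvgSizeEstimate() {
      return avgSize > 0 ? avgSize : configuredEstimate;
    }

    // Hypothetical signature: the argument stands in for the estimate derived
    // from eligible commits (non-positive when none is eligible).
    protected long averageBytesPerRecord(long estimateFromCommits) {
      long result = initialAvgSizeEstimate();
      if (estimateFromCommits > 0) {
        result = estimateFromCommits;
      }
      return result;
    }

    public void reload(long estimateFromMetadata) {
      avgSize = averageBytesPerRecord(estimateFromMetadata);
    }

    public long getAvgSize() {
      return avgSize;
    }
  }

  public static class DeltaProfile extends BaseProfile {
    public DeltaProfile(long configuredEstimate) {
      super(configuredEstimate);
    }

    // The override seeds from the same helper, so the fallback cannot drift.
    @Override
    protected long averageBytesPerRecord(long estimateFromDeltaCommits) {
      long result = initialAvgSizeEstimate();
      if (estimateFromDeltaCommits > 0) {
        result = estimateFromDeltaCommits;
      }
      return result;
    }
  }

  public static void main(String[] args) {
    DeltaProfile profile = new DeltaProfile(1024L);
    profile.reload(300L); // eligible metadata found
    profile.reload(0L);   // no eligible metadata on this reload
    System.out.println("avgSize after reloads: " + profile.getAvgSize());
  }
}
```

With the fallback expression in one protected method, a later change to the rule would only need to touch the base class.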
Describe the issue this Pull Request addresses
Flink WriteProfile and DeltaWriteProfile refresh average record size from commit or delta-commit metadata. When the current timeline has no eligible commit metadata for size estimation, the refresh path could fall back to the configured default record size estimate, even if the profile had already learned a better value from a preceding eligible commit.
This can make recordsPerBucket jump back to the default estimate after a reload, affecting insert bucket sizing for Flink COW and MOR writes. This PR keeps the preceding profiled average size when a later refresh cannot derive a valid estimate from eligible commits or delta commits.
Summary and Changelog
Flink write profiles now preserve the last successful average record size estimate across reloads when no eligible commit metadata is available for the current estimation pass.
- WriteProfile.averageBytesPerRecord(): seeds estimation from the existing avgSize once initialized, falling back to COPY_ON_WRITE_RECORD_SIZE_ESTIMATE only before any profiled value exists.
- DeltaWriteProfile.averageBytesPerRecord(): the same reuse behavior for MOR commit and delta-commit estimation.
- WriteProfile.recordProfile(): the refreshed value is logged once after the profile is updated.
- TestBucketAssigner: tests for:
  - testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload
  - testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload
Impact
This affects Flink write bucket sizing for COW and MOR tables when profile reloads encounter commits or delta commits that are not eligible for record-size estimation. The change avoids reverting to the configured default estimate after a better preceding estimate has already been learned.
There are no public API, config, storage format, or compatibility changes. Performance impact is negligible; the change only reuses an already cached in-memory value during profile refresh.
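To make the effect on bucket sizing concrete, here is a small illustrative sketch contrasting the refresh behavior before and after this change. Everything in it is an assumption made for illustration: the class ReloadSketch, the use of OptionalLong to model "no eligible metadata", the 1024-byte default, the 120 MB maximum file size, and the formula recordsPerBucket = max file size / average record size. None of it is copied from Hudi's code.

```java
import java.util.OptionalLong;

// Illustrative model of a profile refresh; names and numbers are hypothetical.
public class ReloadSketch {
  // Assumed configured default record size estimate, in bytes.
  public static final long DEFAULT_ESTIMATE = 1024L;
  // Assumed maximum base file size, in bytes (120 MB).
  public static final long MAX_FILE_SIZE = 120L * 1024 * 1024;

  // Behavior described as the problem: with no eligible metadata, the refresh
  // falls back to the configured default and ignores the previous value.
  public static long refreshBefore(long previousAvgSize, OptionalLong fromMetadata) {
    return fromMetadata.orElse(DEFAULT_ESTIMATE);
  }

  // Behavior described by this PR: the previous value is reused once it exists;
  // the configured default applies only before any value has been profiled.
  public static long refreshAfter(long previousAvgSize, OptionalLong fromMetadata) {
    long seed = previousAvgSize > 0 ? previousAvgSize : DEFAULT_ESTIMATE;
    return fromMetadata.orElse(seed);
  }

  // Assumed sizing rule: how many records fit into one insert bucket.
  public static long recordsPerBucket(long avgSize) {
    return MAX_FILE_SIZE / avgSize;
  }

  public static void main(String[] args) {
    long learned = 100L; // learned from an earlier eligible commit
    long before = refreshBefore(learned, OptionalLong.empty());
    long after = refreshAfter(learned, OptionalLong.empty());
    System.out.println("recordsPerBucket, old refresh: " + recordsPerBucket(before));
    System.out.println("recordsPerBucket, new refresh: " + recordsPerBucket(after));
  }
}
```

Under these assumed numbers, a reload without eligible metadata would previously shrink the bucket capacity by roughly a factor of ten, while the new seeding keeps it stable until fresh metadata is available.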
Risk Level
low
The change is limited to Flink WriteProfile and DeltaWriteProfile average record size estimation. The main behavioral risk is retaining a stale estimate longer when no eligible metadata exists, but that is the intended behavior and is safer than reverting to a less accurate configured default. Once eligible commit metadata is available again, the estimate is refreshed from metadata as before.
Verified with:
mvn -pl hudi-flink-datasource/hudi-flink -am -Dtest=TestBucketAssigner -Dsurefire.failIfNoSpecifiedTests=false test -DskipITs
Result:
TestBucketAssigner ran 16 tests with 0 failures.
Documentation Update
none
This is an internal Flink write profile estimation fix with no new user-facing configuration, API, or documented behavior change.
Contributor's checklist