From efec1c41750ca2e31fc30859a8695c9c8af8fe40 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Tue, 16 Jun 2026 21:35:55 +0800 Subject: [PATCH 1/2] fix(flink): reuse the preceding avg size if there is no eligible estimation --- .../profile/DeltaWriteProfile.java | 3 +- .../partitioner/profile/WriteProfile.java | 6 +- .../sink/partitioner/TestBucketAssigner.java | 85 +++++++++++++++++++ 3 files changed, 89 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index a79c78e6d3639..66d7c005c5c2b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -93,7 +93,7 @@ protected List smallFilesProfile(String partitionPath) { @Override protected long averageBytesPerRecord() { - long avgSize = config.getCopyOnWriteRecordSizeEstimate(); + long avgSize = getAvgSize() > 0 ? 
getAvgSize() : config.getCopyOnWriteRecordSizeEstimate(); HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D); @@ -109,7 +109,6 @@ protected long averageBytesPerRecord() { } } } - log.info("Refresh average bytes per record => " + avgSize); return avgSize; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index c3819bf081166..6ceb45431e18e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -134,7 +134,7 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) * records pack into one file. */ protected long averageBytesPerRecord() { - long avgSize = config.getCopyOnWriteRecordSizeEstimate(); + long avgSize = this.avgSize > 0 ? 
this.avgSize : config.getCopyOnWriteRecordSizeEstimate(); HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D); @@ -142,7 +142,6 @@ protected long averageBytesPerRecord() { avgSize = sizeFromCommitMetadata; } } - log.info("Refresh average bytes per record => " + avgSize); return avgSize; } @@ -238,8 +237,9 @@ private void cleanMetadataCache(Stream instants) { private void recordProfile() { this.avgSize = averageBytesPerRecord(); + log.info("Refresh average bytes per record => {}", avgSize); this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize; - log.info("Refresh insert records per bucket => " + recordsPerBucket); + log.info("Refresh insert records per bucket => {}", recordsPerBucket); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 8281ebbd86e4f..3b5c265f7c916 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; @@ -46,11 +47,13 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import 
java.util.Queue; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -444,6 +447,26 @@ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstim writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize)); } + @Test + public void testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload() throws Exception { + conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024"); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); + setScriptedRecordSizes(512L, -1L); + WriteProfile writeProfile = new ScriptedRecordSizeWriteProfile(writeConfig, context); + assertThat("Average record size should use the profiled commit metadata", + writeProfile.getAvgSize(), is(512L)); + + writeProfile.reload(1); + + assertThat("Average record size should reuse the previous estimate when no eligible commit metadata is found", + writeProfile.getAvgSize(), is(512L)); + assertThat("Records per bucket should continue to use the previous estimate", + writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / 512L)); + } + @Test public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception { File morPath = new File(tempFile, "mor"); @@ -505,6 +528,34 @@ public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquet writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize)); } + @Test + public void testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload() throws Exception { + File morPath = new File(tempFile, "mor_reuse_previous_avg"); + Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath()); + morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + 
morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024"); + StreamerUtil.initTableIfNotExists(morConf); + TestData.writeData(TestData.DATA_SET_INSERT, morConf); + + HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf); + HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext( + HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)), + new FlinkTaskContextSupplier(null)); + + setScriptedRecordSizes(256L, -1L); + DeltaWriteProfile writeProfile = new ScriptedRecordSizeDeltaWriteProfile(morWriteConfig, morContext); + assertThat("Average record size should use the profiled delta commit metadata", + writeProfile.getAvgSize(), is(256L)); + + writeProfile.reload(1); + + assertThat("Average record size should reuse the previous estimate when no eligible delta commit metadata is found", + writeProfile.getAvgSize(), is(256L)); + assertThat("Records per bucket should continue to use the previous estimate", + writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / 256L)); + } + private static String getLastCompleteInstant(WriteProfile profile) { return StreamerUtil.getLastCompletedInstant(profile.getMetaClient()); } @@ -526,6 +577,40 @@ private void assertBucketEquals( assertThat(bucketInfo.getBucketType(), is(bucketType)); } + private static Queue scriptedRecordSizes = new ArrayDeque<>(); + + private static void setScriptedRecordSizes(Long... recordSizes) { + scriptedRecordSizes = new ArrayDeque<>(Arrays.asList(recordSizes)); + } + + /** + * WriteProfile with scripted record size estimates. 
+ */ + static class ScriptedRecordSizeWriteProfile extends WriteProfile { + ScriptedRecordSizeWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { + super(config, context); + } + + @Override + protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) { + return scriptedRecordSizes.remove(); + } + } + + /** + * DeltaWriteProfile with scripted record size estimates. + */ + static class ScriptedRecordSizeDeltaWriteProfile extends DeltaWriteProfile { + ScriptedRecordSizeDeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { + super(config, context); + } + + @Override + protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) { + return scriptedRecordSizes.remove(); + } + } + /** * Mock BucketAssigner that can specify small files explicitly. */ From d425033c98906c6651b2784fdd271416a2c668a1 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Wed, 17 Jun 2026 09:48:46 +0800 Subject: [PATCH 2/2] address review comments --- .../hudi/sink/partitioner/profile/DeltaWriteProfile.java | 5 +---- .../apache/hudi/sink/partitioner/profile/WriteProfile.java | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 66d7c005c5c2b..2cdee3453a467 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -29,8 +29,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.commit.SmallFile; -import lombok.extern.slf4j.Slf4j; - import java.util.ArrayList; import java.util.List; 
import java.util.stream.Collectors; @@ -41,7 +39,6 @@ * *

Note: assumes the index can always index log files for Flink write. */ -@Slf4j public class DeltaWriteProfile extends WriteProfile { public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { @@ -93,7 +90,7 @@ protected List smallFilesProfile(String partitionPath) { @Override protected long averageBytesPerRecord() { - long avgSize = getAvgSize() > 0 ? getAvgSize() : config.getCopyOnWriteRecordSizeEstimate(); + long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate(); HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 6ceb45431e18e..304d8f5d7452e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -78,7 +78,7 @@ public class WriteProfile { * The average record size. */ @Getter - private long avgSize = -1L; + protected long avgSize = -1L; /** * Total records to write for each bucket based on