From b259527de31870bc6d435d9987ec4708a45eb3ae Mon Sep 17 00:00:00 2001 From: danny0405 Date: Sat, 13 Jun 2026 12:44:10 +0800 Subject: [PATCH 1/3] fix(flink): fix the mor small file record size estimation --- .../profile/DeltaWriteProfile.java | 6 ++- .../partitioner/profile/WriteProfile.java | 12 +++-- .../sink/partitioner/TestBucketAssigner.java | 53 ++++++++++++++++++- 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index f73adb37d3379..c93cd6a5eef62 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -86,6 +86,11 @@ protected List smallFilesProfile(String partitionPath) { return smallFileLocations; } + @Override + protected double fileSizeParquetCompressionRatio() { + return config.getLogFileToParquetCompressionRatio(); + } + protected SyncableFileSystemView getFileSystemView() { return (SyncableFileSystemView) getTable().getSliceView(); } @@ -98,5 +103,4 @@ private boolean isSmallFile(FileSlice fileSlice) { long totalSize = getTotalFileSize(fileSlice); return totalSize < config.getParquetMaxFileSize(); } - } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index cbcc71cedb6ec..7222ca925cb5e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -154,7 +154,7 @@ private long averageBytesPerRecord() { long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { - avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); + avgSize = (long) Math.ceil((fileSizeParquetCompressionRatio() * totalBytesWritten) / totalRecordsWritten); break; } } @@ -163,6 +163,10 @@ private long averageBytesPerRecord() { return avgSize; } + protected double fileSizeParquetCompressionRatio() { + return 1; + } + /** * Returns a list of small files in the given partition path. * @@ -228,10 +232,8 @@ private void cleanMetadataCache(Stream instants) { private void recordProfile() { this.avgSize = averageBytesPerRecord(); - if (config.shouldAllowMultiWriteOnSameInstant()) { - this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize; - log.info("Refresh insert records per bucket => " + recordsPerBucket); - } + this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize; + log.info("Refresh insert records per bucket => " + recordsPerBucket); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 4f682084050bc..74f8db2caa3a9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -19,11 +19,16 @@ package org.apache.hudi.sink.partitioner; import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile; import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; @@ -151,7 +156,8 @@ public void testAddInsert() { @Test public void testInsertOverBucketAssigned() { - conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2"); + conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(512 * 1024)); writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig); @@ -402,6 +408,51 @@ public void testWriteProfileMetadataCache() throws Exception { writeProfile.getMetadataCache().size(), is(3)); } + @Test + public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() { + conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2"); + conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024"); + writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); + + WriteProfile writeProfile = new WriteProfile(writeConfig, context); + + assertThat("Average record size should use the configured estimate for an empty table", + writeProfile.getAvgSize(), is(1024L)); + assertThat("Records per bucket should be derived from the max parquet file size", + writeProfile.getRecordsPerBucket(), is(1024L)); + } + + @Test + public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception { + File morPath = new File(tempFile, "mor"); + Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath()); + morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024"); + morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1"); + morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5"); + StreamerUtil.initTableIfNotExists(morConf); + TestData.writeData(TestData.DATA_SET_INSERT, morConf); + + HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf); + HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext( + HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)), + new FlinkTaskContextSupplier(null)); + + DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext); + String latestInstant = getLastCompleteInstant(writeProfile); + HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant); + assertNotNull(commitMetadata); + long expectedAvgSize = (long) Math.ceil( + 0.5 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten()); + + assertThat("Average record size from commit metadata should be corrected for MOR log-to-parquet compression", + writeProfile.getAvgSize(), is(expectedAvgSize)); + assertThat("Records per bucket should use the corrected MOR average record size", + writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize)); + } + private static String getLastCompleteInstant(WriteProfile profile) { return StreamerUtil.getLastCompletedInstant(profile.getMetaClient()); } From 6aaae60dd47257b85dc3d932cb6e07870cc830b4 Mon Sep 17 00:00:00 2001 From: danny0405 Date: Sun, 14 Jun 2026 11:45:09 +0800 Subject: [PATCH 2/3] fix parquet record size estimation --- .../profile/DeltaWriteProfile.java | 14 +++++++-- .../partitioner/profile/WriteProfile.java | 6 ++-- .../sink/partitioner/TestBucketAssigner.java | 31 +++++++++++++++++++ .../table/ITTestDynamicBucketStreamWrite.java | 6 +++- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index c93cd6a5eef62..fbf9105307011 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.SyncableFileSystemView; @@ -87,8 +88,8 @@ protected List smallFilesProfile(String partitionPath) { } @Override - protected double fileSizeParquetCompressionRatio() { - return config.getLogFileToParquetCompressionRatio(); + protected double fileSizeCalibrationRatio() { + return logFileToParquetCompressionRatio(); } protected SyncableFileSystemView getFileSystemView() { @@ -96,11 +97,18 @@ protected SyncableFileSystemView getFileSystemView() { } private long getTotalFileSize(FileSlice fileSlice) { - return fileSlice.getTotalFileSizeAsParquetFormat(config.getLogFileToParquetCompressionRatio()); + return fileSlice.getTotalFileSizeAsParquetFormat(logFileToParquetCompressionRatio()); } private boolean isSmallFile(FileSlice fileSlice) { long totalSize = getTotalFileSize(fileSlice); return totalSize < config.getParquetMaxFileSize(); } + + private double logFileToParquetCompressionRatio() { + return config.getLogDataBlockFormat() + .map(logBlockType -> logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK + ? 1D : config.getLogFileToParquetCompressionRatio()) + .orElse(config.getLogFileToParquetCompressionRatio()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 7222ca925cb5e..cff0778c1aa6c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -154,7 +154,7 @@ private long averageBytesPerRecord() { long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { - avgSize = (long) Math.ceil((fileSizeParquetCompressionRatio() * totalBytesWritten) / totalRecordsWritten); + avgSize = (long) Math.ceil((fileSizeCalibrationRatio() * totalBytesWritten) / totalRecordsWritten); break; } } @@ -163,8 +163,8 @@ private long averageBytesPerRecord() { return avgSize; } - protected double fileSizeParquetCompressionRatio() { - return 1; + protected double fileSizeCalibrationRatio() { + return 1D; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 74f8db2caa3a9..005c7cf00e7ad 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -453,6 +453,37 @@ public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws E writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize)); } + @Test + public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquetLogBlocks() throws Exception { + File morPath = new File(tempFile, "mor_parquet_logs"); + Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath()); + morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024"); + morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1"); + morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5"); + morConf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"); + StreamerUtil.initTableIfNotExists(morConf); + TestData.writeData(TestData.DATA_SET_INSERT, morConf); + + HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf); + HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext( + HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)), + new FlinkTaskContextSupplier(null)); + + DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext); + String latestInstant = getLastCompleteInstant(writeProfile); + HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant); + assertNotNull(commitMetadata); + long expectedAvgSize = (long) Math.ceil( + 1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten()); + + assertThat("Average record size from parquet log blocks should not be corrected again", + writeProfile.getAvgSize(), is(expectedAvgSize)); + assertThat("Records per bucket should use the uncorrected parquet log block average record size", + writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize)); + } + private static String getLastCompleteInstant(WriteProfile profile) { return StreamerUtil.getLastCompletedInstant(profile.getMetaClient()); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java index 83b5b7f73cdfe..f82f8fdf904ec 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java @@ -166,8 +166,12 @@ void testInsertOverwrite(HoodieTableType tableType) { @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) { + Map smallBucketOptions = Map.of( + HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1", + FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1", + HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024)); streamTableEnv.executeSql(getTableDDL( - "t1", tableType, Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1"), true)); + "t1", tableType, smallBucketOptions, true)); execInsertSql(streamTableEnv, "insert into t1 values\n" + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n" From 5a358314436a6e4b50359f60b088f248728885dd Mon Sep 17 00:00:00 2001 From: danny0405 Date: Tue, 16 Jun 2026 13:37:41 +0800 Subject: [PATCH 3/3] address review comments --- .../profile/DeltaWriteProfile.java | 34 +++++++++--- .../partitioner/profile/WriteProfile.java | 54 ++++++++++--------- .../sink/partitioner/TestBucketAssigner.java | 21 ++++++++ .../table/ITTestDynamicBucketStreamWrite.java | 1 + 4 files changed, 80 insertions(+), 30 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index fbf9105307011..a79c78e6d3639 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -29,6 +29,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.commit.SmallFile; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -39,7 +41,9 @@ * *

Note: assumes the index can always index log files for Flink write. */ +@Slf4j public class DeltaWriteProfile extends WriteProfile { + public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { super(config, context); } @@ -88,8 +92,25 @@ protected List smallFilesProfile(String partitionPath) { } @Override - protected double fileSizeCalibrationRatio() { - return logFileToParquetCompressionRatio(); + protected long averageBytesPerRecord() { + long avgSize = config.getCopyOnWriteRecordSizeEstimate(); + HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); + if (!commitTimeline.empty()) { + long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D); + if (sizeFromCommitMetadata > 0) { + avgSize = sizeFromCommitMetadata; + } + } else { + HoodieTimeline deltaCommitTimeline = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); + if (!deltaCommitTimeline.empty()) { + long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(deltaCommitTimeline, logFileToParquetCompressionRatio()); + if (sizeFromCommitMetadata > 0) { + avgSize = sizeFromCommitMetadata; + } + } + } + log.info("Refresh average bytes per record => " + avgSize); + return avgSize; } protected SyncableFileSystemView getFileSystemView() { @@ -106,9 +127,10 @@ private boolean isSmallFile(FileSlice fileSlice) { } private double logFileToParquetCompressionRatio() { - return config.getLogDataBlockFormat() - .map(logBlockType -> logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK - ? 1D : config.getLogFileToParquetCompressionRatio()) - .orElse(config.getLogFileToParquetCompressionRatio()); + if (config.getLogDataBlockFormat().isPresent() + && config.getLogDataBlockFormat().get() == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) { + return 1D; + } + return config.getLogFileToParquetCompressionRatio(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index cff0778c1aa6c..c3819bf081166 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -117,7 +117,6 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) this.context = context; this.basePath = new Path(config.getBasePath()); this.smallFilesMap = new HashMap<>(); - this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize(); this.metaClient = StreamerUtil.createMetaClient( config.getBasePath(), context.getStorageConf().unwrapAs(Configuration.class)); this.metadataCache = new HashMap<>(); @@ -134,37 +133,44 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) * Obtains the average record size based on records written during previous commits. Used for estimating how many * records pack into one file. */ - private long averageBytesPerRecord() { + protected long averageBytesPerRecord() { long avgSize = config.getCopyOnWriteRecordSizeEstimate(); - long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit()); - HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { - // Go over the reverse ordered commits to get a more recent estimate of average record size. - Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); - while (instants.hasNext()) { - HoodieInstant instant = instants.next(); - final HoodieCommitMetadata commitMetadata = - this.metadataCache.computeIfAbsent( - instant.requestedTime(), - k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline) - .orElse(null)); - if (commitMetadata == null) { - continue; - } - long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); - long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); - if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { - avgSize = (long) Math.ceil((fileSizeCalibrationRatio() * totalBytesWritten) / totalRecordsWritten); - break; - } + long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D); + if (sizeFromCommitMetadata > 0) { + avgSize = sizeFromCommitMetadata; } } log.info("Refresh average bytes per record => " + avgSize); return avgSize; } - protected double fileSizeCalibrationRatio() { - return 1D; + protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) { + long fileSizeThreshold = recordSizeEstimationFileSizeThreshold(); + // Go over the reverse ordered commits to get a more recent estimate of average record size. + Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); + while (instants.hasNext()) { + HoodieInstant instant = instants.next(); + final HoodieCommitMetadata commitMetadata = + this.metadataCache.computeIfAbsent( + instant.requestedTime(), + k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline) + .orElse(null)); + if (commitMetadata == null) { + continue; + } + long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); + long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { + return (long) Math.ceil((fileSizeCalibrationRatio * totalBytesWritten) / totalRecordsWritten); + } + } + return -1L; + } + + private long recordSizeEstimationFileSizeThreshold() { + return (long) (0.1D * config.getParquetSmallFileLimit()); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 005c7cf00e7ad..8281ebbd86e4f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -423,6 +423,27 @@ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() { writeProfile.getRecordsPerBucket(), is(1024L)); } + @Test + public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstimationThreshold() throws Exception { + conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1); + conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024)); + conf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1"); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); + WriteProfile writeProfile = new WriteProfile(writeConfig, context); + String latestInstant = getLastCompleteInstant(writeProfile); + HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant); + assertNotNull(commitMetadata); + long expectedAvgSize = (long) Math.ceil( + 1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten()); + + assertThat("Average record size should use commit metadata when it is large enough relative to small file limit", + writeProfile.getAvgSize(), is(expectedAvgSize)); + assertThat("Records per bucket should use the profiled record size", + writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize)); + } + @Test public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception { File morPath = new File(tempFile, "mor"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java index f82f8fdf904ec..6fb0dceb6fa93 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java @@ -169,6 +169,7 @@ void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) { Map smallBucketOptions = Map.of( HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1", FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1", + HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1", HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024)); streamTableEnv.executeSql(getTableDDL( "t1", tableType, smallBucketOptions, true));