diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index f73adb37d3379..a79c78e6d3639 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -22,12 +22,15 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -38,7 +41,9 @@
*
*
Note: assumes the index can always index log files for Flink write.
*/
+@Slf4j
public class DeltaWriteProfile extends WriteProfile {
+
public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
super(config, context);
}
@@ -86,12 +91,34 @@ protected List smallFilesProfile(String partitionPath) {
return smallFileLocations;
}
+ @Override
+ protected long averageBytesPerRecord() {
+ long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
+ if (!commitTimeline.empty()) {
+ long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
+ }
+ } else {
+ HoodieTimeline deltaCommitTimeline = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+ if (!deltaCommitTimeline.empty()) {
+ long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(deltaCommitTimeline, logFileToParquetCompressionRatio());
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
+ }
+ }
+ }
+ log.info("Refresh average bytes per record => " + avgSize);
+ return avgSize;
+ }
+
protected SyncableFileSystemView getFileSystemView() {
return (SyncableFileSystemView) getTable().getSliceView();
}
private long getTotalFileSize(FileSlice fileSlice) {
- return fileSlice.getTotalFileSizeAsParquetFormat(config.getLogFileToParquetCompressionRatio());
+ return fileSlice.getTotalFileSizeAsParquetFormat(logFileToParquetCompressionRatio());
}
private boolean isSmallFile(FileSlice fileSlice) {
@@ -99,4 +126,11 @@ private boolean isSmallFile(FileSlice fileSlice) {
return totalSize < config.getParquetMaxFileSize();
}
+ private double logFileToParquetCompressionRatio() {
+ if (config.getLogDataBlockFormat().isPresent()
+ && config.getLogDataBlockFormat().get() == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+ return 1D;
+ }
+ return config.getLogFileToParquetCompressionRatio();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index cbcc71cedb6ec..c3819bf081166 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -117,7 +117,6 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
this.context = context;
this.basePath = new Path(config.getBasePath());
this.smallFilesMap = new HashMap<>();
- this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
this.metaClient = StreamerUtil.createMetaClient(
config.getBasePath(), context.getStorageConf().unwrapAs(Configuration.class));
this.metadataCache = new HashMap<>();
@@ -134,35 +133,46 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
- private long averageBytesPerRecord() {
+ protected long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
- long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
- HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
+ HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
- // Go over the reverse ordered commits to get a more recent estimate of average record size.
- Iterator instants = commitTimeline.getReverseOrderedInstants().iterator();
- while (instants.hasNext()) {
- HoodieInstant instant = instants.next();
- final HoodieCommitMetadata commitMetadata =
- this.metadataCache.computeIfAbsent(
- instant.requestedTime(),
- k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
- .orElse(null));
- if (commitMetadata == null) {
- continue;
- }
- long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
- long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
- avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
- break;
- }
+ long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
}
}
log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
+ protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) {
+ long fileSizeThreshold = recordSizeEstimationFileSizeThreshold();
+ // Go over the reverse ordered commits to get a more recent estimate of average record size.
+ Iterator instants = commitTimeline.getReverseOrderedInstants().iterator();
+ while (instants.hasNext()) {
+ HoodieInstant instant = instants.next();
+ final HoodieCommitMetadata commitMetadata =
+ this.metadataCache.computeIfAbsent(
+ instant.requestedTime(),
+ k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
+ .orElse(null));
+ if (commitMetadata == null) {
+ continue;
+ }
+ long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+ long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+ return (long) Math.ceil((fileSizeCalibrationRatio * totalBytesWritten) / totalRecordsWritten);
+ }
+ }
+ return -1L;
+ }
+
+ private long recordSizeEstimationFileSizeThreshold() {
+ return (long) (0.1D * config.getParquetSmallFileLimit());
+ }
+
/**
* Returns a list of small files in the given partition path.
*
@@ -228,10 +238,8 @@ private void cleanMetadataCache(Stream instants) {
private void recordProfile() {
this.avgSize = averageBytesPerRecord();
- if (config.shouldAllowMultiWriteOnSameInstant()) {
- this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
- log.info("Refresh insert records per bucket => " + recordsPerBucket);
- }
+ this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
+ log.info("Refresh insert records per bucket => " + recordsPerBucket);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 4f682084050bc..8281ebbd86e4f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -19,11 +19,16 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
@@ -151,7 +156,8 @@ public void testAddInsert() {
@Test
public void testInsertOverBucketAssigned() {
- conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2");
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(512 * 1024));
writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
@@ -402,6 +408,103 @@ public void testWriteProfileMetadataCache() throws Exception {
writeProfile.getMetadataCache().size(), is(3));
}
+ @Test
+ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() {
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2");
+ conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+
+ WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+
+ assertThat("Average record size should use the configured estimate for an empty table",
+ writeProfile.getAvgSize(), is(1024L));
+ assertThat("Records per bucket should be derived from the max parquet file size",
+ writeProfile.getRecordsPerBucket(), is(1024L));
+ }
+
+ @Test
+ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstimationThreshold() throws Exception {
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024));
+ conf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+ WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size should use commit metadata when it is large enough relative to small file limit",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the profiled record size",
+ writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
+ @Test
+ public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception {
+ File morPath = new File(tempFile, "mor");
+ Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+ morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
+ morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
+ morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5");
+ StreamerUtil.initTableIfNotExists(morConf);
+ TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+ HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
+ HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+ HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+ new FlinkTaskContextSupplier(null));
+
+ DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 0.5 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size from commit metadata should be corrected for MOR log-to-parquet compression",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the corrected MOR average record size",
+ writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
+ @Test
+ public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquetLogBlocks() throws Exception {
+ File morPath = new File(tempFile, "mor_parquet_logs");
+ Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+ morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
+ morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
+ morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5");
+ morConf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
+ StreamerUtil.initTableIfNotExists(morConf);
+ TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+ HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
+ HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+ HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+ new FlinkTaskContextSupplier(null));
+
+ DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size from parquet log blocks should not be corrected again",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the uncorrected parquet log block average record size",
+ writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
index 83b5b7f73cdfe..6fb0dceb6fa93 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
@@ -166,8 +166,13 @@ void testInsertOverwrite(HoodieTableType tableType) {
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
+ Map smallBucketOptions = Map.of(
+ HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1",
+ FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1",
+ HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024));
streamTableEnv.executeSql(getTableDDL(
- "t1", tableType, Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1"), true));
+ "t1", tableType, smallBucketOptions, true));
execInsertSql(streamTableEnv, "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"