diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index a79c78e6d3639..2cdee3453a467 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -29,8 +29,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;
-import lombok.extern.slf4j.Slf4j;
-
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -41,7 +39,6 @@
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
-@Slf4j
public class DeltaWriteProfile extends WriteProfile {
public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
@@ -93,7 +90,7 @@ protected List smallFilesProfile(String partitionPath) {
@Override
protected long averageBytesPerRecord() {
- long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate();
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -109,7 +106,6 @@ protected long averageBytesPerRecord() {
}
}
}
- log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index c3819bf081166..304d8f5d7452e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -78,7 +78,7 @@ public class WriteProfile {
* The average record size.
*/
@Getter
- private long avgSize = -1L;
+ protected long avgSize = -1L;
/**
* Total records to write for each bucket based on
@@ -134,7 +134,7 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
* records pack into one file.
*/
protected long averageBytesPerRecord() {
- long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate();
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -142,7 +142,6 @@ protected long averageBytesPerRecord() {
avgSize = sizeFromCommitMetadata;
}
}
- log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
@@ -238,8 +237,9 @@ private void cleanMetadataCache(Stream instants) {
private void recordProfile() {
this.avgSize = averageBytesPerRecord();
+ log.info("Refresh average bytes per record => {}", avgSize); // logs the value just stored in the avgSize field
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
- log.info("Refresh insert records per bucket => " + recordsPerBucket);
+ log.info("Refresh insert records per bucket => {}", recordsPerBucket);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 8281ebbd86e4f..3b5c265f7c916 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -46,11 +47,13 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -444,6 +447,26 @@ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstim
writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
}
+  @Test
+  public void testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload() throws Exception {
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+
+    // Script: the first size lookup answers 512, the lookup triggered by reload answers -1.
+    final long profiledSize = 512L;
+    setScriptedRecordSizes(profiledSize, -1L);
+    final WriteProfile profile = new ScriptedRecordSizeWriteProfile(writeConfig, context);
+    assertThat("Average record size should use the profiled commit metadata", profile.getAvgSize(), is(profiledSize));
+
+    profile.reload(1);
+    assertThat("Average record size should reuse the previous estimate when no eligible commit metadata is found",
+        profile.getAvgSize(), is(profiledSize));
+    assertThat("Records per bucket should continue to use the previous estimate",
+        profile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / profiledSize));
+  }
+
@Test
public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception {
File morPath = new File(tempFile, "mor");
@@ -505,6 +528,34 @@ public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquet
writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
}
+  @Test
+  public void testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload() throws Exception {
+    // Dedicated MERGE_ON_READ table under its own directory, seeded with one insert batch.
+    final String basePath = new File(tempFile, "mor_reuse_previous_avg").getAbsolutePath();
+    final Configuration morConf = TestConfigurations.getDefaultConf(basePath);
+    morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
+    StreamerUtil.initTableIfNotExists(morConf);
+    TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+    final HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
+    final HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+        HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)), new FlinkTaskContextSupplier(null));
+
+    // Script: the first size lookup answers 256, the lookup triggered by reload answers -1.
+    final long profiledSize = 256L;
+    setScriptedRecordSizes(profiledSize, -1L);
+    final DeltaWriteProfile profile = new ScriptedRecordSizeDeltaWriteProfile(morWriteConfig, morContext);
+    assertThat("Average record size should use the profiled delta commit metadata", profile.getAvgSize(), is(profiledSize));
+
+    profile.reload(1);
+    assertThat("Average record size should reuse the previous estimate when no eligible delta commit metadata is found",
+        profile.getAvgSize(), is(profiledSize));
+    assertThat("Records per bucket should continue to use the previous estimate",
+        profile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / profiledSize));
+  }
+
private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
}
@@ -526,6 +577,40 @@ private void assertBucketEquals(
assertThat(bucketInfo.getBucketType(), is(bucketType));
}
+  private static Queue<Long> scriptedRecordSizes = new ArrayDeque<>(); // FIFO of sizes consumed by the Scripted* profiles
+
+  private static void setScriptedRecordSizes(Long... recordSizes) { // swaps in a fresh script, discarding leftovers
+    scriptedRecordSizes = new ArrayDeque<>(Arrays.asList(recordSizes));
+  }
+
+  /**
+   * {@link WriteProfile} whose commit-metadata record size is taken from the head of {@code scriptedRecordSizes}.
+   */
+  static class ScriptedRecordSizeWriteProfile extends WriteProfile {
+    ScriptedRecordSizeWriteProfile(final HoodieWriteConfig writeConfig, final HoodieFlinkEngineContext engineContext) {
+      super(writeConfig, engineContext);
+    }
+
+    @Override
+    protected long calculateRecordSizeThroughCommitMetadata(final HoodieTimeline timeline, final double calibrationRatio) {
+      return scriptedRecordSizes.remove();
+    }
+  }
+
+  /**
+   * {@link DeltaWriteProfile} whose commit-metadata record size is taken from the head of {@code scriptedRecordSizes}.
+   */
+  static class ScriptedRecordSizeDeltaWriteProfile extends DeltaWriteProfile {
+    ScriptedRecordSizeDeltaWriteProfile(final HoodieWriteConfig writeConfig, final HoodieFlinkEngineContext engineContext) {
+      super(writeConfig, engineContext);
+    }
+
+    @Override
+    protected long calculateRecordSizeThroughCommitMetadata(final HoodieTimeline timeline, final double calibrationRatio) {
+      return scriptedRecordSizes.remove();
+    }
+  }
+
/**
* Mock BucketAssigner that can specify small files explicitly.
*/