Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -38,7 +41,9 @@
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
@Slf4j
public class DeltaWriteProfile extends WriteProfile {

public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
super(config, context);
}
Expand Down Expand Up @@ -86,17 +91,46 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {
return smallFileLocations;
}

@Override
protected long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
if (sizeFromCommitMetadata > 0) {
avgSize = sizeFromCommitMetadata;
}
} else {
HoodieTimeline deltaCommitTimeline = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
if (!deltaCommitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(deltaCommitTimeline, logFileToParquetCompressionRatio());
if (sizeFromCommitMetadata > 0) {
avgSize = sizeFromCommitMetadata;
}
}
}
log.info("Refresh average bytes per record => " + avgSize);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

log.info("Refresh average bytes per record => {}", avgSize);

return avgSize;
}

protected SyncableFileSystemView getFileSystemView() {
return (SyncableFileSystemView) getTable().getSliceView();
}

private long getTotalFileSize(FileSlice fileSlice) {
return fileSlice.getTotalFileSizeAsParquetFormat(config.getLogFileToParquetCompressionRatio());
return fileSlice.getTotalFileSizeAsParquetFormat(logFileToParquetCompressionRatio());
}

private boolean isSmallFile(FileSlice fileSlice) {
long totalSize = getTotalFileSize(fileSlice);
return totalSize < config.getParquetMaxFileSize();
}

private double logFileToParquetCompressionRatio() {
if (config.getLogDataBlockFormat().isPresent()
&& config.getLogDataBlockFormat().get() == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
return 1D;
}
return config.getLogFileToParquetCompressionRatio();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
this.context = context;
this.basePath = new Path(config.getBasePath());
this.smallFilesMap = new HashMap<>();
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
this.metaClient = StreamerUtil.createMetaClient(
config.getBasePath(), context.getStorageConf().unwrapAs(Configuration.class));
this.metadataCache = new HashMap<>();
Expand All @@ -134,35 +133,46 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
private long averageBytesPerRecord() {
protected long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
final HoodieCommitMetadata commitMetadata =
this.metadataCache.computeIfAbsent(
instant.requestedTime(),
k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
.orElse(null));
if (commitMetadata == null) {
continue;
}
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
if (sizeFromCommitMetadata > 0) {
avgSize = sizeFromCommitMetadata;
}
}
log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}

protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) {
long fileSizeThreshold = recordSizeEstimationFileSizeThreshold();
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
final HoodieCommitMetadata commitMetadata =
this.metadataCache.computeIfAbsent(
instant.requestedTime(),
k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
.orElse(null));
if (commitMetadata == null) {
continue;
}
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
return (long) Math.ceil((fileSizeCalibrationRatio * totalBytesWritten) / totalRecordsWritten);
}
}
return -1L;
}

private long recordSizeEstimationFileSizeThreshold() {
return (long) (0.1D * config.getParquetSmallFileLimit());
}

/**
* Returns a list of small files in the given partition path.
*
Expand Down Expand Up @@ -228,10 +238,8 @@ private void cleanMetadataCache(Stream<HoodieInstant> instants) {

private void recordProfile() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Removing the shouldAllowMultiWriteOnSameInstant gate means COPY_ON_WRITE_INSERT_SPLIT_SIZE is now silently ignored on the Flink side, while the Spark UpsertPartitioner still honors it when shouldAutoTuneInsertSplits() is false. Was the intent to fully deprecate that config for Flink, or should this be gated on shouldAutoTuneInsertSplits() (defaulting to true) to mirror Spark? @danny0405

- AI-generated; verify before applying. React 👍/👎 to flag quality.

this.avgSize = averageBytesPerRecord();
if (config.shouldAllowMultiWriteOnSameInstant()) {
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
log.info("Refresh insert records per bucket => " + recordsPerBucket);
}
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
Comment thread
danny0405 marked this conversation as resolved.
log.info("Refresh insert records per bucket => " + recordsPerBucket);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

log.info("Refresh insert records per bucket => {}", recordsPerBucket);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
package org.apache.hudi.sink.partitioner;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
Expand Down Expand Up @@ -151,7 +156,8 @@ public void testAddInsert() {

@Test
public void testInsertOverBucketAssigned() {
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2");
conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(512 * 1024));
writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);

MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig);
Expand Down Expand Up @@ -402,6 +408,103 @@ public void testWriteProfileMetadataCache() throws Exception {
writeProfile.getMetadataCache().size(), is(3));
}

@Test
public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() {
conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "2");
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);

WriteProfile writeProfile = new WriteProfile(writeConfig, context);

assertThat("Average record size should use the configured estimate for an empty table",
writeProfile.getAvgSize(), is(1024L));
assertThat("Records per bucket should be derived from the max parquet file size",
writeProfile.getRecordsPerBucket(), is(1024L));
}

@Test
public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstimationThreshold() throws Exception {
conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024));
conf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
TestData.writeData(TestData.DATA_SET_INSERT, conf);

writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
WriteProfile writeProfile = new WriteProfile(writeConfig, context);
String latestInstant = getLastCompleteInstant(writeProfile);
HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
assertNotNull(commitMetadata);
long expectedAvgSize = (long) Math.ceil(
1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());

assertThat("Average record size should use commit metadata when it is large enough relative to small file limit",
writeProfile.getAvgSize(), is(expectedAvgSize));
assertThat("Records per bucket should use the profiled record size",
writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
}

@Test
public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception {
File morPath = new File(tempFile, "mor");
Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5");
StreamerUtil.initTableIfNotExists(morConf);
TestData.writeData(TestData.DATA_SET_INSERT, morConf);

HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
new FlinkTaskContextSupplier(null));

DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext);
String latestInstant = getLastCompleteInstant(writeProfile);
HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
assertNotNull(commitMetadata);
long expectedAvgSize = (long) Math.ceil(
0.5 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());

assertThat("Average record size from commit metadata should be corrected for MOR log-to-parquet compression",
writeProfile.getAvgSize(), is(expectedAvgSize));
assertThat("Records per bucket should use the corrected MOR average record size",
writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
}

@Test
public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquetLogBlocks() throws Exception {
File morPath = new File(tempFile, "mor_parquet_logs");
Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.5");
morConf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
StreamerUtil.initTableIfNotExists(morConf);
TestData.writeData(TestData.DATA_SET_INSERT, morConf);

HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
new FlinkTaskContextSupplier(null));

DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, morContext);
String latestInstant = getLastCompleteInstant(writeProfile);
HoodieCommitMetadata commitMetadata = writeProfile.getMetadataCache().get(latestInstant);
assertNotNull(commitMetadata);
long expectedAvgSize = (long) Math.ceil(
1.0 * commitMetadata.fetchTotalBytesWritten() / commitMetadata.fetchTotalRecordsWritten());

assertThat("Average record size from parquet log blocks should not be corrected again",
writeProfile.getAvgSize(), is(expectedAvgSize));
assertThat("Records per bucket should use the uncorrected parquet log block average record size",
writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
}

private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ void testInsertOverwrite(HoodieTableType tableType) {
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
Map<String, String> smallBucketOptions = Map.of(
HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1",
FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1",
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1",
HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(1024 * 1024));
streamTableEnv.executeSql(getTableDDL(
"t1", tableType, Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1"), true));
"t1", tableType, smallBucketOptions, true));

execInsertSql(streamTableEnv, "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"
Expand Down
Loading