Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -41,7 +39,6 @@
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
@Slf4j
public class DeltaWriteProfile extends WriteProfile {

public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
Expand Down Expand Up @@ -93,7 +90,7 @@ protected List<SmallFile> smallFilesProfile(String partitionPath) {

@Override
protected long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the `this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate()` initializer is now identical in both `WriteProfile.averageBytesPerRecord()` and the override here — could you pull it into a small protected helper on `WriteProfile` (e.g. `initialAvgSizeEstimate()`) so the two implementations stay in sync if the fallback logic ever changes?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
Expand All @@ -109,7 +106,6 @@ protected long averageBytesPerRecord() {
}
}
}
log.info("Refresh average bytes per record => " + avgSize);
return avgSize;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the `log.info` above removed, `DeltaWriteProfile` has no remaining `log` usage, so its class-level `@Slf4j` annotation and the `import lombok.extern.slf4j.Slf4j` (both added in #18991 for that line) are now dead. The inherited `recordProfile()` still logs through `WriteProfile`'s own logger. Drop the annotation and the import from `DeltaWriteProfile`.

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class WriteProfile {
* The average record size.
*/
@Getter
private long avgSize = -1L;
protected long avgSize = -1L;

/**
* Total records to write for each bucket based on
Expand Down Expand Up @@ -134,15 +134,14 @@ public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context)
* records pack into one file.
*/
protected long averageBytesPerRecord() {
// NOTE(review): this is a rendered diff hunk. The next line looks like the pre-change initializer and
// the line after it like its replacement; presumably only one of the two declarations exists in the
// real file — confirm against the repository.
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
// Seed with the value already stored in this.avgSize when it is positive; otherwise fall back to the
// configured record size estimate.
long avgSize = this.avgSize > 0 ? this.avgSize : config.getCopyOnWriteRecordSizeEstimate();
// Only completed instants of the commit timeline are consulted.
HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata = calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
// A non-positive result leaves the seed value untouched.
if (sizeFromCommitMetadata > 0) {
avgSize = sizeFromCommitMetadata;
}
}
// NOTE(review): presumably the removed side of the diff; recordProfile() logs the same message with a
// parameterized format — confirm.
log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}

Expand Down Expand Up @@ -238,8 +237,9 @@ private void cleanMetadataCache(Stream<HoodieInstant> instants) {

// Refreshes the cached average record size and derives the per-bucket record count from it.
private void recordProfile() {
// Store the result in the field so that averageBytesPerRecord() can read it back as its seed next time.
this.avgSize = averageBytesPerRecord();
log.info("Refresh average bytes per record => {}", avgSize);
// NOTE(review): integer division by avgSize; assumes averageBytesPerRecord() never returns 0 — confirm.
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
// NOTE(review): rendered diff hunk — the concatenating call below looks like the removed line and the
// parameterized call after it like its replacement; presumably only one exists in the real file.
log.info("Refresh insert records per bucket => " + recordsPerBucket);
log.info("Refresh insert records per bucket => {}", recordsPerBucket);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
Expand All @@ -46,11 +47,13 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -444,6 +447,26 @@ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstim
writeProfile.getRecordsPerBucket(), is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
}

/**
 * Scripts a record size of 512 followed by -1, builds a {@link WriteProfile}, calls {@code reload(1)}
 * and asserts that both the average record size and the records per bucket still reflect 512.
 */
@Test
public void testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload() throws Exception {
  final long profiledSize = 512L;
  // Non-positive scripted value, standing in for "no usable size from commit metadata".
  final long noEligibleCommit = -1L;

  conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
  conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
  TestData.writeData(TestData.DATA_SET_INSERT, conf);
  writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);

  // The script must be in place before the profile is constructed: the first value is already
  // visible through getAvgSize() right after construction.
  setScriptedRecordSizes(profiledSize, noEligibleCommit);
  WriteProfile profile = new ScriptedRecordSizeWriteProfile(writeConfig, context);
  assertThat("Average record size should use the profiled commit metadata",
      profile.getAvgSize(), is(profiledSize));

  profile.reload(1);

  assertThat("Average record size should reuse the previous estimate when no eligible commit metadata is found",
      profile.getAvgSize(), is(profiledSize));
  long expectedRecordsPerBucket = writeConfig.getParquetMaxFileSize() / profiledSize;
  assertThat("Records per bucket should continue to use the previous estimate",
      profile.getRecordsPerBucket(), is(expectedRecordsPerBucket));
}

@Test
public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() throws Exception {
File morPath = new File(tempFile, "mor");
Expand Down Expand Up @@ -505,6 +528,34 @@ public void testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquet
writeProfile.getRecordsPerBucket(), is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
}

/**
 * Same scenario as the copy-on-write variant, run against a MERGE_ON_READ table: scripts a record
 * size of 256 followed by -1, builds a {@link DeltaWriteProfile}, calls {@code reload(1)} and asserts
 * that the average record size and the records per bucket still reflect 256.
 */
@Test
public void testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload() throws Exception {
  final long profiledSize = 256L;
  // Non-positive scripted value, standing in for "no usable size from commit metadata".
  final long noEligibleCommit = -1L;

  // Dedicated MOR table under the test's temp directory.
  File morPath = new File(tempFile, "mor_reuse_previous_avg");
  Configuration morConf = TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
  morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
  morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
  morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), "1024");
  StreamerUtil.initTableIfNotExists(morConf);
  TestData.writeData(TestData.DATA_SET_INSERT, morConf);

  HoodieWriteConfig morWriteConfig = FlinkWriteClients.getHoodieClientConfig(morConf);
  HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
      HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
      new FlinkTaskContextSupplier(null));

  // The script must be in place before the profile is constructed: the first value is already
  // visible through getAvgSize() right after construction.
  setScriptedRecordSizes(profiledSize, noEligibleCommit);
  DeltaWriteProfile profile = new ScriptedRecordSizeDeltaWriteProfile(morWriteConfig, morContext);
  assertThat("Average record size should use the profiled delta commit metadata",
      profile.getAvgSize(), is(profiledSize));

  profile.reload(1);

  assertThat("Average record size should reuse the previous estimate when no eligible delta commit metadata is found",
      profile.getAvgSize(), is(profiledSize));
  long expectedRecordsPerBucket = morWriteConfig.getParquetMaxFileSize() / profiledSize;
  assertThat("Records per bucket should continue to use the previous estimate",
      profile.getRecordsPerBucket(), is(expectedRecordsPerBucket));
}

/**
 * Returns what {@code StreamerUtil.getLastCompletedInstant} reports for the meta client of the
 * given profile.
 */
private static String getLastCompleteInstant(WriteProfile profile) {
  final String lastCompleted = StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
  return lastCompleted;
}
Expand All @@ -526,6 +577,40 @@ private void assertBucketEquals(
assertThat(bucketInfo.getBucketType(), is(bucketType));
}

// Record sizes handed out, in order, by the scripted profiles' calculateRecordSizeThroughCommitMetadata overrides.
private static Queue<Long> scriptedRecordSizes = new ArrayDeque<>();

/**
 * Replaces the shared script with the given record sizes, to be consumed front to back.
 *
 * @param recordSizes the sizes to hand out, in order; null elements are rejected by the deque
 */
private static void setScriptedRecordSizes(Long... recordSizes) {
  Queue<Long> script = new ArrayDeque<>(recordSizes.length);
  Collections.addAll(script, recordSizes);
  // Publish only once the new script is fully built.
  scriptedRecordSizes = script;
}

/**
 * {@link WriteProfile} whose commit-metadata record size comes from the test's scripted queue
 * instead of being calculated.
 */
static class ScriptedRecordSizeWriteProfile extends WriteProfile {
  ScriptedRecordSizeWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
    super(config, context);
  }

  /**
   * Ignores both arguments and returns the next scripted size.
   *
   * @throws java.util.NoSuchElementException if the script has been exhausted
   */
  @Override
  protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) {
    final Long nextScriptedSize = scriptedRecordSizes.remove();
    return nextScriptedSize;
  }
}

/**
 * {@link DeltaWriteProfile} whose commit-metadata record size comes from the test's scripted queue
 * instead of being calculated.
 */
static class ScriptedRecordSizeDeltaWriteProfile extends DeltaWriteProfile {
  ScriptedRecordSizeDeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
    super(config, context);
  }

  /**
   * Ignores both arguments and returns the next scripted size.
   *
   * @throws java.util.NoSuchElementException if the script has been exhausted
   */
  @Override
  protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline commitTimeline, double fileSizeCalibrationRatio) {
    final Long nextScriptedSize = scriptedRecordSizes.remove();
    return nextScriptedSize;
  }
}

/**
* Mock BucketAssigner that can specify small files explicitly.
*/
Expand Down
Loading