Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private List<WriteStatus> performBucketMergeForGroup(ReaderContextFactory<T> rea
Option<Map<String, String>> extraMetadata = clusteringGroup.getExtraMetadata();
ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations");
String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty");
// Note: partition can be an empty string for non-partitioned tables, so only check for null here.
ValidationUtils.checkArgument(partition != null, "Partition should not be null");
List<ConsistentHashingNode> nodes = decodeConsistentHashingNodes(clusteringGroup);
Option<ConsistentHashingNode> newBucket = Option.fromJavaOptional(nodes.stream().filter(node -> node.getTag() == ConsistentHashingNode.NodeTag.REPLACE).findFirst());
ValidationUtils.checkArgument(newBucket.isPresent(), "New bucket should be present for merge operation");
Expand Down Expand Up @@ -197,7 +198,8 @@ private List<WriteStatus> performBucketSplitForGroup(ReaderContextFactory<T> rea
Option<Map<String, String>> extraMetadata = clusteringGroup.getExtraMetadata();
ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations");
String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty");
// Note: partition can be an empty string for non-partitioned tables, so only check for null here.
ValidationUtils.checkArgument(partition != null, "Partition should not be null");
List<ConsistentHashingNode> nodes = decodeConsistentHashingNodes(clusteringGroup);
Integer seqNo = Integer.parseInt(extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata((short) 0, partition, instantTime, 0, seqNo + 1, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
Expand All @@ -49,6 +50,7 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -102,13 +104,24 @@ public void setup(int maxFileSize, Map<String, String> options) throws IOExcepti
}

public void setup(int maxFileSize, Map<String, String> options, boolean singleJob) throws IOException {
setup(maxFileSize, options, singleJob, false);
}

public void setup(int maxFileSize, Map<String, String> options, boolean singleJob, boolean nonPartitioned) throws IOException {
initPath();
initSparkContexts();
initTestDataGenerator();
initHoodieStorage();
Properties props = getPropertiesForKeyGen(true);
props.putAll(options);
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
if (nonPartitioned) {
// Non-partitioned tables produce records with an empty partition path and use the non-partition key generator.
dataGen = new HoodieTestDataGenerator(new String[] {HoodieTestDataGenerator.NO_PARTITION_PATH});
props.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.NON_PARTITION.name());
props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
props.remove(HoodieTableConfig.PARTITION_FIELDS.key());
}
metaClient = HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ, props);
config = getConfigBuilder().withProps(props)
.withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props)
Expand All @@ -130,16 +143,21 @@ public void tearDown() throws IOException {
}

/**
* Test resizing with bucket number upper bound and lower bound
* Test resizing with bucket number upper bound and lower bound, on both partitioned and non-partitioned tables.
*
* <p>Non-partitioned coverage guards GitHub issue #18161: consistent hashing clustering used to fail on
* non-partitioned tables because the empty partition path was rejected by the execution strategy. For a
* non-partitioned table {@code dataGen.getPartitionPaths()} returns the single empty partition path, so the
* assertions below cover both cases without branching.
*
* @throws IOException
*/
@ParameterizedTest
@MethodSource("configParams")
public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException {
@MethodSource("resizingConfigParams")
public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single, boolean nonPartitioned) throws IOException {
final int maxFileSize = isSplit ? 5120 : 128 * 1024 * 1024;
final int targetBucketNum = isSplit ? 14 : 4;
setup(maxFileSize, Collections.emptyMap(), single);
setup(maxFileSize, Collections.emptyMap(), single, nonPartitioned);
config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable));
config.setValue("hoodie.metadata.enable", "false");
writeData(2000, true);
Expand Down Expand Up @@ -384,6 +402,16 @@ private static Stream<Arguments> configParams() {
);
}

// configParams crossed with the partitioned / non-partitioned dimension (isSplit, rowWriterEnable, single, nonPartitioned).
private static Stream<Arguments> resizingConfigParams() {
return configParams().flatMap(args -> {
Object[] a = args.get();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename a to something like origArgs or baseArgs? The single-letter name is a bit hard to follow when it's indexed three times on the lines below.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

return Stream.of(
Arguments.of(a[0], a[1], a[2], false),
Arguments.of(a[0], a[1], a[2], true));
});
}

private static Stream<Arguments> configParamsForSorting() {
return Stream.of(
Arguments.of("begin_lat", true),
Expand Down
Loading