diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java index 7ce49aa9eaf25..af3bc81095b82 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java @@ -111,7 +111,8 @@ private List performBucketMergeForGroup(ReaderContextFactory rea Option> extraMetadata = clusteringGroup.getExtraMetadata(); ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations"); String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY); - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty"); + // Note: partition can be an empty string for non-partitioned tables, so only check for null here. 
+ ValidationUtils.checkArgument(partition != null, "Partition should not be null"); List nodes = decodeConsistentHashingNodes(clusteringGroup); Option newBucket = Option.fromJavaOptional(nodes.stream().filter(node -> node.getTag() == ConsistentHashingNode.NodeTag.REPLACE).findFirst()); ValidationUtils.checkArgument(newBucket.isPresent(), "New bucket should be present for merge operation"); @@ -197,7 +198,8 @@ private List performBucketSplitForGroup(ReaderContextFactory rea Option> extraMetadata = clusteringGroup.getExtraMetadata(); ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations"); String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY); - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty"); + // Note: partition can be an empty string for non-partitioned tables, so only check for null here. + ValidationUtils.checkArgument(partition != null, "Partition should not be null"); List nodes = decodeConsistentHashingNodes(clusteringGroup); Integer seqNo = Integer.parseInt(extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY)); HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata((short) 0, partition, instantTime, 0, seqNo + 1, Collections.emptyList()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java index cd14ef61fb7fc..93e01b75ae282 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java @@ -34,6 +34,7 @@ import 
org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -49,6 +50,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -102,6 +104,10 @@ public void setup(int maxFileSize, Map options) throws IOExcepti } public void setup(int maxFileSize, Map options, boolean singleJob) throws IOException { + setup(maxFileSize, options, singleJob, false); + } + + public void setup(int maxFileSize, Map options, boolean singleJob, boolean nonPartitioned) throws IOException { initPath(); initSparkContexts(); initTestDataGenerator(); @@ -109,6 +115,13 @@ public void setup(int maxFileSize, Map options, boolean singleJo Properties props = getPropertiesForKeyGen(true); props.putAll(options); props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + if (nonPartitioned) { + // Non-partitioned tables produce records with an empty partition path and use the non-partition key generator. 
+ dataGen = new HoodieTestDataGenerator(new String[] {HoodieTestDataGenerator.NO_PARTITION_PATH}); + props.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.NON_PARTITION.name()); + props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), ""); + props.remove(HoodieTableConfig.PARTITION_FIELDS.key()); + } metaClient = HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ, props); config = getConfigBuilder().withProps(props) .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props) @@ -130,16 +143,21 @@ public void tearDown() throws IOException { } /** - * Test resizing with bucket number upper bound and lower bound + * Test resizing with bucket number upper bound and lower bound, on both partitioned and non-partitioned tables. + * + *
<p>
Non-partitioned coverage guards GitHub issue #18161: consistent hashing clustering used to fail on + * non-partitioned tables because the empty partition path was rejected by the execution strategy. For a + * non-partitioned table {@code dataGen.getPartitionPaths()} returns the single empty partition path, so the + * assertions below cover both cases without branching. * * @throws IOException */ @ParameterizedTest - @MethodSource("configParams") - public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException { + @MethodSource("resizingConfigParams") + public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single, boolean nonPartitioned) throws IOException { final int maxFileSize = isSplit ? 5120 : 128 * 1024 * 1024; final int targetBucketNum = isSplit ? 14 : 4; - setup(maxFileSize, Collections.emptyMap(), single); + setup(maxFileSize, Collections.emptyMap(), single, nonPartitioned); config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable)); config.setValue("hoodie.metadata.enable", "false"); writeData(2000, true); @@ -384,6 +402,16 @@ private static Stream configParams() { ); } + // configParams crossed with the partitioned / non-partitioned dimension (isSplit, rowWriterEnable, single, nonPartitioned). + private static Stream resizingConfigParams() { + return configParams().flatMap(args -> { + Object[] a = args.get(); + return Stream.of( + Arguments.of(a[0], a[1], a[2], false), + Arguments.of(a[0], a[1], a[2], true)); + }); + } + private static Stream configParamsForSorting() { return Stream.of( Arguments.of("begin_lat", true),