From e4a1779a8920e1d2404ab2da6604890bcddc3fa3 Mon Sep 17 00:00:00 2001 From: Aditya Goenka Date: Wed, 10 Jun 2026 21:08:00 +0530 Subject: [PATCH 1/2] fix(spark): support consistent hashing clustering on non-partitioned tables Consistent hashing clustering failed on non-partitioned tables because SingleSparkJobConsistentHashingExecutionStrategy rejected the empty ("") partition path with a "not null or empty" guard. Relax the guard to only reject null, since an empty partition path is valid for non-partitioned tables. Add a parameterized test (testResizingNonPartitioned) covering split and merge resizing on a non-partitioned table for both the single-job and multi-job execution strategies. Co-Authored-By: Claude Opus 4.8 --- ...JobConsistentHashingExecutionStrategy.java | 6 ++- .../TestSparkConsistentBucketClustering.java | 48 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java index 7ce49aa9eaf25..af3bc81095b82 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java @@ -111,7 +111,8 @@ private List performBucketMergeForGroup(ReaderContextFactory rea Option> extraMetadata = clusteringGroup.getExtraMetadata(); ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations"); String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY); - 
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty"); + // Note: partition can be an empty string for non-partitioned tables, so only check for null here. + ValidationUtils.checkArgument(partition != null, "Partition should not be null"); List nodes = decodeConsistentHashingNodes(clusteringGroup); Option newBucket = Option.fromJavaOptional(nodes.stream().filter(node -> node.getTag() == ConsistentHashingNode.NodeTag.REPLACE).findFirst()); ValidationUtils.checkArgument(newBucket.isPresent(), "New bucket should be present for merge operation"); @@ -197,7 +198,8 @@ private List performBucketSplitForGroup(ReaderContextFactory rea Option> extraMetadata = clusteringGroup.getExtraMetadata(); ValidationUtils.checkArgument(extraMetadata.isPresent(), "Extra metadata should be present for consistent hashing operations"); String partition = extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY); - ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(partition), "Partition should not be null or empty"); + // Note: partition can be an empty string for non-partitioned tables, so only check for null here. 
+ ValidationUtils.checkArgument(partition != null, "Partition should not be null"); List nodes = decodeConsistentHashingNodes(clusteringGroup); Integer seqNo = Integer.parseInt(extraMetadata.get().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY)); HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata((short) 0, partition, instantTime, 0, seqNo + 1, Collections.emptyList()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java index cd14ef61fb7fc..07f847829b32d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -49,6 +50,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -102,6 +104,10 @@ public void setup(int maxFileSize, Map options) throws IOExcepti } public void setup(int maxFileSize, Map options, boolean singleJob) throws IOException { + setup(maxFileSize, options, singleJob, false); + } + + public 
void setup(int maxFileSize, Map options, boolean singleJob, boolean nonPartitioned) throws IOException { initPath(); initSparkContexts(); initTestDataGenerator(); @@ -109,6 +115,13 @@ public void setup(int maxFileSize, Map options, boolean singleJo Properties props = getPropertiesForKeyGen(true); props.putAll(options); props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + if (nonPartitioned) { + // Non-partitioned tables produce records with an empty partition path and use the non-partition key generator. + dataGen = new HoodieTestDataGenerator(new String[] {HoodieTestDataGenerator.NO_PARTITION_PATH}); + props.setProperty(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.NON_PARTITION.name()); + props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), ""); + props.remove(HoodieTableConfig.PARTITION_FIELDS.key()); + } metaClient = HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ, props); config = getConfigBuilder().withProps(props) .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props) @@ -162,6 +175,41 @@ public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean singl }); } + /** + * Test bucket resizing (split / merge) on a non-partitioned table, covering both the single-job and + * multi-job execution strategies. See HUDI-18161 / issue #18161: consistent hashing clustering used to + * fail on non-partitioned tables because the empty partition path was rejected by the execution strategy. + * + * @throws IOException + */ + @ParameterizedTest + @MethodSource("configParams") + public void testResizingNonPartitioned(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException { + final int maxFileSize = isSplit ? 5120 : 128 * 1024 * 1024; + final int targetBucketNum = isSplit ? 
14 : 4; + setup(maxFileSize, Collections.emptyMap(), single, true); + config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable)); + config.setValue("hoodie.metadata.enable", "false"); + writeData(2000, true); + String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get(); + writeClient.cluster(clusteringTime, true); + + metaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieTable table = HoodieSparkTable.create(config, context, metaClient); + Assertions.assertEquals(2000, readRecords().size()); + + // Non-partitioned table has a single empty partition path. + String partition = HoodieTestDataGenerator.NO_PARTITION_PATH; + HoodieConsistentHashingMetadata metadata = ConsistentBucketIndexUtils.loadMetadata(table, partition).get(); + Assertions.assertEquals(targetBucketNum, metadata.getNodes().size()); + + // The file slice has no log files after clustering. + table.getSliceView().getLatestFileSlices(partition).forEach(fs -> { + Assertions.assertTrue(fs.getBaseFile().isPresent()); + Assertions.assertEquals(0, fs.getLogFiles().count()); + }); + } + /** * Test running archival after clustering * From ce8febd2c757dd860acdca9ea089b73e1b04037c Mon Sep 17 00:00:00 2001 From: Aditya Goenka Date: Fri, 12 Jun 2026 16:34:35 +0530 Subject: [PATCH 2/2] test(spark): merge non-partitioned resizing into testResizing per review Address review feedback on PR #18968: - Fold testResizingNonPartitioned into testResizing via a nonPartitioned parameter and resizingConfigParams() cross-product, since the two tests were identical apart from the partition dimension. - Update the stale HUDI-18161 reference to GitHub issue #18161. 
Co-Authored-By: Claude Opus 4.8 --- .../TestSparkConsistentBucketClustering.java | 58 ++++++------------- 1 file changed, 19 insertions(+), 39 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java index 07f847829b32d..93e01b75ae282 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java @@ -143,16 +143,21 @@ public void tearDown() throws IOException { } /** - * Test resizing with bucket number upper bound and lower bound + * Test resizing with bucket number upper bound and lower bound, on both partitioned and non-partitioned tables. + * + *

<p>Non-partitioned coverage guards GitHub issue #18161: consistent hashing clustering used to fail on + * non-partitioned tables because the empty partition path was rejected by the execution strategy. For a + * non-partitioned table {@code dataGen.getPartitionPaths()} returns the single empty partition path, so the + * assertions below cover both cases without branching. * * @throws IOException */ @ParameterizedTest - @MethodSource("configParams") - public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException { + @MethodSource("resizingConfigParams") + public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean single, boolean nonPartitioned) throws IOException { final int maxFileSize = isSplit ? 5120 : 128 * 1024 * 1024; final int targetBucketNum = isSplit ? 14 : 4; - setup(maxFileSize, Collections.emptyMap(), single); + setup(maxFileSize, Collections.emptyMap(), single, nonPartitioned); config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable)); config.setValue("hoodie.metadata.enable", "false"); writeData(2000, true); @@ -175,41 +180,6 @@ public void testResizing(boolean isSplit, boolean rowWriterEnable, boolean singl }); } - /** - * Test bucket resizing (split / merge) on a non-partitioned table, covering both the single-job and - * multi-job execution strategies. See HUDI-18161 / issue #18161: consistent hashing clustering used to - * fail on non-partitioned tables because the empty partition path was rejected by the execution strategy. - * - * @throws IOException - */ - @ParameterizedTest - @MethodSource("configParams") - public void testResizingNonPartitioned(boolean isSplit, boolean rowWriterEnable, boolean single) throws IOException { - final int maxFileSize = isSplit ? 5120 : 128 * 1024 * 1024; - final int targetBucketNum = isSplit ? 
14 : 4; - setup(maxFileSize, Collections.emptyMap(), single, true); - config.setValue("hoodie.datasource.write.row.writer.enable", String.valueOf(rowWriterEnable)); - config.setValue("hoodie.metadata.enable", "false"); - writeData(2000, true); - String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get(); - writeClient.cluster(clusteringTime, true); - - metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieTable table = HoodieSparkTable.create(config, context, metaClient); - Assertions.assertEquals(2000, readRecords().size()); - - // Non-partitioned table has a single empty partition path. - String partition = HoodieTestDataGenerator.NO_PARTITION_PATH; - HoodieConsistentHashingMetadata metadata = ConsistentBucketIndexUtils.loadMetadata(table, partition).get(); - Assertions.assertEquals(targetBucketNum, metadata.getNodes().size()); - - // The file slice has no log files after clustering. - table.getSliceView().getLatestFileSlices(partition).forEach(fs -> { - Assertions.assertTrue(fs.getBaseFile().isPresent()); - Assertions.assertEquals(0, fs.getLogFiles().count()); - }); - } - /** * Test running archival after clustering * @@ -432,6 +402,16 @@ private static Stream configParams() { ); } + // configParams crossed with the partitioned / non-partitioned dimension (isSplit, rowWriterEnable, single, nonPartitioned). + private static Stream resizingConfigParams() { + return configParams().flatMap(args -> { + Object[] a = args.get(); + return Stream.of( + Arguments.of(a[0], a[1], a[2], false), + Arguments.of(a[0], a[1], a[2], true)); + }); + } + private static Stream configParamsForSorting() { return Stream.of( Arguments.of("begin_lat", true),