Bug Description
What happened:
When using hoodie.write.record.merge.mode=CUSTOM with a custom merger class specified via hoodie.write.record.merge.custom.implementation.classes, inline clustering (and standalone clustering) fails with:
java.lang.IllegalArgumentException: No valid spark merger implementation set for
`hoodie.write.record.merge.custom.implementation.classes`
The error is thrown during the clustering execution phase when Hudi attempts to read source file groups to produce the clustered output. Every clustering task fails on every executor with the same error. The same custom merger class works correctly for inline compaction on the same table without any issues. Additionally, the merger is successfully used on other COW tables and all MOR read paths function correctly with the same implementation. Therefore, the issue appears to be specific to the clustering code path rather than the custom merger implementation itself.
What you expected:
Clustering should successfully read file groups using the configured custom merger class, the same way inline compaction does. Since hoodie.write.record.merge.custom.implementation.classes is explicitly set in the write config, clustering should be able to resolve and instantiate the custom merger implementation.
Steps to reproduce:
-
Create a MOR table with hoodie.write.record.merge.mode=CUSTOM and a custom merger class that extends HoodieSparkRecordMerger, e.g.:
hoodie.write.record.merge.mode=CUSTOM
hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
hoodie.write.record.merge.strategy.id=<custom-uuid>
-
Write data to the table (upserts) — these succeed.
-
Enable inline clustering:
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=1
hoodie.clustering.plan.strategy.target.file.max.bytes=10485760 // low threshold for easy reproduce
hoodie.clustering.plan.strategy.small.file.limit=5242880 // low threshold for easy reproduce
hoodie.clustering.plan.strategy.partition.selected=<list of partitions - comma separated> hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
-
After the first successful delta commit, clustering is triggered automatically. Observe that all clustering tasks fail with IllegalArgumentException: No valid spark merger implementation set.
-
Note that inline compaction (hoodie.compact.inline=true) on the same table with the same configs succeeds without error.
Additional Notes / Investigation Hunch
Note: The following is our working hypothesis based on code inspection — not a confirmed root cause. We may be wrong; please treat this as a starting point for investigation.
Looking at the stack trace, the failure path is:
MultipleSparkJobExecutionStrategy → ClusteringExecutionStrategy.getFileGroupReader() → HoodieFileGroupReader.<init> → HoodieReaderContext.initRecordMerger() → BaseSparkInternalRowReaderContext.getRecordMerger()
The key question is: why does initRecordMerger not find the custom class when it is present in the write config?
One possible explanation we noticed from looking at ClusteringExecutionStrategy:
The clustering execution strategy builds its own TypedProperties to pass to HoodieFileGroupReader (via something like getReaderProperties()). If this method only copies a limited set of properties (e.g., spill-map and memory-related keys) and does not propagate hoodie.write.record.merge.custom.implementation.classes, then initRecordMerger would receive an empty/missing value for that key.
Meanwhile, compaction works because it goes through a different code path (SparkMergeHelper / HoodieSparkMergeOnReadTableCompactor) which has access to the full write config props.
A secondary possible issue: even if ConfigUtils.getMergeProps() enriches the props with table config (hoodie.properties), initRecordMerger may be called with the original un-enriched TypedProperties rather than the enriched copy.
We have not confirmed this by stepping through the code — if the Hudi team can verify whether getReaderProperties() in ClusteringExecutionStrategy explicitly propagates the custom merge implementation class key, that would either confirm or rule out this hypothesis.
Environment
Hudi version: 1.1.0 (hudi-spark3.5-bundle_2.12-1.1.0.jar)
Query engine: (Spark/Flink/Trino etc): Spark 3.5 (AWS EMR-flavored, spark-sql_2.12-3.5.6-amzn-1)
Relevant configs:
hoodie.write.record.merge.mode=CUSTOM
hoodie.write.record.merge.custom.implementation.classes=com.example.CustomHoodieSparkRecordMerger
hoodie.write.record.merge.strategy.id=<custom-uuid>
hoodie.datasource.write.table.type=MERGE_ON_READ
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=1
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.clean.policy.failed.writes=LAZY
Logs and Stack Trace
Error thrown on each executor task (all tasks fail, Spark retries 4 times per task before aborting):
WARN [task-result-getter] - Lost task 0.0 in stage 39.0:
java.lang.IllegalArgumentException: No valid spark merger implementation set for
`hoodie.write.record.merge.custom.implementation.classes`
at org.apache.hudi.BaseSparkInternalRowReaderContext.getRecordMerger(BaseSparkInternalRowReaderContext.java:74)
at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:331)
at org.apache.hudi.common.engine.HoodieReaderContext.initRecordMerger(HoodieReaderContext.java:289)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:111)
at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:70)
at org.apache.hudi.common.table.read.HoodieFileGroupReader$Builder.build(HoodieFileGroupReader.java:534)
at org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy.getFileGroupReader(ClusteringExecutionStrategy.java:156)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.access$100(MultipleSparkJobExecutionStrategy.java:95)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:323)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy$1.call(MultipleSparkJobExecutionStrategy.java:314)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
Driver-level error after all tasks fail:
ERROR [stream execution thread] (BaseHoodieWriteClient.java:641) - Inline compaction or
clustering failed for table s3://...
java.util.concurrent.CompletionException: java.util.concurrent.CancellationException
at org.apache.hudi.common.util.FutureUtils.lambda$null$0(FutureUtils.java:52)
...
Caused by: java.util.concurrent.CancellationException
Bug Description
What happened:
When using
hoodie.write.record.merge.mode=CUSTOMwith a custom merger class specified viahoodie.write.record.merge.custom.implementation.classes, inline clustering (and standalone clustering) fails with:The error is thrown during the clustering execution phase when Hudi attempts to read source file groups to produce the clustered output. Every clustering task fails on every executor with the same error. The same custom merger class works correctly for inline compaction on the same table without any issues. Additionally, the merger is successfully used on other COW tables and all MOR read paths function correctly with the same implementation. Therefore, the issue appears to be specific to the clustering code path rather than the custom merger implementation itself.
What you expected:
Clustering should successfully read file groups using the configured custom merger class, the same way inline compaction does. Since
hoodie.write.record.merge.custom.implementation.classesis explicitly set in the write config, clustering should be able to resolve and instantiate the custom merger implementation.Steps to reproduce:
Create a MOR table with
hoodie.write.record.merge.mode=CUSTOMand a custom merger class that extendsHoodieSparkRecordMerger, e.g.:Write data to the table (upserts) — these succeed.
Enable inline clustering:
After the first successful delta commit, clustering is triggered automatically. Observe that all clustering tasks fail with
IllegalArgumentException: No valid spark merger implementation set.Note that inline compaction (
hoodie.compact.inline=true) on the same table with the same configs succeeds without error.Additional Notes / Investigation Hunch
Looking at the stack trace, the failure path is:
MultipleSparkJobExecutionStrategy→ClusteringExecutionStrategy.getFileGroupReader()→HoodieFileGroupReader.<init>→HoodieReaderContext.initRecordMerger()→BaseSparkInternalRowReaderContext.getRecordMerger()The key question is: why does
initRecordMergernot find the custom class when it is present in the write config?One possible explanation we noticed from looking at
ClusteringExecutionStrategy:The clustering execution strategy builds its own
TypedPropertiesto pass toHoodieFileGroupReader(via something likegetReaderProperties()). If this method only copies a limited set of properties (e.g., spill-map and memory-related keys) and does not propagatehoodie.write.record.merge.custom.implementation.classes, theninitRecordMergerwould receive an empty/missing value for that key.Meanwhile, compaction works because it goes through a different code path (
SparkMergeHelper/HoodieSparkMergeOnReadTableCompactor) which has access to the full write config props.A secondary possible issue: even if
ConfigUtils.getMergeProps()enriches the props with table config (hoodie.properties),initRecordMergermay be called with the original un-enrichedTypedPropertiesrather than the enriched copy.We have not confirmed this by stepping through the code — if the Hudi team can verify whether
getReaderProperties()inClusteringExecutionStrategyexplicitly propagates the custom merge implementation class key, that would either confirm or rule out this hypothesis.Environment
Hudi version: 1.1.0 (
hudi-spark3.5-bundle_2.12-1.1.0.jar)Query engine: (Spark/Flink/Trino etc): Spark 3.5 (AWS EMR-flavored,
spark-sql_2.12-3.5.6-amzn-1)Relevant configs:
Logs and Stack Trace
Error thrown on each executor task (all tasks fail, Spark retries 4 times per task before aborting):
Driver-level error after all tasks fail: