From 52cf7756bbed5720e95d9efa7fb1b219db52c5ee Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Sun, 3 May 2026 15:12:27 -0700 Subject: [PATCH] fix(lance): auto-infer DefaultSparkRecordMerger for Lance base file format Lance is Spark-only and its writer/reader only handles Spark InternalRow, not Avro. When users create a Hudi table with Lance base file format, they currently must manually set hoodie.datasource.write.record.merger.impls to DefaultSparkRecordMerger. Without this config the write path falls back to HoodieAvroRecordMerger (the global default), which is incompatible with Lance's Arrow/columnar data format. Auto-inject DefaultSparkRecordMerger in HoodieSparkSqlWriter's mergeParamsAndGetHoodieConfig when BASE_FILE_FORMAT is LANCE and the user hasn't explicitly set RECORD_MERGE_IMPL_CLASSES. This covers all Spark write paths (DataFrame API, Spark SQL, structured streaming) and respects any explicit user override. Co-Authored-By: Claude Opus 4.7 --- .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 11 +++++++++++ .../table/read/TestHoodieFileGroupReaderOnSpark.scala | 9 +-------- .../apache/hudi/functional/TestLanceDataSource.scala | 6 +----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index aaa79929256f6..d6f2baf28d677 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -1125,6 +1125,17 @@ class HoodieSparkSqlWriterInternal { mergedParams.put(HoodieTableConfig.DROP_PARTITION_COLUMNS.key, "false") } + // Lance is Spark-only and its writer/reader only handles Spark InternalRow, not Avro. + // Auto-set DefaultSparkRecordMerger when the user hasn't explicitly configured a merger. + if (HoodieFileFormat.LANCE.name().equalsIgnoreCase( + mergedParams.getOrElse(HoodieTableConfig.BASE_FILE_FORMAT.key(), + HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())) + && !optParams.contains(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key()) + && !optParams.contains(HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY)) { + mergedParams.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), + classOf[DefaultSparkRecordMerger].getName) + } + val tableVersion = if (tableConfig != null) { tableConfig.getTableVersion } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala index d4eae9e79d74d..fab9f5d855513 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala @@ -19,7 +19,7 @@ package org.apache.hudi.common.table.read -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, TABLE_TYPE} import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, TypedProperties} import org.apache.hudi.common.engine.HoodieReaderContext @@ -132,19 +132,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int val inputDF: Dataset[Row] = AvroConversionUtils.createDataFrame(genericRecords, schemaStr, spark); // Check if Lance format is being used and add required configuration - val isLanceFormat = options.getOrDefault(HoodieTableConfig.BASE_FILE_FORMAT.key(), "").equalsIgnoreCase("LANCE") - var writer = inputDF.write.format("hudi") .options(options) .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same .option("hoodie.datasource.write.operation", operation) .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") - // Lance requires DefaultSparkRecordMerger for Spark InternalRow compatibility - if (isLanceFormat) { - writer = writer.option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - } - writer.mode(if (firstCommit) SaveMode.Overwrite else SaveMode.Append) .save(getBasePath) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 1cd5647d99495..3de7aa7061dfb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -18,7 +18,6 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.DefaultSparkRecordMerger import org.apache.hudi.blob.BlobTestHelpers import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext @@ -1103,7 +1102,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)" else "" // CREATE TABLE with Lance configuration - // Lance format requires Spark record merger for writing spark.sql(s""" create table $tableName ( id int, @@ -1115,8 +1113,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { tblproperties ( hoodie.table.base.file.format = 'LANCE', type = '${tableType.name()}', - primaryKey = 'id', - hoodie.datasource.write.record.merger.impls = '${classOf[DefaultSparkRecordMerger].getName}' + primaryKey = 'id' ) $createTablePartitionClause location '$tablePath' @@ -1488,7 +1485,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) // Add operation if specified writer = operation match {