diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index aaa79929256f6..d6f2baf28d677 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -1125,6 +1125,17 @@ class HoodieSparkSqlWriterInternal { mergedParams.put(HoodieTableConfig.DROP_PARTITION_COLUMNS.key, "false") } + // Lance is Spark-only and its writer/reader only handles Spark InternalRow, not Avro. + // Auto-set DefaultSparkRecordMerger when the user hasn't explicitly configured a merger. + if (HoodieFileFormat.LANCE.name().equalsIgnoreCase( + mergedParams.getOrElse(HoodieTableConfig.BASE_FILE_FORMAT.key(), + HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name())) + && !optParams.contains(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key()) + && !optParams.contains(HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY)) { + mergedParams.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), + classOf[DefaultSparkRecordMerger].getName) + } + val tableVersion = if (tableConfig != null) { tableConfig.getTableVersion } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala index d4eae9e79d74d..fab9f5d855513 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala @@ -19,7 +19,7 @@ package org.apache.hudi.common.table.read -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, TABLE_TYPE} import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, TypedProperties} import org.apache.hudi.common.engine.HoodieReaderContext @@ -132,19 +132,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int val inputDF: Dataset[Row] = AvroConversionUtils.createDataFrame(genericRecords, schemaStr, spark); // Check if Lance format is being used and add required configuration - val isLanceFormat = options.getOrDefault(HoodieTableConfig.BASE_FILE_FORMAT.key(), "").equalsIgnoreCase("LANCE") - var writer = inputDF.write.format("hudi") .options(options) .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same .option("hoodie.datasource.write.operation", operation) .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") - // Lance requires DefaultSparkRecordMerger for Spark InternalRow compatibility - if (isLanceFormat) { - writer = writer.option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) - } - writer.mode(if (firstCommit) SaveMode.Overwrite else SaveMode.Append) .save(getBasePath) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 1cd5647d99495..3de7aa7061dfb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -18,7 +18,6 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.DefaultSparkRecordMerger import org.apache.hudi.blob.BlobTestHelpers import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext @@ -1103,7 +1102,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)" else "" // CREATE TABLE with Lance configuration - // Lance format requires Spark record merger for writing spark.sql(s""" create table $tableName ( id int, @@ -1115,8 +1113,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { tblproperties ( hoodie.table.base.file.format = 'LANCE', type = '${tableType.name()}', - primaryKey = 'id', - hoodie.datasource.write.record.merger.impls = '${classOf[DefaultSparkRecordMerger].getName}' + primaryKey = 'id' ) $createTablePartitionClause location '$tablePath' @@ -1488,7 +1485,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { .option(PRECOMBINE_FIELD.key(), "age") .option(TABLE_NAME.key(), tableName) .option(HoodieWriteConfig.TBL_NAME.key(), tableName) - .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName) // Add operation if specified writer = operation match {