Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,17 @@ class HoodieSparkSqlWriterInternal {
mergedParams.put(HoodieTableConfig.DROP_PARTITION_COLUMNS.key, "false")
}

// Lance is Spark-only and its writer/reader only handles Spark InternalRow, not Avro.
// Auto-set DefaultSparkRecordMerger when the user hasn't explicitly configured a merger.
if (HoodieFileFormat.LANCE.name().equalsIgnoreCase(
mergedParams.getOrElse(HoodieTableConfig.BASE_FILE_FORMAT.key(),
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().name()))
&& !optParams.contains(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key())
&& !optParams.contains(HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY)) {
mergedParams.put(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(),
classOf[DefaultSparkRecordMerger].getName)
}

val tableVersion = if (tableConfig != null) {
tableConfig.getTableVersion
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.hudi.common.table.read

import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, DefaultSparkRecordMerger, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, TypedProperties}
import org.apache.hudi.common.engine.HoodieReaderContext
Expand Down Expand Up @@ -132,19 +132,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
val inputDF: Dataset[Row] = AvroConversionUtils.createDataFrame(genericRecords, schemaStr, spark);

// Check if Lance format is being used and add required configuration
val isLanceFormat = options.getOrDefault(HoodieTableConfig.BASE_FILE_FORMAT.key(), "").equalsIgnoreCase("LANCE")

var writer = inputDF.write.format("hudi")
.options(options)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option("hoodie.datasource.write.operation", operation)
.option("hoodie.datasource.write.table.type", "MERGE_ON_READ")

// Lance requires DefaultSparkRecordMerger for Spark InternalRow compatibility
if (isLanceFormat) {
writer = writer.option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
}

writer.mode(if (firstCommit) SaveMode.Overwrite else SaveMode.Append)
.save(getBasePath)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DefaultSparkRecordMerger
import org.apache.hudi.blob.BlobTestHelpers
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
Expand Down Expand Up @@ -1103,7 +1102,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)" else ""

// CREATE TABLE with Lance configuration
// Lance format requires Spark record merger for writing
spark.sql(s"""
create table $tableName (
id int,
Expand All @@ -1115,8 +1113,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
tblproperties (
hoodie.table.base.file.format = 'LANCE',
type = '${tableType.name()}',
primaryKey = 'id',
hoodie.datasource.write.record.merger.impls = '${classOf[DefaultSparkRecordMerger].getName}'
primaryKey = 'id'
)
$createTablePartitionClause
location '$tablePath'
Expand Down Expand Up @@ -1488,7 +1485,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
.option(PRECOMBINE_FIELD.key(), "age")
.option(TABLE_NAME.key(), tableName)
.option(HoodieWriteConfig.TBL_NAME.key(), tableName)
.option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)

// Add operation if specified
writer = operation match {
Expand Down
Loading