Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,24 @@ import org.apache.hudi.common.engine.TaskContextSupplier
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils}
import org.apache.hudi.common.util.{OrderingValues, PartitionPathEncodeUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils}
import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.{DataFrame, Dataset, Row, functions}
import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
Expand Down Expand Up @@ -94,44 +96,84 @@ object HoodieDatasetBulkInsertHelper
val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] = {
injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
val typedProps = TypedProperties.copy(config.getProps)
if (autoGenerateRecordKeys) {
if (autoGenerateRecordKeys) {
// Auto-keygen needs per-task partitionId from TaskContext and is stateful (rowId++),
// so it can't be expressed as a driver-side UDF closure. Keep the RDD path here.
// TODO: a custom Catalyst Expression could let this path avoid the toRdd round-trip too.
val prependedRdd: RDD[InternalRow] = {
injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
val typedProps = TypedProperties.copy(config.getProps)
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()))
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime)
}
val sparkKeyGenerator =
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps)
.asInstanceOf[BuiltinKeyGenerator]
val keyGenerator: BuiltinKeyGenerator = if (autoGenerateRecordKeys) {
new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
} else {
sparkKeyGenerator
}

iter.map { row =>
// auto generate record keys if needed
val metaFields = new Array[UTF8String](5)
metaFields(2) = keyGenerator.getRecordKey(row, schema)
metaFields(3) = keyGenerator.getPartitionPath(row, schema)
metaFields(0) = UTF8String.EMPTY_UTF8
metaFields(1) = UTF8String.EMPTY_UTF8
metaFields(4) = UTF8String.EMPTY_UTF8

// TODO use mutable row, avoid re-allocating
sparkAdapter.createInternalRow(metaFields, row, false)
}
}, SQLConf.get)
}
val sparkKeyGenerator =
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps)
.asInstanceOf[BuiltinKeyGenerator]
val keyGenerator: BuiltinKeyGenerator =
new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]

iter.map { row =>
val metaFields = new Array[UTF8String](5)
metaFields(2) = keyGenerator.getRecordKey(row, schema)
metaFields(3) = keyGenerator.getPartitionPath(row, schema)
metaFields(0) = UTF8String.EMPTY_UTF8
metaFields(1) = UTF8String.EMPTY_UTF8
metaFields(4) = UTF8String.EMPTY_UTF8
sparkAdapter.createInternalRow(metaFields, row, false)
}
}, SQLConf.get)
}

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
} else {
prependedRdd
}

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism)
sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
} else {
prependedRdd
}
// Non auto-keygen path. Compute meta columns via Catalyst expressions where possible:
// - Tier 1 (Nonpartitioned, single record-key field): col(rk).cast(String) + lit("")
// - Tier 2 (Simple, default partition formatter flags): col(rk).cast(String) + col(pp).cast(String)
// - Tier 3 (everything else): anonymous UDF invoking BuiltinKeyGenerator on Row
//
// Tier 1/2 are pure column projections, so Catalyst codegens them and we avoid the
// toRdd.mapPartitions round-trip. Tier 3 keeps a UDF over the DataFrame so the projection
// remains in Catalyst; the UDF is anonymous (not registered on the SparkSession) so
// nothing leaks across writes.
//
// Tier 1/2 do not reproduce the canonical keygen's null/empty record-key validation:
// a null record-key value passes through as SQL NULL instead of throwing HoodieKeyException.
// This matches the behaviour of the pre-existing fast paths in this helper prior to the
// RDD-based rewrite. Tier 3 retains the strict canonical behaviour.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The comment notes that Tier 1/2 pass NULL record-keys through instead of throwing, but the divergence also covers empty-string record keys — SimpleKeyGenerator.getRecordKey(Row) calls requireNonNullNonEmptyKey which throws on both null and empty. Worth mentioning explicitly so future readers don't assume empty strings still validate.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

val typedProps = TypedProperties.copy(config.getProps)
val keyGenerator = ReflectionUtils
.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps)
.asInstanceOf[BuiltinKeyGenerator]

val (recordKeyCol, partitionPathCol) =
buildKeygenColumns(df, keyGeneratorClassName, typedProps, keyGenerator)

sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
val withMetaCols = df
.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKeyCol)
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPathCol)
.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, functions.lit("").cast(StringType))
.withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, functions.lit("").cast(StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD, functions.lit("").cast(StringType))

val orderedDF = withMetaCols.select(updatedSchema.fieldNames.map(functions.col): _*)

if (config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(
injectSQLConf(orderedDF.queryExecution.toRdd, SQLConf.get),
updatedSchema,
tableConfig.getOrderingFields.asScala.toList,
SparkHoodieIndexFactory.isGlobalIndex(config),
targetParallelism)
sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
} else {
orderedDF
}
}
} else {
// NOTE: In cases when we're not populating meta-fields we actually don't
// need access to the [[InternalRow]] and therefore can avoid the need
Expand All @@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper
partitioner.repartitionRecords(updatedDF, targetParallelism)
}

/**
* Builds the record-key / partition-path projection columns for the non auto-keygen path,
* dispatching to a fast-path tier when the keygen is amenable to a pure Catalyst projection.
*
* Tier 1: NonpartitionedKeyGenerator with a single record-key field.
* Tier 2: SimpleKeyGenerator with a single record-key + single partition-path field, when the
* partition formatter flags are reproducible as Catalyst expressions. Currently:
* default flags, or hive-style partitioning. URL-encoded and slash-separated date
* partitioning fall through to Tier 3 -- the URL escape table doesn't have an efficient
* pure-Catalyst equivalent, and slash-separated dates were added in 1.2.0 and exercise
* a separate formatter branch we'd rather not encode twice.
* Tier 3: Anonymous UDF that calls into the supplied [[BuiltinKeyGenerator]].
*/
private def buildKeygenColumns(df: DataFrame,
keyGeneratorClassName: String,
typedProps: TypedProperties,
keyGenerator: BuiltinKeyGenerator): (org.apache.spark.sql.Column, org.apache.spark.sql.Column) = {
val recordKeyFields = keyGenerator.getRecordKeyFieldNames
val partitionPathFields = keyGenerator.getPartitionPathFields

val hiveStyle = typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key,
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean)
val urlEncode = typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key,
KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean)
val slashSep = typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key,
KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean)

val tier1 = keyGeneratorClassName == classOf[NonpartitionedKeyGenerator].getName &&

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The class-name comparison uses the user-supplied keyGeneratorClassName directly, so users who configured the Avro variant (SimpleAvroKeyGenerator / NonpartitionedAvroKeyGenerator) won't hit Tier 1/2 even though the factory converts them to the Spark variant at load time. Could compare against HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName) here. Not a correctness issue — Tier 3 still works for those — just a missed optimization for that config path.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

recordKeyFields.size == 1

val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName &&
recordKeyFields.size == 1 && partitionPathFields.size == 1 &&
!urlEncode && !slashSep

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The Tier 2 gate checks the keygen class and formatter flags but not the partition column's type. For a Timestamp partition column, col(field).cast(StringType) formats differently than the canonical Timestamp.toString() that combinePartitionPath uses (e.g. Spark's cast drops trailing zeros and uses session timezone, Timestamp.toString keeps fractional seconds and uses JVM default tz). Same concern applies to Tier 1 / the recordKeyCol cast. The parity test only exercises a String partition column so this divergence wouldn't be caught — could you confirm the type-mismatch case is intentional (matches the pre-#5470 fast path) and maybe add a StringType guard or test coverage for a Timestamp partition?

- AI-generated; verify before applying. React 👍/👎 to flag quality.


if (tier1) {
(functions.col(recordKeyFields.get(0)).cast(StringType),
functions.lit("").cast(StringType))
} else if (tier2) {
val recordKeyCol = functions.col(recordKeyFields.get(0)).cast(StringType)
val partitionPathCol = buildSimpleKeygenPartitionPathColumn(partitionPathFields.get(0), hiveStyle)
(recordKeyCol, partitionPathCol)
} else {
val recordKeyUdf = functions.udf(
new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getRecordKey(row) },
StringType)
val partitionPathUdf = functions.udf(
new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getPartitionPath(row) },
StringType)
val rowStruct = functions.struct(df.schema.fieldNames.map(functions.col): _*)
(recordKeyUdf(rowStruct), partitionPathUdf(rowStruct))
}
}

/**
* Tier-2 SimpleKeyGenerator partition-path expression for a single field with default URL/slash
* formatter flags, mirroring [[PartitionPathFormatterBase#combine]]:
*
* <ul>
* <li>!hiveStyle: handleEmpty(value) -- null/"" -> __HIVE_DEFAULT_PARTITION__</li>
* <li>hiveStyle: "<field>=" + handleEmpty(value)</li>
* </ul>
*/
private def buildSimpleKeygenPartitionPathColumn(partitionField: String,
hiveStyle: Boolean): org.apache.spark.sql.Column = {
val rawValue = functions.col(partitionField).cast(StringType)
val emptyHandled = functions.coalesce(
functions.when(functions.length(rawValue) === 0,
functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH))
.otherwise(rawValue),
functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH))

if (hiveStyle) {
functions.concat(functions.lit(partitionField + "="), emptyHandled)
} else {
emptyHandled
}
}

/**
* Perform bulk insert for [[Dataset<Row>]], will not change timeline/index, return
* information about write files.
Expand Down
Loading
Loading