From d10750380fc2be2def1941eb41d600cc2ad7f162 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 19 May 2026 22:15:48 -0700 Subject: [PATCH 1/2] Attempting to revert the rdd usage in row writer code paths --- .../hudi/HoodieDatasetBulkInsertHelper.scala | 109 ++++++++++++------ 1 file changed, 75 insertions(+), 34 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 0ee2abd847a51..6b5d78b391493 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -40,7 +40,7 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Dataset, Row, functions} import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath, getNestedInternalRowValue} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} @@ -94,44 +94,85 @@ object HoodieDatasetBulkInsertHelper val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required") - val prependedRdd: RDD[InternalRow] = { - injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter => - val typedProps = TypedProperties.copy(config.getProps) - if (autoGenerateRecordKeys) { + if (autoGenerateRecordKeys) { + // Auto-keygen needs per-task partitionId from TaskContext and is stateful (rowId++), + // so it can't live in a driver-side UDF closure. Keep the RDD path for this case only. + val prependedRdd: RDD[InternalRow] = { + injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter => + val typedProps = TypedProperties.copy(config.getProps) typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())) typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime) - } - val sparkKeyGenerator = - ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) - .asInstanceOf[BuiltinKeyGenerator] - val keyGenerator: BuiltinKeyGenerator = if (autoGenerateRecordKeys) { - new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator] - } else { - sparkKeyGenerator - } - - iter.map { row => - // auto generate record keys if needed - val metaFields = new Array[UTF8String](5) - metaFields(2) = keyGenerator.getRecordKey(row, schema) - metaFields(3) = keyGenerator.getPartitionPath(row, schema) - metaFields(0) = UTF8String.EMPTY_UTF8 - metaFields(1) = UTF8String.EMPTY_UTF8 - metaFields(4) = UTF8String.EMPTY_UTF8 - - // TODO use mutable row, avoid re-allocating - sparkAdapter.createInternalRow(metaFields, row, false) - } - }, SQLConf.get) - } + val sparkKeyGenerator = + ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) + .asInstanceOf[BuiltinKeyGenerator] + val keyGenerator: BuiltinKeyGenerator = + new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator] + + iter.map { row => + val metaFields = new Array[UTF8String](5) + metaFields(2) = keyGenerator.getRecordKey(row, schema) + metaFields(3) = keyGenerator.getPartitionPath(row, schema) + metaFields(0) = UTF8String.EMPTY_UTF8 + metaFields(1) = UTF8String.EMPTY_UTF8 + metaFields(4) = UTF8String.EMPTY_UTF8 + sparkAdapter.createInternalRow(metaFields, row, false) + } + }, SQLConf.get) + } + + val dedupedRdd = if (config.shouldCombineBeforeInsert) { + dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) + } else { + prependedRdd + } - val dedupedRdd = if (config.shouldCombineBeforeInsert) { - dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) + sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { - prependedRdd - } + // UDF-based key-gen path. Avoids the toRdd.mapPartitions round-trip on the hot path + // by registering record-key / partition-path UDFs against the DataFrame directly. + val typedProps = TypedProperties.copy(config.getProps) + val keyGenerator = ReflectionUtils + .loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) + .asInstanceOf[BuiltinKeyGenerator] + + val tableName = config.getString(HoodieWriteConfig.TBL_NAME.key) + val recordKeyFn = s"hudi_recordkey_gen_${tableName}_$instantTime" + val partitionPathFn = s"hudi_partition_gen_${tableName}_$instantTime" - sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) + val spark = df.sparkSession + spark.udf.register(recordKeyFn, (r: Row) => keyGenerator.getRecordKey(r)) + spark.udf.register(partitionPathFn, (r: Row) => keyGenerator.getPartitionPath(r)) + + val originalCols = df.schema.fieldNames.map(functions.col) + val rowStruct = functions.struct(originalCols: _*) + + val withMetaCols = df + .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, + functions.callUDF(recordKeyFn, rowStruct)) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, + functions.callUDF(partitionPathFn, rowStruct)) + .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, + functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, + functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, + functions.lit("").cast(StringType)) + + val orderedCols = updatedSchema.fieldNames.map(functions.col) + val orderedDF = withMetaCols.select(orderedCols: _*) + + if (config.shouldCombineBeforeInsert) { + val dedupedRdd = dedupeRows( + injectSQLConf(orderedDF.queryExecution.toRdd, SQLConf.get), + updatedSchema, + tableConfig.getOrderingFields.asScala.toList, + SparkHoodieIndexFactory.isGlobalIndex(config), + targetParallelism) + sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(spark, dedupedRdd, updatedSchema) + } else { + orderedDF + } + } } else { // NOTE: In cases when we're not populating meta-fields we actually don't // need access to the [[InternalRow]] and therefore can avoid the need From 97697eeb845265f29384af61bd484aa63f439ffd Mon Sep 17 00:00:00 2001 From: sivabalan Date: Fri, 12 Jun 2026 13:49:04 -0700 Subject: [PATCH 2/2] perf(spark): restore Catalyst fast-paths for SimpleKeyGen/Nonpartitioned bulk insert The post-#5470 HoodieDatasetBulkInsertHelper rewrite routed every keygen through df.queryExecution.toRdd.mapPartitions(...) which materialises rows and pays a per-row keygen reflection cost. Restore tiered dispatch so the two common keygen classes stay as Catalyst column projections: - Tier 1 (NonpartitionedKeyGenerator, single record-key field): col(rk).cast(String) + lit("") -- no UDF, no toRdd round-trip. - Tier 2 (SimpleKeyGenerator, single record-key + single partition-path, default URL/slash flags): col(rk).cast(String) plus a partition-path expression mirroring PartitionPathFormatterBase#combine including the handleEmpty -> __HIVE_DEFAULT_PARTITION__ substitution, with hive-style `=` prefixing emitted as `concat(lit, ...)`. urlEncode and the 1.2.0+ slashSeparatedDatePartitioning flag drop to Tier 3. - Tier 3 (everything else): anonymous functions.udf(...) -- registered per call, not on the SparkSession, so nothing leaks across writes. - Auto-keygen keeps the existing RDD path (needs TaskContext partitionId and a stateful row counter -- no clean Catalyst expression). The Tier 3 UDF goes through BuiltinKeyGenerator.getRecordKey(Row) / getPartitionPath(Row), i.e. the canonical Avro-aligned formatters, so all three formatter flags (hive-style, URL encode, slash-separated dates) are honoured for the keygens that fall through. Test coverage in TestHoodieDatasetBulkInsertHelper: - testKeyGenParityAgainstAvroGroundTruth (11 parameterised cases) -- one per tier-relevant keygen + formatter-flag combo, compares the helper's record-key / partition-path output to BuiltinKeyGenerator's Avro path. - testFastPathCastsNonStringRecordKey -- Tier 1/2 cast a non-string record-key column (ts: long) to the expected string form. - testFastPathAvoidsUdf -- Tier 1/2 analyzed plans contain no ScalaUDF (i.e. they actually benefit from Catalyst codegen). - testTier2EmptyPartitionValueSubstitutedWithHiveDefault -- empty partition values resolve to __HIVE_DEFAULT_PARTITION__ under both default and hive-style flags. - testUdfPathRespectsDriverSessionTimezone -- the Tier 3 UDF path picks up the driver's spark.sql.session.timeZone (guards against executor JVM default leakage on TimestampBasedKeyGenerator). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/HoodieDatasetBulkInsertHelper.scala | 137 +++++++-- .../TestHoodieDatasetBulkInsertHelper.java | 276 ++++++++++++++++++ 2 files changed, 384 insertions(+), 29 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 6b5d78b391493..33f8b9b1e28cd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -26,13 +26,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils} +import org.apache.hudi.common.util.{OrderingValues, PartitionPathEncodeUtils, ReflectionUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory} import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType -import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils} +import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils, NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} @@ -42,6 +43,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, functions} import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath, getNestedInternalRowValue} +import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.Project @@ -96,7 +98,8 @@ object HoodieDatasetBulkInsertHelper if (autoGenerateRecordKeys) { // Auto-keygen needs per-task partitionId from TaskContext and is stateful (rowId++), - // so it can't live in a driver-side UDF closure. Keep the RDD path for this case only. + // so it can't be expressed as a driver-side UDF closure. Keep the RDD path here. + // TODO: a custom Catalyst Expression could let this path avoid the toRdd round-trip too. val prependedRdd: RDD[InternalRow] = { injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter => val typedProps = TypedProperties.copy(config.getProps) @@ -128,38 +131,36 @@ object HoodieDatasetBulkInsertHelper sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { - // UDF-based key-gen path. Avoids the toRdd.mapPartitions round-trip on the hot path - // by registering record-key / partition-path UDFs against the DataFrame directly. + // Non auto-keygen path. Compute meta columns via Catalyst expressions where possible: + // - Tier 1 (Nonpartitioned, single record-key field): col(rk).cast(String) + lit("") + // - Tier 2 (Simple, default partition formatter flags): col(rk).cast(String) + col(pp).cast(String) + // - Tier 3 (everything else): anonymous UDF invoking BuiltinKeyGenerator on Row + // + // Tier 1/2 are pure column projections, so Catalyst codegens them and we avoid the + // toRdd.mapPartitions round-trip. Tier 3 keeps a UDF over the DataFrame so the projection + // remains in Catalyst; the UDF is anonymous (not registered on the SparkSession) so + // nothing leaks across writes. + // + // Tier 1/2 do not reproduce the canonical keygen's null/empty record-key validation: + // a null record-key value passes through as SQL NULL instead of throwing HoodieKeyException. + // This matches the behaviour of the pre-existing fast paths in this helper prior to the + // RDD-based rewrite. Tier 3 retains the strict canonical behaviour. val typedProps = TypedProperties.copy(config.getProps) val keyGenerator = ReflectionUtils .loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) .asInstanceOf[BuiltinKeyGenerator] - val tableName = config.getString(HoodieWriteConfig.TBL_NAME.key) - val recordKeyFn = s"hudi_recordkey_gen_${tableName}_$instantTime" - val partitionPathFn = s"hudi_partition_gen_${tableName}_$instantTime" - - val spark = df.sparkSession - spark.udf.register(recordKeyFn, (r: Row) => keyGenerator.getRecordKey(r)) - spark.udf.register(partitionPathFn, (r: Row) => keyGenerator.getPartitionPath(r)) - - val originalCols = df.schema.fieldNames.map(functions.col) - val rowStruct = functions.struct(originalCols: _*) + val (recordKeyCol, partitionPathCol) = + buildKeygenColumns(df, keyGeneratorClassName, typedProps, keyGenerator) val withMetaCols = df - .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, - functions.callUDF(recordKeyFn, rowStruct)) - .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, - functions.callUDF(partitionPathFn, rowStruct)) - .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, - functions.lit("").cast(StringType)) - .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, - functions.lit("").cast(StringType)) - .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, - functions.lit("").cast(StringType)) - - val orderedCols = updatedSchema.fieldNames.map(functions.col) - val orderedDF = withMetaCols.select(orderedCols: _*) + .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKeyCol) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPathCol) + .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, functions.lit("").cast(StringType)) + + val orderedDF = withMetaCols.select(updatedSchema.fieldNames.map(functions.col): _*) if (config.shouldCombineBeforeInsert) { val dedupedRdd = dedupeRows( @@ -168,7 +169,7 @@ object HoodieDatasetBulkInsertHelper tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) - sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(spark, dedupedRdd, updatedSchema) + sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { orderedDF } @@ -187,6 +188,84 @@ object HoodieDatasetBulkInsertHelper partitioner.repartitionRecords(updatedDF, targetParallelism) } + /** + * Builds the record-key / partition-path projection columns for the non auto-keygen path, + * dispatching to a fast-path tier when the keygen is amenable to a pure Catalyst projection. + * + * Tier 1: NonpartitionedKeyGenerator with a single record-key field. + * Tier 2: SimpleKeyGenerator with a single record-key + single partition-path field, when the + * partition formatter flags are reproducible as Catalyst expressions. Currently: + * default flags, or hive-style partitioning. URL-encoded and slash-separated date + * partitioning fall through to Tier 3 -- the URL escape table doesn't have an efficient + * pure-Catalyst equivalent, and slash-separated dates were added in 1.2.0 and exercise + * a separate formatter branch we'd rather not encode twice. + * Tier 3: Anonymous UDF that calls into the supplied [[BuiltinKeyGenerator]]. + */ + private def buildKeygenColumns(df: DataFrame, + keyGeneratorClassName: String, + typedProps: TypedProperties, + keyGenerator: BuiltinKeyGenerator): (org.apache.spark.sql.Column, org.apache.spark.sql.Column) = { + val recordKeyFields = keyGenerator.getRecordKeyFieldNames + val partitionPathFields = keyGenerator.getPartitionPathFields + + val hiveStyle = typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key, + KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean) + val urlEncode = typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key, + KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean) + val slashSep = typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key, + KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean) + + val tier1 = keyGeneratorClassName == classOf[NonpartitionedKeyGenerator].getName && + recordKeyFields.size == 1 + + val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName && + recordKeyFields.size == 1 && partitionPathFields.size == 1 && + !urlEncode && !slashSep + + if (tier1) { + (functions.col(recordKeyFields.get(0)).cast(StringType), + functions.lit("").cast(StringType)) + } else if (tier2) { + val recordKeyCol = functions.col(recordKeyFields.get(0)).cast(StringType) + val partitionPathCol = buildSimpleKeygenPartitionPathColumn(partitionPathFields.get(0), hiveStyle) + (recordKeyCol, partitionPathCol) + } else { + val recordKeyUdf = functions.udf( + new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getRecordKey(row) }, + StringType) + val partitionPathUdf = functions.udf( + new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getPartitionPath(row) }, + StringType) + val rowStruct = functions.struct(df.schema.fieldNames.map(functions.col): _*) + (recordKeyUdf(rowStruct), partitionPathUdf(rowStruct)) + } + } + + /** + * Tier-2 SimpleKeyGenerator partition-path expression for a single field with default URL/slash + * formatter flags, mirroring [[PartitionPathFormatterBase#combine]]: + * + *
    + *
  • !hiveStyle: handleEmpty(value) -- null/"" -> __HIVE_DEFAULT_PARTITION__
  • + *
  • hiveStyle: "=" + handleEmpty(value)
  • + *
+ */ + private def buildSimpleKeygenPartitionPathColumn(partitionField: String, + hiveStyle: Boolean): org.apache.spark.sql.Column = { + val rawValue = functions.col(partitionField).cast(StringType) + val emptyHandled = functions.coalesce( + functions.when(functions.length(rawValue) === 0, + functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)) + .otherwise(rawValue), + functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)) + + if (hiveStyle) { + functions.concat(functions.lit(partitionField + "="), emptyHandled) + } else { + emptyHandled + } + } + /** * Perform bulk insert for [[Dataset]], will not change timeline/index, return * information about write files. diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java index 946fb61c3b3a8..d80591e19126b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java @@ -17,26 +17,33 @@ package org.apache.hudi.functional; +import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.HoodieDatasetBulkInsertHelper; import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; +import org.apache.hudi.common.config.TimestampKeyGeneratorConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows; import org.apache.hudi.io.util.FileIOUtils; +import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.ComplexKeyGenerator; +import org.apache.hudi.keygen.CustomKeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.testutils.DataSourceTestUtils; import org.apache.hudi.testutils.HoodieSparkClientTestBase; import lombok.Getter; +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.scheduler.SparkListener; @@ -44,6 +51,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Tag; @@ -382,6 +390,274 @@ public void testBulkInsertParallelismParam() { sqlContext.sparkContext().removeSparkListener(stageCheckBulkParallelismListener); } + private static Stream provideKeyGenParityArgs() { + // Covers every keygen + partition-formatter combo the dispatcher in + // HoodieDatasetBulkInsertHelper#buildKeygenColumns can land on. SimpleKeyGenerator cases + // exercise hive-style / url-encode / slash-separated date flags. Tier 2 covers only the + // {default, hive-style} subset for SimpleKeyGen; url-encode and slash-separated push to Tier 3. + return Stream.of( + // Tier 1 + Arguments.of("nonpartitioned-tier1", NonpartitionedKeyGenerator.class.getName(), "_row_key", "", false, false, false), + // Tier 2 + Arguments.of("simple-default-tier2", SimpleKeyGenerator.class.getName(), "_row_key", "partition", false, false, false), + Arguments.of("simple-hive-tier2", SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, false, false), + // Tier 3 fallbacks: Simple under url-encode / slash-sep, plus every other keygen class. + Arguments.of("simple-slash-tier3", SimpleKeyGenerator.class.getName(), "_row_key", "partition", false, true, false), + Arguments.of("simple-hive-slash-tier3", SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, true, false), + Arguments.of("simple-url-tier3", SimpleKeyGenerator.class.getName(), "_row_key", "partition", false, false, true), + Arguments.of("simple-hive-url-tier3", SimpleKeyGenerator.class.getName(), "_row_key", "partition", true, false, true), + Arguments.of("complex-single-tier3", ComplexKeyGenerator.class.getName(), "_row_key", "partition", false, false, false), + Arguments.of("complex-multi-tier3", ComplexKeyGenerator.class.getName(), "_row_key,ts", "partition,_hoodie_is_deleted", false, false, false), + Arguments.of("timestamp-based-tier3", TimestampBasedKeyGenerator.class.getName(), "_row_key", "ts", false, false, false), + Arguments.of("custom-tier3", CustomKeyGenerator.class.getName(), "_row_key", "partition:SIMPLE", false, false, false)); + } + + /** + * Asserts that record-key / partition-path values produced by {@link HoodieDatasetBulkInsertHelper} + * match those produced by the canonical Avro path ({@link BuiltinKeyGenerator#getRecordKey(GenericRecord)}). + * + *

The Avro path is the ground truth shared by read- and write-side keygen invocations, so parity + * against it (rather than RDD-vs-UDF parity) is what actually matters for correctness. + */ + @ParameterizedTest(name = "{0}") + @MethodSource("provideKeyGenParityArgs") + public void testKeyGenParityAgainstAvroGroundTruth(String label, + String keyGenClass, + String recordKeyFields, + String partitionPathFields, + boolean hiveStylePartitioning, + boolean slashSepPartitioning, + boolean urlEncodePartitioning) { + Map props = new HashMap<>(); + props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), keyGenClass); + props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKeyFields); + props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionPathFields); + props.put(HoodieWriteConfig.TBL_NAME.key(), label + "_parity_table"); + props.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), String.valueOf(hiveStylePartitioning)); + props.put(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key(), String.valueOf(slashSepPartitioning)); + props.put(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key(), String.valueOf(urlEncodePartitioning)); + if (keyGenClass.equals(TimestampBasedKeyGenerator.class.getName())) { + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key(), "EPOCHMILLISECONDS"); + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT.key(), "yyyy-MM-dd"); + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT.key(), "UTC"); + } + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); + + List rows = DataSourceTestUtils.generateRandomRows(20); + Dataset dataset = sqlContext.createDataFrame(rows, structType); + Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, + new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); + + int recordKeyIdx = result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD); + int partitionPathIdx = result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + int rowKeyIdx = result.schema().fieldIndex("_row_key"); + + Map actual = new HashMap<>(); + for (Row r : result.collectAsList()) { + actual.put(r.getString(rowKeyIdx), + new String[] {r.getString(recordKeyIdx), r.getString(partitionPathIdx)}); + } + + TypedProperties keyGenProps = new TypedProperties(); + keyGenProps.putAll(props); + BuiltinKeyGenerator groundTruthKeyGen = + (BuiltinKeyGenerator) org.apache.hudi.common.util.ReflectionUtils.loadClass(keyGenClass, keyGenProps); + scala.Function1 toAvro = AvroConversionUtils.createConverterToAvro( + structType, "trip", "example.schema"); + + assertEquals(rows.size(), actual.size(), "Row count mismatch — possible duplicate record keys"); + for (Row inputRow : rows) { + GenericRecord avro = toAvro.apply(inputRow); + String expectedRecordKey = groundTruthKeyGen.getRecordKey(avro); + String expectedPartitionPath = groundTruthKeyGen.getPartitionPath(avro); + String[] observed = actual.get(inputRow.getString(0)); + assertEquals(expectedRecordKey, observed[0], + "record key mismatch for keygen=" + label + " row=" + inputRow.getString(0)); + assertEquals(expectedPartitionPath, observed[1], + "partition path mismatch for keygen=" + label + " row=" + inputRow.getString(0)); + } + } + + /** + * Tier 1 / Tier 2 fast paths use {@code col(field).cast(String)} for the record key. Confirms that + * a non-string record-key column (e.g. {@code ts: long}) is materialised as the string form of the + * underlying value, matching the canonical keygen output. + */ + @Test + public void testFastPathCastsNonStringRecordKey() { + // Tier 1: Nonpartitioned, single record-key field. Using `ts` (long) as the record key. + Map nonpartitionedProps = new HashMap<>(); + nonpartitionedProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName()); + nonpartitionedProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "ts"); + nonpartitionedProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), ""); + nonpartitionedProps.put(HoodieWriteConfig.TBL_NAME.key(), "nonpartitioned_cast_tbl"); + assertFastPathRecordKeyCast(nonpartitionedProps, /* expectedPartitionPath */ ""); + + // Tier 2: Simple, single record-key + partition-path. Using `ts` (long) as the record key. + Map simpleProps = new HashMap<>(); + simpleProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()); + simpleProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "ts"); + simpleProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition"); + simpleProps.put(HoodieWriteConfig.TBL_NAME.key(), "simple_cast_tbl"); + assertFastPathRecordKeyCast(simpleProps, /* expectedPartitionPath */ null); + } + + private void assertFastPathRecordKeyCast(Map props, String expectedPartitionPath) { + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); + List rows = DataSourceTestUtils.generateRandomRows(10); + Dataset dataset = sqlContext.createDataFrame(rows, structType); + Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, + new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); + + int recordKeyIdx = result.schema().fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD); + int partitionPathIdx = result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + int tsIdx = result.schema().fieldIndex("ts"); + int partitionIdx = result.schema().fieldIndex("partition"); + + for (Row r : result.collectAsList()) { + Object tsVal = r.get(tsIdx); + assertEquals(String.valueOf(tsVal), r.getString(recordKeyIdx), + "record key should be string form of ts column"); + String expected = expectedPartitionPath != null ? expectedPartitionPath : String.valueOf(r.get(partitionIdx)); + assertEquals(expected, r.getString(partitionPathIdx), + "partition path mismatch"); + } + } + + /** + * Sanity check that {@link HoodieDatasetBulkInsertHelper} composes the Catalyst plan from + * pure column projections for Tier 1 and Tier 2 paths — no UDFs, no toRdd round-trip — so the + * fast paths actually benefit from Catalyst codegen. We verify this by inspecting the resulting + * Dataset's logical plan for absence of {@code ScalaUDF}/{@code UserDefinedFunction} nodes. + */ + @Test + public void testFastPathAvoidsUdf() { + // Tier 1 + Map tier1Props = new HashMap<>(); + tier1Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), NonpartitionedKeyGenerator.class.getName()); + tier1Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + tier1Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), ""); + tier1Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier1_plan_tbl"); + assertNoScalaUdfInPlan(tier1Props, "tier1"); + + // Tier 2 + Map tier2Props = new HashMap<>(); + tier2Props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()); + tier2Props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + tier2Props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition"); + tier2Props.put(HoodieWriteConfig.TBL_NAME.key(), "tier2_plan_tbl"); + assertNoScalaUdfInPlan(tier2Props, "tier2"); + } + + private void assertNoScalaUdfInPlan(Map props, String label) { + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); + List rows = DataSourceTestUtils.generateRandomRows(5); + Dataset dataset = sqlContext.createDataFrame(rows, structType); + Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, + new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); + String plan = result.queryExecution().analyzed().toString(); + assertTrue(!plan.toLowerCase().contains("scalaudf"), + label + " path should not contain ScalaUDF in its logical plan; plan was:\n" + plan); + } + + /** + * The Tier 2 partition-path projection must substitute {@code __HIVE_DEFAULT_PARTITION__} for + * empty/null partition values, matching {@link org.apache.hudi.keygen.StringPartitionPathFormatter#handleEmpty}. + * Runs the case under both default and hive-style formatter flags, and against an injected + * row whose {@code partition} value is the empty string. + * + *

The slash-separated formatter branch deliberately skips {@code handleEmpty} (per + * {@code PartitionPathFormatterBase#combine}), so we don't exercise that combo here -- the + * cross-flag parity test already covers it against the canonical Avro path. + */ + @Test + public void testTier2EmptyPartitionValueSubstitutedWithHiveDefault() { + Row emptyPartitionRow = RowFactory.create("rk-empty", "", 1700000000000L, false); + + Map defaultProps = new HashMap<>(); + defaultProps.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), SimpleKeyGenerator.class.getName()); + defaultProps.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + defaultProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition"); + defaultProps.put(HoodieWriteConfig.TBL_NAME.key(), "empty_partition_default_tbl"); + assertEmptyPartitionSubstituted(defaultProps, emptyPartitionRow, + PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); + + Map hiveProps = new HashMap<>(defaultProps); + hiveProps.put(HoodieWriteConfig.TBL_NAME.key(), "empty_partition_hive_tbl"); + hiveProps.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true"); + assertEmptyPartitionSubstituted(hiveProps, emptyPartitionRow, + "partition=" + PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); + } + + private void assertEmptyPartitionSubstituted(Map props, + Row inputRow, + String expectedPartitionPath) { + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); + Dataset dataset = sqlContext.createDataFrame(java.util.Collections.singletonList(inputRow), structType); + Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, + new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); + + Row out = result.collectAsList().get(0); + int partitionPathIdx = result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + assertEquals(expectedPartitionPath, out.getString(partitionPathIdx), + "empty partition value should be substituted with " + PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); + } + + /** + * Guards against a UDF-path-only failure mode: the RDD path threaded the driver's {@link SQLConf} + * onto executor tasks via {@code injectSQLConf}; the UDF path does not. If any keygen reads + * {@code spark.sql.session.timeZone} during execution (e.g. when converting {@link java.sql.Timestamp} + * values through Catalyst), executor-local conf would silently override the driver's choice. + * + *

This test forces {@code spark.sql.session.timeZone} to a non-default value, runs the helper, and + * asserts the resulting keys match the Avro ground truth computed under the same timezone. A divergence + * here would indicate the UDF path is sensitive to executor JVM defaults. + */ + @Test + public void testUdfPathRespectsDriverSessionTimezone() { + String originalTz = sqlContext.getConf("spark.sql.session.timeZone", "UTC"); + sqlContext.setConf("spark.sql.session.timeZone", "America/Los_Angeles"); + try { + Map props = new HashMap<>(); + props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), TimestampBasedKeyGenerator.class.getName()); + props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "ts"); + props.put(HoodieWriteConfig.TBL_NAME.key(), "tz_parity_table"); + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TYPE_FIELD.key(), "EPOCHMILLISECONDS"); + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_OUTPUT_DATE_FORMAT.key(), "yyyy-MM-dd-HH"); + props.put(TimestampKeyGeneratorConfig.TIMESTAMP_TIMEZONE_FORMAT.key(), "America/Los_Angeles"); + + HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build(); + List rows = DataSourceTestUtils.generateRandomRows(10); + Dataset dataset = sqlContext.createDataFrame(rows, structType); + Dataset result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, + new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); + + int partitionPathIdx = result.schema().fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD); + int rowKeyIdx = result.schema().fieldIndex("_row_key"); + Map actualPartition = new HashMap<>(); + for (Row r : result.collectAsList()) { + actualPartition.put(r.getString(rowKeyIdx), r.getString(partitionPathIdx)); + } + + TypedProperties keyGenProps = new TypedProperties(); + keyGenProps.putAll(props); + BuiltinKeyGenerator groundTruth = + (BuiltinKeyGenerator) org.apache.hudi.common.util.ReflectionUtils.loadClass( + TimestampBasedKeyGenerator.class.getName(), keyGenProps); + scala.Function1 toAvro = AvroConversionUtils.createConverterToAvro( + structType, "trip", "example.schema"); + + for (Row inputRow : rows) { + GenericRecord avro = toAvro.apply(inputRow); + assertEquals(groundTruth.getPartitionPath(avro), actualPartition.get(inputRow.getString(0)), + "partition path diverged for row " + inputRow.getString(0) + " under LA timezone"); + } + } finally { + sqlContext.setConf("spark.sql.session.timeZone", originalTz); + } + } + class StageCheckBulkParallelismListener extends SparkListener { private boolean checkFlag = false;