diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 0ee2abd847a51..33f8b9b1e28cd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -26,13 +26,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils} +import org.apache.hudi.common.util.{OrderingValues, PartitionPathEncodeUtils, ReflectionUtils} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory} import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType -import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils} +import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, BuiltinKeyGenerator, KeyGenUtils, NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} import org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, BulkInsertDataInternalWriterHelper, ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper} @@ -40,8 +41,9 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Dataset, Row, functions} import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath, getNestedInternalRowValue} +import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.Project @@ -94,44 +96,84 @@ object HoodieDatasetBulkInsertHelper val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required") - val prependedRdd: RDD[InternalRow] = { - injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter => - val typedProps = TypedProperties.copy(config.getProps) - if (autoGenerateRecordKeys) { + if (autoGenerateRecordKeys) { + // Auto-keygen needs per-task partitionId from TaskContext and is stateful (rowId++), + // so it can't be expressed as a driver-side UDF closure. Keep the RDD path here. + // TODO: a custom Catalyst Expression could let this path avoid the toRdd round-trip too. + val prependedRdd: RDD[InternalRow] = { + injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter => + val typedProps = TypedProperties.copy(config.getProps) typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())) typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime) - } - val sparkKeyGenerator = - ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) - .asInstanceOf[BuiltinKeyGenerator] - val keyGenerator: BuiltinKeyGenerator = if (autoGenerateRecordKeys) { - new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator] - } else { - sparkKeyGenerator - } - - iter.map { row => - // auto generate record keys if needed - val metaFields = new Array[UTF8String](5) - metaFields(2) = keyGenerator.getRecordKey(row, schema) - metaFields(3) = keyGenerator.getPartitionPath(row, schema) - metaFields(0) = UTF8String.EMPTY_UTF8 - metaFields(1) = UTF8String.EMPTY_UTF8 - metaFields(4) = UTF8String.EMPTY_UTF8 - - // TODO use mutable row, avoid re-allocating - sparkAdapter.createInternalRow(metaFields, row, false) - } - }, SQLConf.get) - } + val sparkKeyGenerator = + ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) + .asInstanceOf[BuiltinKeyGenerator] + val keyGenerator: BuiltinKeyGenerator = + new AutoRecordGenWrapperKeyGenerator(typedProps, sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator] + + iter.map { row => + val metaFields = new Array[UTF8String](5) + metaFields(2) = keyGenerator.getRecordKey(row, schema) + metaFields(3) = keyGenerator.getPartitionPath(row, schema) + metaFields(0) = UTF8String.EMPTY_UTF8 + metaFields(1) = UTF8String.EMPTY_UTF8 + metaFields(4) = UTF8String.EMPTY_UTF8 + sparkAdapter.createInternalRow(metaFields, row, false) + } + }, SQLConf.get) + } + + val dedupedRdd = if (config.shouldCombineBeforeInsert) { + dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) + } else { + prependedRdd + } - val dedupedRdd = if (config.shouldCombineBeforeInsert) { - dedupeRows(prependedRdd, updatedSchema, tableConfig.getOrderingFields.asScala.toList, SparkHoodieIndexFactory.isGlobalIndex(config), targetParallelism) + sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { - prependedRdd - } + // Non auto-keygen path. Compute meta columns via Catalyst expressions where possible: + // - Tier 1 (Nonpartitioned, single record-key field): col(rk).cast(String) + lit("") + // - Tier 2 (Simple, default partition formatter flags): col(rk).cast(String) + col(pp).cast(String) + // - Tier 3 (everything else): anonymous UDF invoking BuiltinKeyGenerator on Row + // + // Tier 1/2 are pure column projections, so Catalyst codegens them and we avoid the + // toRdd.mapPartitions round-trip. Tier 3 keeps a UDF over the DataFrame so the projection + // remains in Catalyst; the UDF is anonymous (not registered on the SparkSession) so + // nothing leaks across writes. + // + // Tier 1/2 do not reproduce the canonical keygen's null/empty record-key validation: + // a null record-key value passes through as SQL NULL instead of throwing HoodieKeyException. + // This matches the behaviour of the pre-existing fast paths in this helper prior to the + // RDD-based rewrite. Tier 3 retains the strict canonical behaviour. + val typedProps = TypedProperties.copy(config.getProps) + val keyGenerator = ReflectionUtils + .loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName), typedProps) + .asInstanceOf[BuiltinKeyGenerator] + + val (recordKeyCol, partitionPathCol) = + buildKeygenColumns(df, keyGeneratorClassName, typedProps, keyGenerator) - sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) + val withMetaCols = df + .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKeyCol) + .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPathCol) + .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, functions.lit("").cast(StringType)) + .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, functions.lit("").cast(StringType)) + + val orderedDF = withMetaCols.select(updatedSchema.fieldNames.map(functions.col): _*) + + if (config.shouldCombineBeforeInsert) { + val dedupedRdd = dedupeRows( + injectSQLConf(orderedDF.queryExecution.toRdd, SQLConf.get), + updatedSchema, + tableConfig.getOrderingFields.asScala.toList, + SparkHoodieIndexFactory.isGlobalIndex(config), + targetParallelism) + sparkAdapter.getUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) + } else { + orderedDF + } + } } else { // NOTE: In cases when we're not populating meta-fields we actually don't // need access to the [[InternalRow]] and therefore can avoid the need @@ -146,6 +188,84 @@ object HoodieDatasetBulkInsertHelper partitioner.repartitionRecords(updatedDF, targetParallelism) } + /** + * Builds the record-key / partition-path projection columns for the non auto-keygen path, + * dispatching to a fast-path tier when the keygen is amenable to a pure Catalyst projection. + * + * Tier 1: NonpartitionedKeyGenerator with a single record-key field. + * Tier 2: SimpleKeyGenerator with a single record-key + single partition-path field, when the + * partition formatter flags are reproducible as Catalyst expressions. Currently: + * default flags, or hive-style partitioning. URL-encoded and slash-separated date + * partitioning fall through to Tier 3 -- the URL escape table doesn't have an efficient + * pure-Catalyst equivalent, and slash-separated dates were added in 1.2.0 and exercise + * a separate formatter branch we'd rather not encode twice. + * Tier 3: Anonymous UDF that calls into the supplied [[BuiltinKeyGenerator]]. + */ + private def buildKeygenColumns(df: DataFrame, + keyGeneratorClassName: String, + typedProps: TypedProperties, + keyGenerator: BuiltinKeyGenerator): (org.apache.spark.sql.Column, org.apache.spark.sql.Column) = { + val recordKeyFields = keyGenerator.getRecordKeyFieldNames + val partitionPathFields = keyGenerator.getPartitionPathFields + + val hiveStyle = typedProps.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key, + KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue.toBoolean) + val urlEncode = typedProps.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key, + KeyGeneratorOptions.URL_ENCODE_PARTITIONING.defaultValue.toBoolean) + val slashSep = typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key, + KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean) + + val tier1 = keyGeneratorClassName == classOf[NonpartitionedKeyGenerator].getName && + recordKeyFields.size == 1 + + val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName && + recordKeyFields.size == 1 && partitionPathFields.size == 1 && + !urlEncode && !slashSep + + if (tier1) { + (functions.col(recordKeyFields.get(0)).cast(StringType), + functions.lit("").cast(StringType)) + } else if (tier2) { + val recordKeyCol = functions.col(recordKeyFields.get(0)).cast(StringType) + val partitionPathCol = buildSimpleKeygenPartitionPathColumn(partitionPathFields.get(0), hiveStyle) + (recordKeyCol, partitionPathCol) + } else { + val recordKeyUdf = functions.udf( + new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getRecordKey(row) }, + StringType) + val partitionPathUdf = functions.udf( + new UDF1[Row, String] { override def call(row: Row): String = keyGenerator.getPartitionPath(row) }, + StringType) + val rowStruct = functions.struct(df.schema.fieldNames.map(functions.col): _*) + (recordKeyUdf(rowStruct), partitionPathUdf(rowStruct)) + } + } + + /** + * Tier-2 SimpleKeyGenerator partition-path expression for a single field with default URL/slash + * formatter flags, mirroring [[PartitionPathFormatterBase#combine]]: + * + *
The Avro path is the ground truth shared by read- and write-side keygen invocations, so parity
+ * against it (rather than RDD-vs-UDF parity) is what actually matters for correctness.
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("provideKeyGenParityArgs")
+ public void testKeyGenParityAgainstAvroGroundTruth(String label,
+ String keyGenClass,
+ String recordKeyFields,
+ String partitionPathFields,
+ boolean hiveStylePartitioning,
+ boolean slashSepPartitioning,
+ boolean urlEncodePartitioning) {
+ Map The slash-separated formatter branch deliberately skips {@code handleEmpty} (per
+ * {@code PartitionPathFormatterBase#combine}), so we don't exercise that combo here -- the
+ * cross-flag parity test already covers it against the canonical Avro path.
+ */
+ @Test
+ public void testTier2EmptyPartitionValueSubstitutedWithHiveDefault() {
+ Row emptyPartitionRow = RowFactory.create("rk-empty", "", 1700000000000L, false);
+
+ Map This test forces {@code spark.sql.session.timeZone} to a non-default value, runs the helper, and
+ * asserts the resulting keys match the Avro ground truth computed under the same timezone. A divergence
+ * here would indicate the UDF path is sensitive to executor JVM defaults.
+ */
+ @Test
+ public void testUdfPathRespectsDriverSessionTimezone() {
+ String originalTz = sqlContext.getConf("spark.sql.session.timeZone", "UTC");
+ sqlContext.setConf("spark.sql.session.timeZone", "America/Los_Angeles");
+ try {
+ Map