Problem
HoodieDatasetBulkInsertHelper.prepareForBulkInsert invokes the configured KeyGenerator for every row via df.queryExecution.toRdd.mapPartitions { iter => iter.map { row => keyGenerator.getRecordKey(row, schema); keyGenerator.getPartitionPath(row, schema); ... } }. This forces an RDD round-trip and per-row reflection-based keygen invocation on every record, even for the keygens (NonpartitionedKeyGenerator, SimpleKeyGenerator) where the record-key and partition-path values can be sourced directly from input columns.
Before the original row-writer PR (#5470) consolidated the path, these two keygens were handled with withColumn(col(field).cast(String)) projections — pure Catalyst, free codegen, no row materialization. Collapsing the dispatch onto an RDD mapPartitions regressed bulk-insert throughput for the common SimpleKeyGen and NonpartitionedKeyGen cases. Related: #18969 (same module, drop-partition-columns hot path).
Proposed fix
Restore tiered dispatch in prepareForBulkInsert:
- Tier 1: NonpartitionedKeyGenerator (single record-key field): emit col(rk).cast(String) + lit("") as Catalyst columns.
- Tier 2: SimpleKeyGenerator (single record-key + single partition-path, URL-encoding off, slash-separated dates off): emit col(rk).cast(String) and a partition-path expression mirroring PartitionPathFormatterBase#combine, including the handleEmpty -> __HIVE_DEFAULT_PARTITION__ substitution and hive-style field= prefixing.
- Tier 3: everything else (multi-field keys, ComplexKeyGenerator, TimestampBased, Custom, Simple with URL-encode or slash-sep): anonymous functions.udf over a struct of input columns calling the canonical BuiltinKeyGenerator.getRecordKey(Row) / getPartitionPath(Row), i.e. the same Avro-aligned formatter the read side and Tier 3 callers already share. UDFs are not registered against the SparkSession (no leak across writes).
- Auto record key generation keeps the existing RDD path; it needs TaskContext.partitionId and a stateful per-task counter, which can't be expressed cleanly as a driver-side closure.
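For illustration, the Tier 1/2 projections might look roughly like the sketch below. This is not the patch itself: the object and helper names (FastPathKeyColumns, recordKeyCol, partitionPathCol) are hypothetical, the empty-value rule (null or empty string) is an assumption about what handleEmpty does, and it assumes spark-sql is on the classpath.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, concat, lit, when}

// Hypothetical helper object; names are illustrative, not Hudi APIs.
object FastPathKeyColumns {
  // Placeholder substituted for empty partition values.
  val DefaultPartition = "__HIVE_DEFAULT_PARTITION__"

  // Tier 1 and Tier 2: the record key is a string cast of a single input column.
  def recordKeyCol(recordKeyField: String): Column =
    col(recordKeyField).cast("string")

  // Tier 2: partition path with empty-value substitution (assumed: null or "")
  // and an optional hive-style "field=" prefix.
  def partitionPathCol(partitionField: String, hiveStyle: Boolean): Column = {
    val raw = col(partitionField).cast("string")
    val value = when(raw.isNull || raw === "", lit(DefaultPartition)).otherwise(raw)
    if (hiveStyle) concat(lit(s"$partitionField="), value) else value
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("fast-path-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq[(Long, String)]((1L, "2024"), (2L, ""), (3L, null)).toDF("ts", "region")

    // Both columns are pure Catalyst expressions: no UDF, no RDD round-trip.
    df.select(
      recordKeyCol("ts").as("_hoodie_record_key"),
      partitionPathCol("region", hiveStyle = true).as("_hoodie_partition_path")
    ).show(false)

    spark.stop()
  }
}
```

For Tier 1 the partition-path column would simply be lit(""), as described in the list above.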
The Tier 3 UDF goes through getRecordKey(Row) / getPartitionPath(Row), which use the String formatter (combinePartitionPath), so all three formatter flags (hive-style, URL encode, slash-separated dates) are honored for the keygens that fall through. The Tier 2 fast-path encodes only the default+hive-style flag combinations (URL encoding has no efficient pure-Catalyst equivalent, and the 1.2.0+ slash-separated branch in the formatter exercises a separate code path we'd rather not encode twice).
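The dispatch rules can be summarized as a small decision function. The sketch below is plain Scala with no Spark or Hudi dependency; KeyGenSpec, Tier and selectTier are hypothetical names introduced only to make the rules concrete, and matching on a simple class-name string is an assumption for illustration.

```scala
object TierSelection {
  sealed trait Tier
  case object CatalystNonPartitioned extends Tier // Tier 1
  case object CatalystSimple extends Tier         // Tier 2
  case object UdfFallback extends Tier            // Tier 3
  case object RddAutoKey extends Tier             // auto record key generation

  // Hypothetical summary of the write config; field names are illustrative only.
  final case class KeyGenSpec(
      keyGenClass: String,
      recordKeyFields: Seq[String],
      partitionPathFields: Seq[String],
      urlEncode: Boolean = false,
      slashSeparatedDates: Boolean = false,
      autoGenerateRecordKeys: Boolean = false)

  def selectTier(spec: KeyGenSpec): Tier = spec match {
    // Auto-generated keys need per-task state, so they stay on the RDD path.
    case s if s.autoGenerateRecordKeys => RddAutoKey
    case s if s.keyGenClass == "NonpartitionedKeyGenerator" &&
        s.recordKeyFields.size == 1 => CatalystNonPartitioned
    // Hive-style is allowed on the fast path; URL encoding and slash-separated dates are not.
    case s if s.keyGenClass == "SimpleKeyGenerator" &&
        s.recordKeyFields.size == 1 &&
        s.partitionPathFields.size == 1 &&
        !s.urlEncode && !s.slashSeparatedDates => CatalystSimple
    // Everything else goes through the canonical key generator inside a UDF.
    case _ => UdfFallback
  }
}
```

Note that the hive-style flag does not appear in the decision at all: it only changes the expression Tier 2 emits, not the tier that is chosen.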
Test coverage
New tests added to TestHoodieDatasetBulkInsertHelper:
- Parity against canonical Avro keygen, parameterized across 11 cases: every supported keygen class plus the SimpleKeyGen flag combinations: default, hive-style only, slash-sep (Tier 3), hive+slash (Tier 3), URL-encode (Tier 3), hive+URL (Tier 3), ComplexKeyGen single/multi, TimestampBased, Custom.
- Non-string record key cast: verifies that Tier 1/2 cast a ts: long column correctly.
- Logical plan inspection: Tier 1/2 plans contain no ScalaUDF (i.e. they actually benefit from Catalyst codegen rather than silently falling through to Tier 3).
- Empty partition value: confirms the __HIVE_DEFAULT_PARTITION__ substitution under both default and hive-style flags.
- Driver session timezone propagation: the Tier 3 UDF respects spark.sql.session.timeZone set on the driver (guards against executor JVM default leakage on TimestampBasedKeyGenerator).
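A minimal sketch of how the logical plan inspection check could be written, assuming spark-sql on the classpath. The helper name containsScalaUdf is hypothetical, and the toy DataFrames stand in for the real output of prepareForBulkInsert.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.functions.{col, udf}

object PlanInspection {
  // True if any expression anywhere in the analyzed logical plan is a ScalaUDF.
  def containsScalaUdf(df: DataFrame): Boolean =
    df.queryExecution.analyzed.collect {
      case node => node.expressions.flatMap(_.collect { case u: ScalaUDF => u })
    }.flatten.nonEmpty

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("plan-inspection-sketch")
      .getOrCreate()

    val df = spark.range(3).toDF("ts")

    // Fast path stand-in: a plain cast, which stays inside Catalyst.
    val fast = df.withColumn("key", col("ts").cast("string"))

    // Fallback stand-in: an anonymous (unregistered) UDF producing the same value.
    val toStr = udf((v: Long) => v.toString)
    val slow = df.withColumn("key", toStr(col("ts")))

    assert(!containsScalaUdf(fast), "fast path should not contain a ScalaUDF")
    assert(containsScalaUdf(slow), "UDF path should contain a ScalaUDF")

    spark.stop()
  }
}
```

Inspecting the analyzed plan rather than the physical plan keeps the check independent of codegen settings; it only asserts that no UDF was introduced.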
Out of scope (followups)
- The dropPartitionColumns = true path inside BulkInsertDataInternalWriterHelper.write does the partition-column strip per-row in a Java loop with a HashSet lookup. Lifting that into prepareForBulkInsert would let us also avoid materialising columns that are about to be dropped; see #18969 (perf: Row-writer bulk insert re-instantiates the key generator and boxes the full row per record when dropping partition columns) for the adjacent discussion.
- Replacing the auto-record-key toRdd round-trip with a custom Catalyst Expression for partitionId + monotonic rowId. Out of scope here.
PR
Patch will be linked here once opened.