From 8332f66c7ab65beb62bf5f997c8d5468eb28146a Mon Sep 17 00:00:00 2001 From: voon Date: Fri, 5 Jun 2026 17:59:49 +0800 Subject: [PATCH 1/3] refactor(reader): push variant projection into log readers, drop buffer hook Remove the engine-neutral FileGroupRecordBuffer variant-projection composition (#18674's getLogBlockRecordProjection hook) so the merge buffer stays format-agnostic; each log reader now emits rows already aligned to the projected read schema (#18739). - Parquet log blocks: thread the variant-overlaid StructType into a new HoodieSparkParquetReader.getUnsafeRowIterator(HoodieSchema, StructType, filters) overload so SPARK_ROW_REQUESTED_SCHEMA carries VariantMetadata and parquet-mr decodes variants into the projected struct shape natively (mirrors the base-file path). Wired in SparkFileFormatInternalRowReaderContext.getFileRecordIterator. - Avro log blocks: new no-op HoodieReaderContext.projectLogBlockRecords hook, invoked from HoodieAvroDataBlock.deserializeRecords; Spark overrides it to apply the VariantGet rewrite (relocated from the deleted buffer hook). - Both paths gated by a single shouldProjectVariants predicate (variant projection present AND merger not PAYLOAD_BASED), preserving the buffer's custom-payload skip. - FileGroupRecordBuffer/PositionBased now call getSchemaTransformerWithEvolvedSchema directly; getProjectedTransformer and getLogBlockRecordProjection deleted. - Sub-task 4: documented why the sparkRequiredSchema overlay must stay (HoodieSchema can't carry VariantMetadata); kept Spark-side, no schema-model change. buildVariantProjector / isVariantProjectionStruct unchanged (caller moved). Addresses #18739. --- .../io/storage/HoodieSparkParquetReader.java | 15 +++- ...rkFileFormatInternalRowReaderContext.scala | 74 +++++++++++++------ .../common/engine/HoodieReaderContext.java | 17 +++-- .../table/log/block/HoodieAvroDataBlock.java | 5 +- .../read/buffer/FileGroupRecordBuffer.java | 26 +------ .../PositionBasedFileGroupRecordBuffer.java | 10 +-- 6 files changed, 89 insertions(+), 58 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index dec8eecd0383e..d35d713a8ef1c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -144,7 +144,20 @@ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSc */ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, List readFilters) throws IOException { HoodieSchema nonNullSchema = requestedSchema.getNonNullType(); - StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema); + return getUnsafeRowIterator(nonNullSchema, HoodieInternalRowUtils.getCachedSchema(nonNullSchema), readFilters); + } + + /** + * Variant overload. {@code projectedRequestedStruct} is the requested Spark schema, which may carry + * a Spark 4.1 PushVariantIntoScan variant projection (per-field {@code VariantMetadata}) that + * {@link HoodieSchema} cannot represent. Using it as the requested schema makes parquet-mr decode + * variant columns into the projected struct shape natively (mirroring the base-file read path) + * rather than returning the full {@code VariantType}. {@code requestedSchema} is still used for + * vector-column detection (orthogonal to variants) and the timestamp-repair MessageType. + */ + public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, StructType projectedRequestedStruct, List readFilters) throws IOException { + HoodieSchema nonNullSchema = requestedSchema.getNonNullType(); + StructType structSchema = projectedRequestedStruct; // Detect vector columns: ordinal → Vector schema Map vectorColumnInfo = HoodieVectorUtils.detectVectorColumns(nonNullSchema); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 06579bd99d27a..08eea094781e0 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -24,13 +24,14 @@ import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForB import org.apache.hudi.common.engine.HoodieReaderContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} +import org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME import org.apache.hudi.common.util.HoodieVectorUtils import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair} +import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator, Pair => HPair} import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader, VectorConversionUtils} import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath} import org.apache.hudi.util.CloseableInternalRowIterator @@ -86,6 +87,14 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR // For each field of `target`, replace its dataType with the matching field's projected // variant struct from `source` (when present). Non-matching fields pass through. + // + // Why a parallel `sparkRequiredSchema` overlay exists at all (#18739 sub-task 4): a Spark 4.1 + // PushVariantIntoScan projection carries per-field `VariantMetadata` (extraction path / timezone / + // failOnError) that `HoodieSchema` has nowhere to store — `HoodieSparkSchemaConverters` collapses + // the projected struct back to a plain VARIANT and the reverse can't reconstruct the metadata. So + // the engine must keep the projected Spark schema alongside the HoodieSchema; it cannot be + // recovered from a HoodieSchema round-trip. Kept Spark-side on purpose so the engine-neutral schema + // model stays free of Spark-4.1 variant concepts. private def overlayVariantProjections(target: StructType, source: StructType): StructType = { StructType(target.fields.map { f => SparkFileFormatInternalRowReaderContext.findFieldByName(source, f.name).map(_.dataType) match { @@ -96,29 +105,45 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR }) } - // Aligns log-block records with the PushVariantIntoScan-projected variant shape before - // they reach the merger. Preserves merger metadata cols (_hoodie_record_key, - // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the - // bare required schema would drop them and the merger would read garbage offsets. - override def getLogBlockRecordProjection( - dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, InternalRow]] = { - val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => f.dataType match { + // True only when there is a Spark 4.1 PushVariantIntoScan projection to apply AND the table is + // not using a custom (payload-based) merger. Payload-based tables round-trip records through + // PayloadUpdateProcessor.convertToAvroRecord against a schema that still types variant fields as + // VariantType, so a row already rewritten into the projected struct shape would be mis-decoded. + // Single source of truth for both reader paths (parquet native projection + avro rewrite). + private def shouldProjectVariants: Boolean = { + val hasVariantProjection = sparkRequiredSchema.exists(_.fields.exists(_.dataType match { case st: StructType => sparkAdapter.isVariantProjectionStruct(st) case _ => false })) - if (!needsProjection) { - return HOption.empty[JFunction[InternalRow, InternalRow]]() + hasVariantProjection && { + val merger = getRecordMerger() + !(merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID) + } + } + + // Aligns avro log-block records with the PushVariantIntoScan-projected variant shape before + // they reach the merger. Preserves merger metadata cols (_hoodie_record_key, + // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the bare + // required schema would drop them and the merger would read garbage offsets. Parquet log blocks + // project natively in getFileRecordIterator, so only the avro log reader calls this hook. + override def projectLogBlockRecords( + recordIterator: ClosableIterator[InternalRow], + dataBlockSchema: HoodieSchema): ClosableIterator[InternalRow] = { + if (!shouldProjectVariants) { + return recordIterator } val req = sparkRequiredSchema.get val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema) val targetStruct = overlayVariantProjections(dataStruct, req) sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match { - case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] { - // .copy() because the buffer stores rows into ExternalSpillableMap and - // UnsafeProjection reuses a single output buffer. - override def apply(r: InternalRow): InternalRow = p(r).copy() - }) - case None => HOption.empty[JFunction[InternalRow, InternalRow]]() + case Some(p) => + new CloseableMappingIterator[InternalRow, InternalRow](recordIterator, + new JFunction[InternalRow, InternalRow] { + // .copy() because the buffer stores rows into ExternalSpillableMap and + // UnsafeProjection reuses a single output buffer. + override def apply(r: InternalRow): InternalRow = p(r).copy() + }) + case None => recordIterator } } @@ -159,11 +184,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField) if (FSUtils.isLogFile(filePath)) { - // NOTE: now only primary key based filtering is supported for log files - // Variant alignment happens later via getLogBlockRecordProjection in the merge buffer. - new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath) - .asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, readFilters.asJava) - .asInstanceOf[ClosableIterator[InternalRow]] + // NOTE: now only primary key based filtering is supported for log files. + val reader = new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath) + .asInstanceOf[HoodieSparkParquetReader] + val rawIterator = if (shouldProjectVariants) { + // Thread the variant-overlaid struct (carrying VariantMetadata that HoodieSchema can't + // represent) so parquet-mr decodes variants into the projected struct shape natively, + // mirroring the base-file branch below. Gated so payload-based tables keep full variants. + reader.getUnsafeRowIterator(requiredSchema, structType, readFilters.asJava) + } else { + reader.getUnsafeRowIterator(requiredSchema, readFilters.asJava) + } + rawIterator.asInstanceOf[ClosableIterator[InternalRow]] } else { // partition value is empty because the spark parquet reader will append the partition columns to // each row if they are given. That is the only usage of the partition values in the reader. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 14e412ffcb184..bf9ba36a63489 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -60,7 +60,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.function.Function; import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY; import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY; @@ -317,12 +316,18 @@ public abstract ClosableIterator mergeBootstrapReaders(ClosableIterator sk List> requiredPartitionFieldAndValues); /** - * Optional per-row transformer applied to log-block records before they reach the merger. - * Engines override this to align records with a projected read schema (e.g. Spark 4.1's - * PushVariantIntoScan). Default is no projection. + * Wraps a log-block record iterator to align its rows with a projected read schema before they + * reach the merger (e.g. Spark 4.1's PushVariantIntoScan rewrites variant columns into struct + * extractions on the base-file side, so avro log rows must be rewritten to match). Default is a + * no-op pass-through; the parquet log path projects natively via the reader, so only engines + * whose avro log reader needs an explicit row rewrite (currently the Spark reader context) + * override this. + * + * @param recordIterator the deserialized log-block records in {@code dataBlockSchema} shape + * @param dataBlockSchema the schema the records are in */ - public Option> getLogBlockRecordProjection(HoodieSchema dataBlockSchema) { - return Option.empty(); + public ClosableIterator projectLogBlockRecords(ClosableIterator recordIterator, HoodieSchema dataBlockSchema) { + return recordIterator; } public Option> getPayloadClasses(TypedProperties props) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 9f265d4f26878..382a6cbef6b2e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -169,7 +169,10 @@ protected ClosableIterator> deserializeRecords( protected ClosableIterator deserializeRecords(HoodieReaderContext readerContext, byte[] content) throws IOException { checkState(this.readerSchema != null, "Reader's schema has to be non-null"); RecordIterator iterator = RecordIterator.getInstance(this, content, readerContext.enableLogicalTimestampFieldRepair()); - return new CloseableMappingIterator<>(iterator, data -> readerContext.getRecordContext().convertAvroRecord(data)); + ClosableIterator records = new CloseableMappingIterator<>(iterator, data -> readerContext.getRecordContext().convertAvroRecord(data)); + // Align records with the engine's projected read schema (e.g. Spark 4.1 PushVariantIntoScan). + // No-op for engines/queries that don't need it. Parquet log blocks project natively in the reader. + return readerContext.projectLogBlockRecords(records, this.readerSchema); } private static class RecordIterator implements ClosableIterator { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java index 30f872421057e..a44663e7f9f00 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java @@ -199,8 +199,8 @@ protected Pair, HoodieSchema> getRecordsIterator(HoodieDataB } else { blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext); } - Pair, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock); - return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, projectedTransformer.getLeft()), projectedTransformer.getRight()); + Pair, HoodieSchema> schemaTransformer = getSchemaTransformerWithEvolvedSchema(dataBlock); + return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, schemaTransformer.getLeft()), schemaTransformer.getRight()); } catch (IOException e) { throw new HoodieIOException("Failed to deser records from log files ", e); } @@ -280,28 +280,6 @@ protected Pair, HoodieSchema> getSchemaTransformerWithEvolvedSche return Pair.of(transformer, evolvedSchema); } - /** - * Composes schema evolution then the engine's optional log-block record projection - * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved data-block schema - * — the projector preserves field shape, only rewriting variant fields, so merger - * metadata cols (read by ordinal) stay intact. - * - *

Skipped when a custom payload class is configured: {@code PayloadUpdateProcessor} - * round-trips through {@code convertToAvroRecord} against a schema that still types - * variant fields as {@code VariantType}, which would mis-decode rewritten rows. - */ - protected Pair, HoodieSchema> getProjectedTransformer(HoodieDataBlock dataBlock) { - Pair, HoodieSchema> evolved = getSchemaTransformerWithEvolvedSchema(dataBlock); - if (payloadClasses.isPresent()) { - return evolved; - } - Option> logProjOpt = readerContext.getLogBlockRecordProjection(evolved.getRight()); - if (!logProjOpt.isPresent()) { - return evolved; - } - return Pair.of(evolved.getLeft().andThen(logProjOpt.get()), evolved.getRight()); - } - private static class LogRecordIterator implements ClosableIterator> { private final FileGroupRecordBuffer fileGroupRecordBuffer; private final Iterator> logRecordIterator; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java index 73dd5dda5e870..70d71a087bf1a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java @@ -126,9 +126,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option keySpecO partialUpdateModeOpt); } - Pair, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock); + Pair, HoodieSchema> schemaTransformer = getSchemaTransformerWithEvolvedSchema(dataBlock); - HoodieSchema schema = HoodieSchemaCache.intern(projectedTransformer.getRight()); + HoodieSchema schema = HoodieSchemaCache.intern(schemaTransformer.getRight()); // TODO: Return an iterator that can generate sequence number with the record. // Then we can hide this logic into data block. @@ -144,9 +144,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option keySpecO } long recordPosition = recordPositions.get(recordIndex++); - T projectedNextRecord = projectedTransformer.getLeft().apply(nextRecord); - boolean isDelete = readerContext.getRecordContext().isDeleteRecord(projectedNextRecord, deleteContext); - BufferedRecord bufferedRecord = BufferedRecords.fromEngineRecord(projectedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete); + T transformedNextRecord = schemaTransformer.getLeft().apply(nextRecord); + boolean isDelete = readerContext.getRecordContext().isDeleteRecord(transformedNextRecord, deleteContext); + BufferedRecord bufferedRecord = BufferedRecords.fromEngineRecord(transformedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete); processNextDataRecord(bufferedRecord, recordPosition); } } From 78a81a970b7c4691a2319cd0d3da252c50794483 Mon Sep 17 00:00:00 2001 From: voon Date: Mon, 8 Jun 2026 13:55:20 +0800 Subject: [PATCH 2/3] refactor(variant): address review nits on log-block variant projection - HoodieSparkParquetReader: rename variant-overload parameter to structSchema and drop the no-op alias. - SparkFileFormatInternalRowReaderContext: extract isPayloadBased and drop the double negation in shouldProjectVariants. --- .../apache/hudi/io/storage/HoodieSparkParquetReader.java | 5 ++--- .../hudi/SparkFileFormatInternalRowReaderContext.scala | 7 +++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index d35d713a8ef1c..5accd40b32718 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -148,16 +148,15 @@ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSc } /** - * Variant overload. {@code projectedRequestedStruct} is the requested Spark schema, which may carry + * Variant overload. {@code structSchema} is the requested Spark schema, which may carry * a Spark 4.1 PushVariantIntoScan variant projection (per-field {@code VariantMetadata}) that * {@link HoodieSchema} cannot represent. Using it as the requested schema makes parquet-mr decode * variant columns into the projected struct shape natively (mirroring the base-file read path) * rather than returning the full {@code VariantType}. {@code requestedSchema} is still used for * vector-column detection (orthogonal to variants) and the timestamp-repair MessageType. */ - public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, StructType projectedRequestedStruct, List readFilters) throws IOException { + public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, StructType structSchema, List readFilters) throws IOException { HoodieSchema nonNullSchema = requestedSchema.getNonNullType(); - StructType structSchema = projectedRequestedStruct; // Detect vector columns: ordinal → Vector schema Map vectorColumnInfo = HoodieVectorUtils.detectVectorColumns(nonNullSchema); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 08eea094781e0..f6657493466d0 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -115,10 +115,9 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR case st: StructType => sparkAdapter.isVariantProjectionStruct(st) case _ => false })) - hasVariantProjection && { - val merger = getRecordMerger() - !(merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID) - } + val merger = getRecordMerger() + val isPayloadBased = merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID + hasVariantProjection && !isPayloadBased } // Aligns avro log-block records with the PushVariantIntoScan-projected variant shape before From 30929396d38c589ae93ddf0779cd0ac6d4fb5cf0 Mon Sep 17 00:00:00 2001 From: voon Date: Mon, 8 Jun 2026 18:50:52 +0800 Subject: [PATCH 3/3] refactor(variant): address review nits on parquet reader param and merger guard - Rename getUnsafeRowIterator param structSchema to projectedStructSchema - Add inline comment explaining the merger != null guard --- .../hudi/io/storage/HoodieSparkParquetReader.java | 10 +++++----- .../hudi/SparkFileFormatInternalRowReaderContext.scala | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index 5accd40b32718..7e505b2f049e8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -148,14 +148,14 @@ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSc } /** - * Variant overload. {@code structSchema} is the requested Spark schema, which may carry + * Variant overload. {@code projectedStructSchema} is the requested Spark schema, which may carry * a Spark 4.1 PushVariantIntoScan variant projection (per-field {@code VariantMetadata}) that * {@link HoodieSchema} cannot represent. Using it as the requested schema makes parquet-mr decode * variant columns into the projected struct shape natively (mirroring the base-file read path) * rather than returning the full {@code VariantType}. {@code requestedSchema} is still used for * vector-column detection (orthogonal to variants) and the timestamp-repair MessageType. */ - public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, StructType structSchema, List readFilters) throws IOException { + public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSchema, StructType projectedStructSchema, List readFilters) throws IOException { HoodieSchema nonNullSchema = requestedSchema.getNonNullType(); // Detect vector columns: ordinal → Vector schema @@ -164,8 +164,8 @@ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSc // For vector columns, replace ArrayType(FloatType) with BinaryType in the read schema // so SparkBasicSchemaEvolution sees matching types (file has FIXED_LEN_BYTE_ARRAY → BinaryType) StructType readStructSchema = vectorColumnInfo.isEmpty() - ? structSchema - : VectorConversionUtils.replaceVectorColumnsWithBinary(structSchema, vectorColumnInfo); + ? projectedStructSchema + : VectorConversionUtils.replaceVectorColumnsWithBinary(projectedStructSchema, vectorColumnInfo); Option messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema)); boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true); @@ -215,7 +215,7 @@ public ClosableIterator getUnsafeRowIterator(HoodieSchema requestedSc if (!vectorColumnInfo.isEmpty()) { // Post-process: convert binary VECTOR columns back to typed arrays - UnsafeProjection vectorProjection = UnsafeProjection.create(structSchema); + UnsafeProjection vectorProjection = UnsafeProjection.create(projectedStructSchema); Function mapper = VectorConversionUtils.buildRowMapper(readStructSchema, vectorColumnInfo, vectorProjection::apply); CloseableMappingIterator vectorIterator = diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index f6657493466d0..d7ddb1f9f5964 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -115,6 +115,8 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR case st: StructType => sparkAdapter.isVariantProjectionStruct(st) case _ => false })) + // getRecordMerger() is a Lombok getter over a field initialized to null (not Option.empty()); + // it stays null until setRecordMerger() runs during reader init, so the null guard is required. val merger = getRecordMerger() val isPayloadBased = merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID hasVariantProjection && !isPayloadBased