Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,28 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSc
*/
public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSchema, List<Filter> readFilters) throws IOException {
HoodieSchema nonNullSchema = requestedSchema.getNonNullType();
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
return getUnsafeRowIterator(nonNullSchema, HoodieInternalRowUtils.getCachedSchema(nonNullSchema), readFilters);
}

/**
* Variant overload. {@code projectedStructSchema} is the requested Spark schema, which may carry
* a Spark 4.1 PushVariantIntoScan variant projection (per-field {@code VariantMetadata}) that
* {@link HoodieSchema} cannot represent. Using it as the requested schema makes parquet-mr decode
* variant columns into the projected struct shape natively (mirroring the base-file read path)
* rather than returning the full {@code VariantType}. {@code requestedSchema} is still used for
* vector-column detection (orthogonal to variants) and the timestamp-repair MessageType.
*/
public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSchema, StructType projectedStructSchema, List<Filter> readFilters) throws IOException {
HoodieSchema nonNullSchema = requestedSchema.getNonNullType();

// Detect vector columns: ordinal → Vector schema
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = HoodieVectorUtils.detectVectorColumns(nonNullSchema);

// For vector columns, replace ArrayType(FloatType) with BinaryType in the read schema
// so SparkBasicSchemaEvolution sees matching types (file has FIXED_LEN_BYTE_ARRAY → BinaryType)
StructType readStructSchema = vectorColumnInfo.isEmpty()
? structSchema
: VectorConversionUtils.replaceVectorColumnsWithBinary(structSchema, vectorColumnInfo);
? projectedStructSchema
: VectorConversionUtils.replaceVectorColumnsWithBinary(projectedStructSchema, vectorColumnInfo);

Option<MessageType> messageSchema = Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema));
boolean enableTimestampFieldRepair = storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
Expand Down Expand Up @@ -203,7 +215,7 @@ public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema requestedSc

if (!vectorColumnInfo.isEmpty()) {
// Post-process: convert binary VECTOR columns back to typed arrays
UnsafeProjection vectorProjection = UnsafeProjection.create(structSchema);
UnsafeProjection vectorProjection = UnsafeProjection.create(projectedStructSchema);
Function<InternalRow, InternalRow> mapper =
VectorConversionUtils.buildRowMapper(readStructSchema, vectorColumnInfo, vectorProjection::apply);
CloseableMappingIterator<UnsafeRow, UnsafeRow> vectorIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForB
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.HoodieVectorUtils
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator, Pair => HPair}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader, VectorConversionUtils}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator
Expand Down Expand Up @@ -86,6 +87,14 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR

// For each field of `target`, replace its dataType with the matching field's projected
// variant struct from `source` (when present). Non-matching fields pass through.
//
// Why a parallel `sparkRequiredSchema` overlay exists at all (#18739 sub-task 4): a Spark 4.1
// PushVariantIntoScan projection carries per-field `VariantMetadata` (extraction path / timezone /
// failOnError) that `HoodieSchema` has nowhere to store — `HoodieSparkSchemaConverters` collapses
// the projected struct back to a plain VARIANT and the reverse can't reconstruct the metadata. So
// the engine must keep the projected Spark schema alongside the HoodieSchema; it cannot be
// recovered from a HoodieSchema round-trip. Kept Spark-side on purpose so the engine-neutral schema
// model stays free of Spark-4.1 variant concepts.
private def overlayVariantProjections(target: StructType, source: StructType): StructType = {
StructType(target.fields.map { f =>
SparkFileFormatInternalRowReaderContext.findFieldByName(source, f.name).map(_.dataType) match {
Expand All @@ -96,29 +105,46 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
})
}

// Aligns log-block records with the PushVariantIntoScan-projected variant shape before
// they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
// _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the
// bare required schema would drop them and the merger would read garbage offsets.
override def getLogBlockRecordProjection(
dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, InternalRow]] = {
val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => f.dataType match {
// True only when there is a Spark 4.1 PushVariantIntoScan projection to apply AND the table is
// not using a custom (payload-based) merger. Payload-based tables round-trip records through
// PayloadUpdateProcessor.convertToAvroRecord against a schema that still types variant fields as
// VariantType, so a row already rewritten into the projected struct shape would be mis-decoded.
// Single source of truth for both reader paths (parquet native projection + avro rewrite).
private def shouldProjectVariants: Boolean = {
val hasVariantProjection = sparkRequiredSchema.exists(_.fields.exists(_.dataType match {
case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
case _ => false
Comment thread
voonhous marked this conversation as resolved.
}))
if (!needsProjection) {
return HOption.empty[JFunction[InternalRow, InternalRow]]()
// getRecordMerger() is a Lombok getter over a field initialized to null (not Option.empty());
// it stays null until setRecordMerger() runs during reader init, so the null guard is required.
val merger = getRecordMerger()
val isPayloadBased = merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID
hasVariantProjection && !isPayloadBased
}

// Aligns avro log-block records with the PushVariantIntoScan-projected variant shape before
// they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
// _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the bare
// required schema would drop them and the merger would read garbage offsets. Parquet log blocks
// project natively in getFileRecordIterator, so only the avro log reader calls this hook.
override def projectLogBlockRecords(
recordIterator: ClosableIterator[InternalRow],
dataBlockSchema: HoodieSchema): ClosableIterator[InternalRow] = {
if (!shouldProjectVariants) {
return recordIterator
}
val req = sparkRequiredSchema.get

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: req is a bit terse here — the adjacent variables (dataStruct, targetStruct) are more descriptive. Could you rename it to requiredStruct to stay consistent?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema)
val targetStruct = overlayVariantProjections(dataStruct, req)
sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match {
case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] {
// .copy() because the buffer stores rows into ExternalSpillableMap and
// UnsafeProjection reuses a single output buffer.
override def apply(r: InternalRow): InternalRow = p(r).copy()
})
case None => HOption.empty[JFunction[InternalRow, InternalRow]]()
case Some(p) =>
new CloseableMappingIterator[InternalRow, InternalRow](recordIterator,
new JFunction[InternalRow, InternalRow] {
// .copy() because the buffer stores rows into ExternalSpillableMap and
// UnsafeProjection reuses a single output buffer.
override def apply(r: InternalRow): InternalRow = p(r).copy()
})
case None => recordIterator
}
}

Expand Down Expand Up @@ -159,11 +185,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR

val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
if (FSUtils.isLogFile(filePath)) {
// NOTE: now only primary key based filtering is supported for log files
// Variant alignment happens later via getLogBlockRecordProjection in the merge buffer.
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, readFilters.asJava)
.asInstanceOf[ClosableIterator[InternalRow]]
// NOTE: now only primary key based filtering is supported for log files.
val reader = new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
.asInstanceOf[HoodieSparkParquetReader]
val rawIterator = if (shouldProjectVariants) {
// Thread the variant-overlaid struct (carrying VariantMetadata that HoodieSchema can't
// represent) so parquet-mr decodes variants into the projected struct shape natively,
// mirroring the base-file branch below. Gated so payload-based tables keep full variants.
reader.getUnsafeRowIterator(requiredSchema, structType, readFilters.asJava)
} else {
reader.getUnsafeRowIterator(requiredSchema, readFilters.asJava)
}
rawIterator.asInstanceOf[ClosableIterator[InternalRow]]
} else {
// partition value is empty because the spark parquet reader will append the partition columns to
// each row if they are given. That is the only usage of the partition values in the reader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
Expand Down Expand Up @@ -317,12 +316,18 @@ public abstract ClosableIterator<T> mergeBootstrapReaders(ClosableIterator<T> sk
List<Pair<String, Object>> requiredPartitionFieldAndValues);

/**
* Optional per-row transformer applied to log-block records before they reach the merger.
* Engines override this to align records with a projected read schema (e.g. Spark 4.1's
* PushVariantIntoScan). Default is no projection.
* Wraps a log-block record iterator to align its rows with a projected read schema before they
* reach the merger (e.g. Spark 4.1's PushVariantIntoScan rewrites variant columns into struct
* extractions on the base-file side, so avro log rows must be rewritten to match). Default is a
* no-op pass-through; the parquet log path projects natively via the reader, so only engines
* whose avro log reader needs an explicit row rewrite (currently the Spark reader context)
* override this.
*
* @param recordIterator the deserialized log-block records in {@code dataBlockSchema} shape
* @param dataBlockSchema the schema the records are in
*/
public Option<Function<T, T>> getLogBlockRecordProjection(HoodieSchema dataBlockSchema) {
return Option.empty();
public ClosableIterator<T> projectLogBlockRecords(ClosableIterator<T> recordIterator, HoodieSchema dataBlockSchema) {
return recordIterator;
}

public Option<Pair<String, String>> getPayloadClasses(TypedProperties props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(
protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> readerContext, byte[] content) throws IOException {
checkState(this.readerSchema != null, "Reader's schema has to be non-null");
RecordIterator iterator = RecordIterator.getInstance(this, content, readerContext.enableLogicalTimestampFieldRepair());
return new CloseableMappingIterator<>(iterator, data -> readerContext.getRecordContext().convertAvroRecord(data));
ClosableIterator<T> records = new CloseableMappingIterator<>(iterator, data -> readerContext.getRecordContext().convertAvroRecord(data));
// Align records with the engine's projected read schema (e.g. Spark 4.1 PushVariantIntoScan).
// No-op for engines/queries that don't need it. Parquet log blocks project natively in the reader.
return readerContext.projectLogBlockRecords(records, this.readerSchema);
}

private static class RecordIterator implements ClosableIterator<IndexedRecord> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataB
} else {
blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
}
Pair<Function<T, T>, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock);
return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, projectedTransformer.getLeft()), projectedTransformer.getRight());
Pair<Function<T, T>, HoodieSchema> schemaTransformer = getSchemaTransformerWithEvolvedSchema(dataBlock);
Comment thread
wombatu-kun marked this conversation as resolved.
return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, schemaTransformer.getLeft()), schemaTransformer.getRight());
} catch (IOException e) {
throw new HoodieIOException("Failed to deser records from log files ", e);
}
Expand Down Expand Up @@ -280,28 +280,6 @@ protected Pair<Function<T, T>, HoodieSchema> getSchemaTransformerWithEvolvedSche
return Pair.of(transformer, evolvedSchema);
}

/**
* Composes schema evolution then the engine's optional log-block record projection
* (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved data-block schema
* — the projector preserves field shape, only rewriting variant fields, so merger
* metadata cols (read by ordinal) stay intact.
*
* <p>Skipped when a custom payload class is configured: {@code PayloadUpdateProcessor}
* round-trips through {@code convertToAvroRecord} against a schema that still types
* variant fields as {@code VariantType}, which would mis-decode rewritten rows.
*/
protected Pair<Function<T, T>, HoodieSchema> getProjectedTransformer(HoodieDataBlock dataBlock) {
Pair<Function<T, T>, HoodieSchema> evolved = getSchemaTransformerWithEvolvedSchema(dataBlock);
if (payloadClasses.isPresent()) {
return evolved;
}
Option<Function<T, T>> logProjOpt = readerContext.getLogBlockRecordProjection(evolved.getRight());
if (!logProjOpt.isPresent()) {
return evolved;
}
return Pair.of(evolved.getLeft().andThen(logProjOpt.get()), evolved.getRight());
}

private static class LogRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> {
private final FileGroupRecordBuffer<T> fileGroupRecordBuffer;
private final Iterator<BufferedRecord<T>> logRecordIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
partialUpdateModeOpt);
}

Pair<Function<T, T>, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock);
Pair<Function<T, T>, HoodieSchema> schemaTransformer = getSchemaTransformerWithEvolvedSchema(dataBlock);

HoodieSchema schema = HoodieSchemaCache.intern(projectedTransformer.getRight());
HoodieSchema schema = HoodieSchemaCache.intern(schemaTransformer.getRight());

// TODO: Return an iterator that can generate sequence number with the record.
// Then we can hide this logic into data block.
Expand All @@ -144,9 +144,9 @@ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecO
}

long recordPosition = recordPositions.get(recordIndex++);
T projectedNextRecord = projectedTransformer.getLeft().apply(nextRecord);
boolean isDelete = readerContext.getRecordContext().isDeleteRecord(projectedNextRecord, deleteContext);
BufferedRecord<T> bufferedRecord = BufferedRecords.fromEngineRecord(projectedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete);
T transformedNextRecord = schemaTransformer.getLeft().apply(nextRecord);
boolean isDelete = readerContext.getRecordContext().isDeleteRecord(transformedNextRecord, deleteContext);
BufferedRecord<T> bufferedRecord = BufferedRecords.fromEngineRecord(transformedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete);
processNextDataRecord(bufferedRecord, recordPosition);
}
}
Expand Down
Loading