Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -265,6 +267,104 @@ public static void convertRowVectorColumns(InternalRow row, GenericInternalRow r
}
}

// ---------------------------------------------------------------------------
// Blob descriptor-mode helpers (Parquet DESCRIPTOR read path)
// ---------------------------------------------------------------------------

/**
* Detects BLOB columns from Spark StructType metadata annotations.
*
* @param schema Spark StructType (may be null)
* @return set of field ordinals that are BLOB columns; empty set if none found
*/
public static Set<Integer> detectBlobColumnsFromMetadata(StructType schema) {
Set<Integer> blobColumnIndices = new LinkedHashSet<>();
if (schema == null) {
return blobColumnIndices;
}
StructField[] fields = schema.fields();
for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];
if (field.metadata().contains(HoodieSchema.TYPE_METADATA_FIELD)) {
String typeStr = field.metadata().getString(HoodieSchema.TYPE_METADATA_FIELD);
HoodieSchema parsed = HoodieSchema.parseTypeDescriptor(typeStr);
if (parsed != null && parsed.getType() == HoodieSchemaType.BLOB) {
blobColumnIndices.add(i);
}
}
}
return blobColumnIndices;
}

/**
* Strips the {@code data} sub-field from BLOB struct columns so the Parquet reader
* skips the binary column chunk entirely (genuine I/O savings).
*
* <p>The returned schema has 2-field blob structs: {@code {type, reference}} instead of
* the full {@code {type, data, reference}}. Use {@link #buildBlobNullPadRowMapper} to
* re-insert null at the {@code data} position after reading.
*
* @param schema the original Spark schema
* @param blobColumns ordinals of blob columns (from {@link #detectBlobColumnsFromMetadata})
* @return a new StructType with the {@code data} sub-field removed from blob structs
*/
public static StructType stripBlobDataField(StructType schema, Set<Integer> blobColumns) {
StructField[] fields = schema.fields();
StructField[] newFields = new StructField[fields.length];
for (int i = 0; i < fields.length; i++) {
if (blobColumns.contains(i) && fields[i].dataType() instanceof StructType) {
StructType blobStruct = (StructType) fields[i].dataType();
List<StructField> kept = new ArrayList<>();
for (StructField sub : blobStruct.fields()) {
if (!sub.name().equals(HoodieSchema.Blob.INLINE_DATA_FIELD)) {
kept.add(sub);
}
}
StructType strippedStruct = new StructType(kept.toArray(new StructField[0]));
newFields[i] = new StructField(fields[i].name(), strippedStruct, fields[i].nullable(), fields[i].metadata());
} else {
newFields[i] = fields[i];
}
}
return new StructType(newFields);
}

/**
* Returns a {@link Function} that expands 2-field blob structs {@code {type, reference}}
* back to 3-field structs {@code {type, null, reference}} by inserting null at the
* {@code data} position, then applies the projection callback.
*
* @param readSchema the Spark schema of incoming rows (blob structs have 2 fields)
* @param blobColumns ordinals of blob columns in {@code readSchema}
* @param projectionCallback called with the expanded row; must copy any data it needs to retain
* @return a function that converts one row and returns the projected result
*/
public static Function<InternalRow, InternalRow> buildBlobNullPadRowMapper(
StructType readSchema,
Set<Integer> blobColumns,
Function<InternalRow, InternalRow> projectionCallback) {
int numFields = readSchema.fields().length;
GenericInternalRow buffer = new GenericInternalRow(numFields);
return row -> {
for (int i = 0; i < numFields; i++) {
if (row.isNullAt(i)) {
buffer.setNullAt(i);
} else if (blobColumns.contains(i)) {
InternalRow blobStruct = row.getStruct(i, 2);
// Expand {type, reference} → {type, null, reference}
GenericInternalRow expanded = new GenericInternalRow(3);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new GenericInternalRow(3) allocates per blob column per row on a hot path. For a 1M-row scan with 2 blob columns that's 2M short-lived heap allocations.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the hardcoded 3 is fragile.

expanded.update(0, blobStruct.isNullAt(0) ? null : blobStruct.getUTF8String(0));
expanded.setNullAt(1);
expanded.update(2, blobStruct.isNullAt(1) ? null : blobStruct.getStruct(1, HoodieSchema.Blob.getReferenceFieldCount()));
buffer.update(i, expanded);
} else {
buffer.update(i, row.get(i, readSchema.apply(i).dataType()));
}
}
return projectionCallback.apply(buffer);
};
}

/**
* Re-attaches {@link HoodieSchema#TYPE_METADATA_FIELD} to Spark fields that are
* Arrow {@code FixedSizeList<Float32|Float64, dim>} in the Lance file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,41 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
structType
}

val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
// Blob DESCRIPTOR mode: strip `data` sub-field from blob structs for Parquet base files.
// Applied after vector rewrite; not applied to Lance base files or log files.
// Columns referenced by read_blob() in the current query (carried in the Hadoop conf via
// BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS, set by ReadBlobRule per-query) are excluded from
// the strip set so the bytes survive for BatchedBlobReader to materialize.
val isParquetBaseFile = FSUtils.isBaseFile(filePath) && !isLanceBaseFile
import org.apache.hudi.common.config.HoodieReaderConfig
val hadoopConf = storageConfiguration.unwrapAs(classOf[Configuration])
val isBlobDescriptorMode = isParquetBaseFile && {
val modeValue = hadoopConf.get(HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(),
HoodieReaderConfig.BLOB_INLINE_READ_MODE.defaultValue())
modeValue.equalsIgnoreCase(HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR)
}
val forceContentCols: Set[String] = if (isBlobDescriptorMode) {
Option(hadoopConf.get(HoodieReaderConfig.BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS))
.map(_.split(",").iterator.map(_.trim).filter(_.nonEmpty).toSet)
.getOrElse(Set.empty)
} else {
Set.empty
}
val blobColumnIndices: Set[Int] = if (isBlobDescriptorMode) {
val detected = VectorConversionUtils.detectBlobColumnsFromMetadata(parquetReadStructType).asScala.map(_.intValue()).toSet
if (forceContentCols.isEmpty) detected
else detected.filterNot(idx => forceContentCols.contains(parquetReadStructType.fields(idx).name))
} else {
Set.empty
}
val blobReadStructType = if (blobColumnIndices.nonEmpty) {
val javaBlobCols: java.util.Set[Integer] = blobColumnIndices.map(Integer.valueOf).asJava
VectorConversionUtils.stripBlobDataField(parquetReadStructType, javaBlobCols)
} else {
parquetReadStructType
}

val (readSchema, readFilters) = getSchemaAndFiltersForRead(blobReadStructType, hasRowIndexField)
if (FSUtils.isLogFile(filePath)) {
// NOTE: now only primary key based filtering is supported for log files
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
Expand All @@ -120,12 +154,18 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
readSchema, StructType(Seq.empty), getSchemaHandler.getInternalSchemaOpt,
readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]], tableSchemaOpt))

// Post-process: convert binary VECTOR columns back to typed arrays
if (vectorColumnInfo.nonEmpty) {
SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(rawIterator, vectorColumnInfo, readSchema)
// Post-process: re-insert null `data` field into blob structs, then convert vectors
val blobPaddedIterator = if (blobColumnIndices.nonEmpty) {
SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding(rawIterator, blobColumnIndices, readSchema, parquetReadStructType)
} else {
rawIterator
}

if (vectorColumnInfo.nonEmpty) {
SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(blobPaddedIterator, vectorColumnInfo, if (blobColumnIndices.nonEmpty) parquetReadStructType else readSchema)
} else {
blobPaddedIterator
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 For MOR tables with log files, the log-file branch (line 123-126) reads with the full requiredSchema, so log records keep their populated data field, while base-file records under DESCRIPTOR get data=null. After merge the user sees a mix: records updated via log have bytes, records still in base have null. Is this the intended semantics, or should DESCRIPTOR also null the data on log-file rows for consistency?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

}
}

Expand Down Expand Up @@ -375,4 +415,23 @@ object SparkFileFormatInternalRowReaderContext {
}
}

/**
* Wraps a closable iterator to re-insert null {@code data} fields into blob structs
* after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs).
*/
private[hudi] def wrapWithBlobNullPadding(
iterator: ClosableIterator[InternalRow],
blobColumnIndices: Set[Int],
readSchema: StructType,
targetSchema: StructType): ClosableIterator[InternalRow] = {
val javaBlobCols: java.util.Set[Integer] = blobColumnIndices.map(Integer.valueOf).asJava
val projection = UnsafeProjection.create(targetSchema)
val mapper = VectorConversionUtils.buildBlobNullPadRowMapper(readSchema, javaBlobCols, projection.apply(_))
new ClosableIterator[InternalRow] {
override def hasNext: Boolean = iterator.hasNext
override def next(): InternalRow = mapper.apply(iterator.next())
override def close(): Unit = iterator.close()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,21 @@ public class HoodieReaderConfig extends HoodieConfig {
.markAdvanced()
.sinceVersion("1.2.0")
.withValidValues(BLOB_INLINE_READ_MODE_CONTENT, BLOB_INLINE_READ_MODE_DESCRIPTOR)
.withDocumentation("How Hudi interprets INLINE BLOB values on read. "
+ "CONTENT (default) returns the raw inline bytes. "
+ "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at the backing "
+ "Lance file with the INLINE payload's position and size, so callers can defer "
+ "the byte read via read_blob().");
.withDocumentation("How Hudi interprets INLINE BLOB values on read for plain column access "
+ "(e.g. SELECT *). "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
+ "(e.g. SELECT *). "
+ "(e.g. SELECT blob_column FROM table). "

+ "CONTENT (default) returns the raw inline bytes in the data field. "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should flip the default to DESCRIPTOR and let all table services to still use CONTENT mode.

+ "DESCRIPTOR suppresses the inline bytes (data field is null) so direct column reads "
+ "avoid the I/O cost of materializing large binary payloads. "
+ "For Lance files, the reference struct is populated with blob stream coordinates. "
+ "For Parquet files, the data column is skipped via nested column projection and the "
+ "reference struct is null. "
+ "read_blob() is the canonical bytes-materializing API and always returns bytes "
+ "regardless of this setting; under DESCRIPTOR mode the engine reads the data column "
+ "only for the blob columns referenced by read_blob() in the query.");

// Internal-only key set by ReadBlobRule on a per-query HadoopFsRelation.options to instruct
// the reader to skip the DESCRIPTOR data-column strip for the listed blob columns, so that
// read_blob() sees the materialized bytes. Comma-separated top-level column names. Not user-facing.
public static final String BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS =
"hoodie.internal.read.blob.inline.force.content.columns";
Comment on lines +129 to +130

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this internal config intends to differentiate the blob column reading of read_blob(col1) vs col2 in SELECT read_blob(col1), col2 FROM table so that col1 is read out with CONTENT and col2 is read out with DESCRIPTOR. However, Lance reader can only be configured with CONTENT or DESCRIPTOR mode for all blob columns. Should we revisit the expected behavior of hoodie.read.blob.inline.mode?

To simplify the experience, it makes sense for user to only have read_blob(col) or col in the same query where col is a blob column. If a mix of both exist, all blob columns are read out with content.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thus, all blob columns are read in DESCRIPTOR mode if no read_blob(col) is used. All blob columns are read in CONTENT mode if any read_blob(col) exists in the query.

}
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,16 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,

override def buildFileFormat(): FileFormat = {
val tableConfig = metaClient.getTableConfig

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename blobDescriptorMode to isBlobDescriptorMode? It's a boolean and the constructor parameter it's passed into (isBlobDescriptorMode) already uses the is prefix — the local val not following suit is a small but jarring inconsistency.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

val blobDescriptorMode = optParams.getOrElse(
HoodieReaderConfig.BLOB_INLINE_READ_MODE.key(),
HoodieReaderConfig.BLOB_INLINE_READ_MODE.defaultValue()
).equalsIgnoreCase(HoodieReaderConfig.BLOB_INLINE_READ_MODE_DESCRIPTOR)
new HoodieFileGroupReaderBasedFileFormat(basePath.toString,
HoodieTableSchema(tableStructSchema, tableSchema, internalSchemaOpt),
tableConfig.getTableName, queryTimestamp.get, getMandatoryFields, isMOR, isBootstrap,
isIncremental, validCommits, shouldUseRecordPosition, getRequiredFilters,
tableConfig.isMultipleBaseFileFormatsEnabled, tableConfig.getBaseFileFormat)
tableConfig.isMultipleBaseFileFormatsEnabled, tableConfig.getBaseFileFormat,
isBlobDescriptorMode = blobDescriptorMode)
}

override def buildBucketSpec(): Option[BucketSpec] = None
Expand Down
Loading
Loading