type:story Add Parquet DESCRIPTOR mode for blob inline reading#18683
type:story Add Parquet DESCRIPTOR mode for blob inline reading#18683rahil-c wants to merge 2 commits into
Conversation
When hoodie.read.blob.inline.mode=DESCRIPTOR is set with Parquet base files, leverage Parquet's nested column projection to skip reading the blob 'data' sub-column entirely (genuine I/O savings). Previously the config only affected Lance reads; Parquet still materialized the bytes. Approach mirrors the existing VECTOR column rewrite pattern in HoodieFileGroupReaderBasedFileFormat: 1. Detect blob columns via schema metadata 2. Strip the 'data' sub-field from blob structs in the read schema 3. Post-read null-pad the 'data' field back into output rows Both COW (HoodieFileGroupReaderBasedFileFormat.readBaseFile) and MOR (SparkFileFormatInternalRowReaderContext.getFileRecordIterator) paths are covered. Also adds defensive null check in BatchedBlobReader. read_blob() on Parquet DESCRIPTOR rows returns null since Parquet has no byte-range blob access like Lance — documented as known limitation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds Parquet DESCRIPTOR mode for blob inline reading by stripping the data sub-field from blob structs and null-padding it back post-read, with a ReadBlobRule mechanism to downgrade to CONTENT when read_blob() is in scope. The mechanism for sharing the flip across queries against the same temp view raises a couple of correctness questions worth a closer look — please see the inline comments. Once those are addressed, this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming nits in ReadBlobRule — containsReadBlobAnywhere looks too similar to the existing containsReadBlobInAnyExpression (different scopes, look-alike names), and syncDescriptorMode reads like a thread-sync operation rather than a policy-apply step.
| // Mutable so ReadBlobRule can flip DESCRIPTOR→CONTENT when read_blob() appears in a query. | ||
| // We mutate in place because Spark's planner/AQE retains a reference to this FileFormat | ||
| // instance even after the optimizer rewrites the LogicalRelation, so swapping wouldn't stick. | ||
| @volatile private var _isBlobDescriptorMode: Boolean = initialBlobDescriptorMode |
There was a problem hiding this comment.
🤖 I'm worried this mutable flag races across concurrent queries that share a FileFormat instance via a temp view. If query A (uses read_blob()) sets the flag to false during its optimization, and query B (no read_blob()) runs concurrently and calls restoreBlobDescriptorMode() before A's tasks are serialized, A's executors will deserialize the FileFormat with flag=true — the executor-side readBaseFile (line 565) reads isBlobDescriptorMode and would strip the data column, causing read_blob() to silently return null. Same hazard in the opposite direction for B. Async query submission (e.g. Futures, Airflow, multi-cell notebooks) makes this realistic. Have you considered passing the per-query mode through the LogicalRelation's options map / Hadoop conf at rule time instead of mutating shared state? @nsivabalan @yihua thoughts on whether this concurrency surface is acceptable here?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| SparkFileFormatInternalRowReaderContext.wrapWithVectorConversion(blobPaddedIterator, vectorColumnInfo, if (blobColumnIndices.nonEmpty) parquetReadStructType else readSchema) | ||
| } else { | ||
| blobPaddedIterator | ||
| } |
There was a problem hiding this comment.
🤖 For MOR tables with log files, the log-file branch (line 123-126) reads with the full requiredSchema, so log records keep their populated data field, while base-file records under DESCRIPTOR get data=null. After merge the user sees a mix: records updated via log have bytes, records still in base have null. Is this the intended semantics, or should DESCRIPTOR also null the data on log-file rows for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| // uses read_blob(), and restore DESCRIPTOR for queries that don't (the FileFormat instance | ||
| // is shared across queries against the same temp view). | ||
| val needsContentMode = containsReadBlobAnywhere(plan) || plan.exists(_.isInstanceOf[BatchedBlobRead]) | ||
| syncDescriptorMode(plan, forceContentMode = needsContentMode) |
There was a problem hiding this comment.
🤖 syncDescriptorMode is a side effect inside an optimizer rule's apply. Spark's optimizer batches typically run rules to a fixed point, so this can fire multiple times per query. It's idempotent within a single query (good), but the side effect on a shared FileFormat ordering between concurrent queries' optimizer passes is what creates the race I flagged in HoodieFileGroupReaderBasedFileFormat. Have you considered mapChildren / structural rewrites to encode the mode in the plan (e.g. as a hint or option) rather than mutating the FileFormat?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| plan.expressions.exists(containsReadBlobInExpression) | ||
| } | ||
|
|
||
| private def containsReadBlobAnywhere(plan: LogicalPlan): Boolean = { |
There was a problem hiding this comment.
🤖 nit: containsReadBlobAnywhere is dangerously close to the existing containsReadBlobInAnyExpression just above it, but does something meaningfully different — it walks the entire plan tree rather than checking only the current node's expressions. A name like planTreeContainsReadBlob or treeContainsReadBlob would make the scope distinction immediately obvious to the next person who touches this rule.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
|
|
||
| // Lance scans are skipped because their hoodieFileFormat != PARQUET and Lance already | ||
| // supports read_blob() under DESCRIPTOR via byte-range references. | ||
| private def syncDescriptorMode(plan: LogicalPlan, forceContentMode: Boolean): Unit = { |
There was a problem hiding this comment.
🤖 nit: syncDescriptorMode reads like a concurrency-sync operation rather than a policy-apply step. Something like applyDescriptorModePolicy or updateDescriptorModeForPlan would make it clearer that this is deciding whether to downgrade DESCRIPTOR→CONTENT based on query content, not synchronizing threads.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
da8bdba to
fd7c3da
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The change extends DESCRIPTOR mode to Parquet base files with a clean strip-and-null-pad pattern that mirrors the existing vector rewrite, plus per-relation option routing in ReadBlobRule to preserve read_blob() semantics. One inline comment about a test annotation that may not be doing what's intended. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability suggestions — a boolean naming inconsistency and a duplicated method body across two files.
| * BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS option must land on the | ||
| * left table's LogicalRelation only, and the right table's payload | ||
| * must come back with DESCRIPTOR's null `data`. A bug where the | ||
| * rule writes the option to every Hudi LogicalRelation, or to |
There was a problem hiding this comment.
🤖 Heads-up: @Ignore is from JUnit 4 (org.junit.Ignore), but the surrounding test class uses JUnit 5 (@Test from org.junit.jupiter.api). The Jupiter engine doesn't honor JUnit 4's @Ignore, so this test will actually execute rather than be skipped — the rest of the file uses @Disabled from org.junit.jupiter.api.Disabled for that purpose. Given the TODO to re-enable comment, was the intent to skip this? If so, switching to @Disabled("reason") would do it.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| @@ -237,11 +237,16 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext, | |||
|
|
|||
| override def buildFileFormat(): FileFormat = { | |||
| val tableConfig = metaClient.getTableConfig | |||
There was a problem hiding this comment.
🤖 nit: could you rename blobDescriptorMode to isBlobDescriptorMode? It's a boolean and the constructor parameter it's passed into (isBlobDescriptorMode) already uses the is prefix — the local val not following suit is a small but jarring inconsistency.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| * Wraps an iterator to re-insert null {@code data} fields into blob structs | ||
| * after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs). | ||
| */ | ||
| private def wrapWithBlobNullPadding(iter: Iterator[InternalRow], |
There was a problem hiding this comment.
🤖 nit: the body of this wrapWithBlobNullPadding (create projection → create mapper → wrap iterator) is almost identical to SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding at line 421 of that file — only the iterator type differs. Have you considered consolidating the shared setup into VectorConversionUtils or a small shared utility so the two call sites can't silently diverge if buildBlobNullPadRowMapper's signature changes?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
|
File ticket to have default inline mode to descriptor, currently its CONTENT (as for compaction we always need to materialize). |
| + "Lance file with the INLINE payload's position and size, so callers can defer " | ||
| + "the byte read via read_blob()."); | ||
| .withDocumentation("How Hudi interprets INLINE BLOB values on read for plain column access " | ||
| + "(e.g. SELECT *). " |
There was a problem hiding this comment.
| + "(e.g. SELECT *). " | |
| + "(e.g. SELECT blob_column FROM table). " |
| + "the byte read via read_blob()."); | ||
| .withDocumentation("How Hudi interprets INLINE BLOB values on read for plain column access " | ||
| + "(e.g. SELECT *). " | ||
| + "CONTENT (default) returns the raw inline bytes in the data field. " |
There was a problem hiding this comment.
We should flip the default to DESCRIPTOR and let all table services to still use CONTENT mode.
| public static final String BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS = | ||
| "hoodie.internal.read.blob.inline.force.content.columns"; |
There was a problem hiding this comment.
My understanding is that this internal config intends to differentiate the blob column reading of read_blob(col1) vs col2 in SELECT read_blob(col1), col2 FROM table so that col1 is read out with CONTENT and col2 is read out with DESCRIPTOR. However, Lance reader can only be configured with CONTENT or DESCRIPTOR mode for all blob columns. Should we revisit the expected behavior of hoodie.read.blob.inline.mode?
To simplify the experience, it makes sense for user to only have read_blob(col) or col in the same query where col is a blob column. If a mix of both exist, all blob columns are read out with content.
There was a problem hiding this comment.
Thus, all blob columns are read in DESCRIPTOR mode if no read_blob(col) is used. All blob columns are read in CONTENT mode if any read_blob(col) exists in the query.
| } | ||
| } | ||
|
|
||
| @transient private var cachedBlobDetection: (StructType, Set[Int]) = _ |
There was a problem hiding this comment.
Should we consider thread-safe?
| // Case 1: Inline — bytes are in field 1 | ||
| val bytes = accessor.getBytes(blobStruct, 1) | ||
| // Case 1: Inline — bytes are in field 1 (may be null in DESCRIPTOR mode) | ||
| val bytes = if (accessor.isNullAt(blobStruct, 1)) null else accessor.getBytes(blobStruct, 1) |
There was a problem hiding this comment.
Is there a case that read_blob() silently returns null to the user instead of throwing where it should not?
| } else if (blobColumns.contains(i)) { | ||
| InternalRow blobStruct = row.getStruct(i, 2); | ||
| // Expand {type, reference} → {type, null, reference} | ||
| GenericInternalRow expanded = new GenericInternalRow(3); |
There was a problem hiding this comment.
new GenericInternalRow(3) allocates per blob column per row on a hot path. For a 1M-row scan with 2 blob columns that's 2M short-lived heap allocations.
| } else if (blobColumns.contains(i)) { | ||
| InternalRow blobStruct = row.getStruct(i, 2); | ||
| // Expand {type, reference} → {type, null, reference} | ||
| GenericInternalRow expanded = new GenericInternalRow(3); |
There was a problem hiding this comment.
Also, the hardcoded 3 is fragile.
| case ff: HoodieFileGroupReaderBasedFileFormat | ||
| if ff.hoodieFileFormat == HoodieFileFormat.PARQUET && ff.isBlobDescriptorMode => | ||
| val matched: Set[String] = lr.output.collect { | ||
| case a: AttributeReference if readBlobAttrIds.contains(a.exprId) => a.name |
There was a problem hiding this comment.
Matching on a.name is fragile if users alias blob columns (SELECT read_blob(payload) AS p ...) or if attribute names are qualified differently between the plan and the file schema. The reader side compares against parquetReadStructType.fields(idx).name (literal Parquet column name) in SparkFileFormatInternalRowReaderContext.scala:123 and HoodieFileGroupReaderBasedFileFormat.scala:510. Any divergence silently misses the override and read_blob() returns null.
| } else { | ||
| val newOptions = rel.options + | ||
| (HoodieReaderConfig.BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS -> matched.mkString(",")) | ||
| val newRel = HadoopFsRelation( |
There was a problem hiding this comment.
HadoopFsRelation's constructor signature has varied across Spark 3.3/3.4/3.5 (e.g. userSpecifiedSchema was added in 3.4). If Hudi still supports the older Spark profile this will fail to compile cross-version. Verify against pom.xml's Spark profile matrix, or use rel.copy(options = newOptions) if Scala's case-class copy is available on this Spark version.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18683 +/- ##
=============================================
- Coverage 68.08% 53.96% -14.12%
+ Complexity 28940 12449 -16491
=============================================
Files 2519 1434 -1085
Lines 140646 72208 -68438
Branches 17427 8259 -9168
=============================================
- Hits 95757 38968 -56789
+ Misses 37030 29738 -7292
+ Partials 7859 3502 -4357
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
When
hoodie.read.blob.inline.mode=DESCRIPTORis set with Parquet base files, leverage Parquet's nested column projection to skip reading the blobdatasub-column entirely (genuine I/O savings). Previously the config only affected Lance reads; Parquet still materialized the bytes.Approach mirrors the existing VECTOR column rewrite pattern in
HoodieFileGroupReaderBasedFileFormat:datasub-field from blob structs in the read schema.datafield back into output rows.Both COW (
HoodieFileGroupReaderBasedFileFormat.readBaseFile) and MOR (SparkFileFormatInternalRowReaderContext.getFileRecordIterator) paths are covered. Also adds a defensive null check inBatchedBlobReader.A naive DESCRIPTOR +
read_blob()would silently return null on Parquet (no byte-range channel like Lance). To keep the API consistent,ReadBlobRulenow downgrades any Parquet scan to CONTENT for queries that containread_blob(), while sibling queries on the same FileFormat instance keep DESCRIPTOR's I/O savings.Summary and Changelog
User-visible behavior
hoodie.read.blob.inline.mode=DESCRIPTORnow works on Parquet base files (COW + MOR), not just Lance, and skips the blobdataParquet column for real I/O savings.read_blob()keeps working under DESCRIPTOR on Parquet — the engine automatically downgrades the affected scan to CONTENT so bytes are materialized; sibling queries that don't useread_blob()still benefit from DESCRIPTOR.Detailed changelog
DESCRIPTOR-on-Parquet:
VectorConversionUtils: new helpersdetectBlobColumnsFromMetadata,stripBlobDataField,buildBlobNullPadRowMapper.HoodieFileGroupReaderBasedFileFormat:supportBatchreturns false when DESCRIPTOR is active and blob columns are present (row-level access required for null-padding).readBaseFilestrips thedatasub-field from the read schema and wraps the iterator withwrapWithBlobNullPadding.SparkFileFormatInternalRowReaderContext.getFileRecordIterator: same rewrite/pad on the MOR base-file path, driven by the Hadoop conf entry.HoodieReaderConfig.BLOB_INLINE_READ_MODE: docstring updated to describe Parquet semantics.BatchedBlobReader: defensive null check on thedatarow.HoodieHadoopFsRelationFactory: pass through the configured DESCRIPTOR flag.read_blob()override:HoodieFileGroupReaderBasedFileFormat: constructor flag renamedisBlobDescriptorMode→initialBlobDescriptorMode; new mutable_isBlobDescriptorModewithsetBlobDescriptorMode/restoreBlobDescriptorMode.buildReaderWithPartitionValuessyncs the Hadoop conf entry from the mutable flag so the MOR path agrees with the COW path after a flip.ReadBlobRule: walks each plan it sees; ifread_blob()(or an already-injectedBatchedBlobRead) is present, flips DESCRIPTOR→CONTENT on every Hudi ParquetLogicalRelation's FileFormat. Otherwise restores the construction-time value (handles shared FileFormat instances across queries against the same temp view). Lance scans are skipped.HoodieReaderConfig.BLOB_INLINE_READ_MODE: docstring updated to note the automatic downgrade.Tests added
TestReadBlobSQL.testParquetDescriptorSkipsDataColumn—@ParameterizedTestoverHoodieTableType(COW + MOR): asserts INLINE type preserved,datanull, reference null.TestReadBlobSQL.testReadBlobSupersedesDescriptorOnParquet—read_blob()materializes bytes despite DESCRIPTOR, then a follow-up query on the same view restores DESCRIPTOR's null-pad.TestReadBlobSQL.testReadBlobInWhereClauseUnderDescriptor— override engages whenread_blob()is inWHERE.TestReadBlobSQL.testMultiBlobColumnsDescriptorWholeScanDowngrade—read_blob()on one blob column also materializes bytes for unrelated blob columns in the same scan.TestReadBlobSQL.testDescriptorOnTableWithoutBlobColumns— DESCRIPTOR on a non-blob table is a no-op.TestVectorConversionUtilsBlob— unit tests fordetectBlobColumnsFromMetadata,stripBlobDataField, andbuildBlobNullPadRowMapper.Impact
hoodie.read.blob.inline.mode=DESCRIPTORnow skip the blobdataParquet column on reads, reducing I/O for tables with large inline blobs whose payload bytes aren't needed.read_blob()continues to work under DESCRIPTOR on Parquet (auto-downgraded per scan).HoodieReaderConfig.BLOB_INLINE_READ_MODEkeeps its key, default, and valid values; only the docstring is updated.read_blob(), the blob bytes column is no longer read or decoded. For DESCRIPTOR queries that do useread_blob(), behavior is the same as CONTENT mode (no regression).Risk Level
low
BLOB_INLINE_READ_MODEdefaults to CONTENT, so existing reads are unaffected. The DESCRIPTOR rewrite is gated on a metadata marker plus the user's explicit opt-in. Theread_blob()override mutates a per-FileFormat flag in place, which is single-JVM-safe; concurrent queries against the same temp view are sequenced through the optimizer rule. Coverage includes COW and MOR base-file paths plusWHERE-clause and multi-column scenarios.Documentation Update
HoodieReaderConfig.BLOB_INLINE_READ_MODEdocstring rewritten to describe Parquet semantics and the automaticread_blob()downgrade. No website doc changes are required since this is a refinement of an existing config; if the Hudi reader-config reference page is regenerated from the source javadoc, it picks up the new text automatically.Contributor's checklist