feat(variant): [DNM] auto-infer per-file variant shredding schemas on shredding inference #18961
voonhous wants to merge 18 commits
Force-pushed from 6e26ed9 to 7e260ad
Heads up on a latent bug inherited from the #18938 read path, found while testing inference here and fixed in this PR:
Fixed in this PR with shape-based detection anchored on the requested column: the requested side (from the table schema, logical type intact) must be a variant, and the on-disk side is matched by the shredded shape.
Worth noting why #18938 did not catch it: the new COW inference test here is the first to read a shredded base file end to end through the AVRO reader. The existing MOR compaction test compacts a log-only file group (no shredded base to read), and post-compaction queries go through the Spark native reader.
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds per-file variant shredding schema inference for both the Avro record write path and the row writer, with a decorator-based file writer, an Avro<->VariantSchema bridge for write/rebuild, and classpath-detection of the Spark 4.1 InferVariantShreddingSchema provider. The implementation is generally careful (defensive copies, null/missing-field handling, identifier sanitization, precedence-aware schema splicing), but a few spots are worth a second look: a session-level SQLConf mutation inside HoodieHadoopFsRelationFactory that could affect other queries, a redundant delegate.close() pattern in VariantShreddingInferenceFileWriter, and an ambiguity in the Avro timestamp/local-timestamp logical-type mapping in Spark4VariantShreddingProvider. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming and ordering nits are noted below, along with a couple of minor readability nits in Spark4VariantShreddingProvider; the rest of the code is clean and well-documented.
}
if ("timestamp-micros".equals(name)) {
  return new VariantSchema.TimestampType();
}
🤖 Mapping both timestamp-millis and timestamp-micros (and similarly local-* variants) to the same VariantSchema.TimestampType()/TimestampNTZType() looks suspicious — if Spark's VariantSchema.TimestampType has fixed micros precision (variant binary format stores µs), then a millis Long would be interpreted as µs and be off by 1000x. In the inferred-write flow this branch is dead (Hudi always generates timestamp-micros via HoodieSchema.createTimestampMicros()), but the read/rebuild path could in principle hit an Avro schema with millis. Is this branch intended as a true equivalence, or should millis schemas decline shredding (return null) so rebuild doesn't silently misinterpret values?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Good catch -- it's currently unreachable but a real latent trap, so I hardened it. The Variant binary spec stores timestamps in microseconds, so a timestamp-millis typed_value can't represent a variant timestamp; mapping it to the micros TimestampType would scale the value 1000x on rebuild. Made millis decline instead, so the value stays in the residual unshredded binary:
// The Variant binary spec stores timestamps in microseconds, so a millisecond-precision
// typed_value cannot represent a variant timestamp. Decline to shred it as a scalar (the value
// stays in the residual unshredded binary) rather than mapping it to the micros TimestampType,
// which would silently scale the value by 1000x.
if ("timestamp-millis".equals(name) || "local-timestamp-millis".equals(name)) {
  return null;
}
In practice this never fires (Hudi writes typed_value via createTimestampMicros, the read path only reads Hudi-shredded files, and the Spark inferrer produces micros from micros-only variant samples), so behavior is unchanged; this just removes the silent-corruption path.
avroTypeToScalarType is shared write/read code and the concern is sharpest on the rebuild path, so this landed in the base PR #18938; it'll flow into this PR on the next rebase.
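To make the 1000x concern concrete, here is a minimal, self-contained sketch. It is not the actual Spark4VariantShreddingProvider code: ScalarKind, TimestampShreddingSketch, and both method names are hypothetical stand-ins, and Optional.empty() plays the role of the `return null` decline in the snippet above.

```java
import java.util.Optional;

/** Hypothetical stand-in for the scalar types a shredded typed_value may take. */
enum ScalarKind { TIMESTAMP, TIMESTAMP_NTZ }

final class TimestampShreddingSketch {
  /** Maps an Avro logical-type name to a scalar kind; empty means "decline to shred". */
  static Optional<ScalarKind> scalarKindFor(String logicalTypeName) {
    switch (logicalTypeName) {
      case "timestamp-micros":
        return Optional.of(ScalarKind.TIMESTAMP);
      case "local-timestamp-micros":
        return Optional.of(ScalarKind.TIMESTAMP_NTZ);
      default:
        // Covers timestamp-millis and local-timestamp-millis: the value stays
        // in the residual unshredded binary instead of being reinterpreted.
        return Optional.empty();
    }
  }

  /** Epoch seconds as seen by a reader that assumes the stored long is in microseconds. */
  static long asEpochSecondsAssumingMicros(long storedValue) {
    return storedValue / 1_000_000L;
  }
}
```

Feeding an epoch-millis value into asEpochSecondsAssumingMicros yields a result 1000 times smaller than the same instant stored in microseconds, which is the silent misreading that declining avoids.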
…pache#18931)
Compaction/clustering reading an already-shredded base file via the AVRO record path now rebuilds the unshredded {metadata, value} variant before records reach the merger/writer, replacing the prior fail-fast guard.
- VariantShreddingProvider: add rebuildVariantRecord (inverse of shredVariantRecord); Spark4VariantShreddingProvider implements it via Spark's ShreddingUtils.rebuild over Avro-backed ShreddedRow rows.
- HoodieAvroParquetReader: detect shredded variant columns, read them at the file's shredded schema, and reconstruct to unshredded per record (VariantReconstruction); provider loaded via config or classpath, gated on hoodie.parquet.variant.allow.reading.shredded.
- Extract stripVariantShredding into VariantSchemaUtils (shared by reader/writer).
- Remove the read-then-reshred guard from HoodieAvroWriteSupport and its unit test.
- Extend the MOR compaction test in TestVariantDataType to write shredded and read back (AVRO reconstruction + SPARK native via withRecordType).
The compaction base file is written by the AVRO shredding writer as [metadata, value, typed_value]. Spark 4.0's reorderVariantFields rebuilds that group as [value, metadata] and drops typed_value, so the native read after compaction fails with MALFORMED_VARIANT. Spark 4.1+ reads variant fields by name (SPARK-54410) and reconstructs correctly. Fixes the spark4.0 leg of apache#18931.
withSchema(replacement) rebuilds the field from replacement, which already mirrors the original nullability, so the intermediate makeNullable() was a no-op (and could reset field order for non-nullable fields).
VariantReconstruction and HoodieAvroFileWriterFactory each hardcoded the provider FQN and a Class.forName lookup. Move the candidate list and detection into a shared VariantShreddingProvider.detectProviderClassOnClasspath(), so a new provider impl is registered in one place.
…ruction
Every other class in org.apache.hudi.io.storage.hadoop carries the Hoodie prefix (HoodieAvroParquetReader, HoodieHadoopIOFactory, ...); this was the lone exception. Package-private with a single caller, so the rename is contained.
…onstruction
The helper unwraps a nullable union type, not a null-assertion guard; the new name reflects what it does at each call site.
…on provider
When a requested column is shredded in the base file and reading shredded variants is enabled but no provider is loadable, create() returned null and the reader fell back to the unshredded requested schema. parquet-avro then reads value/metadata and drops typed_value, silently corrupting variants whose payload lived in typed_value. Throw HoodieException instead, mirroring the write path. Removes the now-unused logger.
metadata is REQUIRED in the shredded parquet schema, so a null metadata on the read path means a malformed base file. Returning null let the caller null out the whole variant column, silently dropping data; throw HoodieException instead. The separate null-record guard stays (genuine null variant passes through).
…back comment
The Spark SQL read-back is gated on Spark 4.1+ because Spark 4.0's native parquet reader rejects the 3-field shredded layout (SPARK-54410), not because of apache#18931. apache#18931 is the AVRO reader reconstruction and does not affect the Spark native read, so drop the misattributed pointer.
Force-pushed from 5e10346 to 86061dd
…d_value directly
The scalar getters ignore ordinal and read typed_value directly while isNullAt/getBinary go through fieldNameFor(ordinal). Spark only calls the scalar getters for the scalar typed_value, so the asymmetry is intentional; add a comment so it does not read as an oversight.
Force-pushed from 6ebf27f to 0f7027b
…ype mapping
avroTypeToScalarType mapped timestamp-millis / local-timestamp-millis to the micros-based TimestampType / TimestampNTZType. The Variant binary spec stores timestamps in microseconds, so a millis-precision typed_value would be read back as micros and scaled 1000x. Unreachable today (Hudi/Spark only produce micros typed_value), but decline to shred millis as a scalar so it can never silently corrupt; the value stays in the residual unshredded binary.
Today typed_value comes only from an explicit table schema or the test-only force-shredding DDL, so production tables never shred variants. This adds per-file inference of the shredding schema from the first records of each base file, for both HoodieRecordType paths and the bulk-insert row writer, reusing Spark 4.1's InferVariantShreddingSchema heuristics verbatim (SPARK-53659). Gated by hoodie.parquet.variant.shredding.schema.inference.enabled (default off); on Spark 4.0/Flink/Java the inferrer is absent from the classpath and writes silently stay unshredded.

A buffering HoodieFileWriter decorator (mirroring Spark's ParquetOutputWriterWithVariantShredding: 4096 records / 64MB, infer once, replay in order) defers the parquet writer until the schema is known. The AVRO factory splices the inferred typed_value into the schema argument; the SPARK and row-writer factories splice a copied config because the row write support resolves its schema from hoodie.write.schema/hoodie.avro.schema. Inference failures decline to unshredded (a throwing inference must not fail compaction); writer-creation/replay failures latch and rethrow through close() so buffered records cannot be dropped silently. The Spark 4.1 inferrer batches all variant columns into one call (global width budget) and drops avro-illegal object keys, which legally fall back to the residual value column.

Also fixes latent issues this feature would trip: recursive Variant.getPlainTypedValueSchema (depth>=2 objects, arrays, value-only wrappers), avro field-reuse (Field already used) in stripVariantShredding and VariantReconstruction, and the table-schema footer fallback now strips typed_value by shape so per-file layouts never leak into the resolved table schema.

Stacked on apache#18938 (read-side reconstruction); part of apache#18937.
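The buffer, infer once, replay flow described in this commit message can be sketched roughly as follows. This is a simplified illustration, not the PR's VariantShreddingInferenceFileWriter: RecordSink and BufferingInferenceSink are hypothetical names, records and schemas are plain strings, only a record-count threshold is modeled (the byte-size limit is omitted), and a null schema stands for "write unshredded".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical minimal writer abstraction used only for this sketch. */
interface RecordSink {
  void write(String record);
  void close();
}

final class BufferingInferenceSink implements RecordSink {
  private final int maxBuffered;
  private final Function<List<String>, String> inferSchema;   // may throw
  private final Function<String, RecordSink> createDelegate;  // null schema = unshredded
  private final List<String> buffer = new ArrayList<>();
  private RecordSink delegate;
  private RuntimeException latched;

  BufferingInferenceSink(int maxBuffered,
                         Function<List<String>, String> inferSchema,
                         Function<String, RecordSink> createDelegate) {
    this.maxBuffered = maxBuffered;
    this.inferSchema = inferSchema;
    this.createDelegate = createDelegate;
  }

  @Override
  public void write(String record) {
    if (latched != null) {
      throw latched; // an earlier creation/replay failure is never swallowed
    }
    if (delegate != null) {
      delegate.write(record); // schema already decided: pass through
      return;
    }
    buffer.add(record);
    if (buffer.size() >= maxBuffered) {
      materialize();
    }
  }

  private void materialize() {
    String schema;
    try {
      schema = inferSchema.apply(buffer);
    } catch (RuntimeException e) {
      schema = null; // inference failure declines to unshredded
    }
    try {
      delegate = createDelegate.apply(schema);
      for (String record : buffer) {
        delegate.write(record); // replay in arrival order
      }
      buffer.clear();
    } catch (RuntimeException e) {
      latched = e; // remember so close() rethrows it
      throw e;
    }
  }

  @Override
  public void close() {
    if (latched != null) {
      throw latched;
    }
    if (delegate == null) {
      materialize(); // fewer records than the threshold: infer at close
    }
    delegate.close();
  }
}
```

The asymmetry between the two try blocks is the point of the design as described: a failed inference only costs the shredding optimization, while a failed writer creation or replay would lose buffered records, so it has to surface.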
…i-variant tables
Fixes surfaced by the auto-inference COW test, which is the first to read a
shredded base file end to end:
- VariantReconstruction never engaged on real files: the reader's file schema
comes from converting the parquet footer MessageType, which loses the
variant logical type, so the shredded group was projected down to
{metadata, value} and typed_value was silently dropped on the AVRO
read-then-rewrite path. Detect the on-disk side by shape, anchored on the
requested column being a variant.
- The inferred-shredding config splice aliased every variant column: columns
share one record type named 'variant', which Avro serializes as name
references after the first occurrence, so replacing one column's record
with a same-named shredded definition shredded all of them on re-parse.
Spliced records now get a per-column unique name.
- getPlainTypedValueSchema named every nesting level '<name>_plain' with a
null namespace; for nested objects (every spec level is named typed_value)
the inner record became an Avro self-reference of its ancestor, which
Spark rejects as recursion. Names now chain the field path.
- HoodieRowParquetWriteSupport warned 'no corresponding HoodieSchema' for
every nullable variant column because the top-level check did not unwrap
the field's nullable union; shredding still happened via the nested
fallthrough, so this only silenced a misleading warning.
- Test fixes: cast(variant as string) on a string-typed variant extracts the
raw string (not its JSON form), and the decline column now uses per-row
empty objects: inference is per file and a multi-row insert can fan out to
one file per row, so cross-row type conflicts never reach one inference
call and cannot decline deterministically.
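As a rough illustration of the first fix in the list above (shape-based detection anchored on the requested column), the check might look like the sketch below. ShreddedShapeSketch and isShreddedVariant are hypothetical names, and the exact shape rule used here (metadata and typed_value present, no fields outside metadata/value/typed_value) is an assumption rather than the PR's actual predicate.

```java
import java.util.Set;

final class ShreddedShapeSketch {
  // Field names of a shredded variant group, per the layout named in this thread.
  private static final Set<String> SHREDDED_FIELDS = Set.of("metadata", "value", "typed_value");

  /**
   * The requested side still carries the variant logical type, so it anchors
   * the decision; the on-disk side lost that type and is matched by shape only.
   */
  static boolean isShreddedVariant(boolean requestedIsVariant, Set<String> onDiskFields) {
    return requestedIsVariant
        && onDiskFields.contains("metadata")
        && onDiskFields.contains("typed_value")
        && SHREDDED_FIELDS.containsAll(onDiskFields);
  }
}
```

Anchoring on the requested column keeps an ordinary struct that merely happens to have fields named metadata, value, and typed_value from being treated as a shredded variant.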
…ence is off
resolveConfigSchema parses the avro schema string; gate it behind the inference flag so the default-off path adds no per-file cost.
…rk version
PR CI runs on the merge ref against current master, whose spark4.2 profile satisfies gteqSpark4_1 but builds only hudi-spark4.2.x, which has no shredding-schema inferrer yet; the three inference tests then ran without one and failed their typed_value assertions on silently-unshredded files. Gate them on VariantShreddingRuntime.lookupInferrer() instead so any profile without an inferrer cancels rather than fails, and starts running again as soon as that version module ships its inferrer.
…riantShreddingRuntime
The inference work centralizes engine-component classpath detection in VariantShreddingRuntime (provider + schema inferrer, memoized). The merge converged both call sites on it, leaving the interface helper added earlier in this stack unused; remove it and its CLASSPATH_CANDIDATES constant.
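Memoized classpath detection of the kind this commit centralizes can be sketched as below. ClasspathDetectionSketch is a hypothetical class, not VariantShreddingRuntime itself, and the candidate class names passed in are supplied by the caller rather than taken from Hudi.

```java
import java.util.List;
import java.util.Optional;

final class ClasspathDetectionSketch {
  private final List<String> candidates;
  private Optional<Class<?>> cached; // null until the first lookup

  ClasspathDetectionSketch(List<String> candidates) {
    this.candidates = candidates;
  }

  /** Returns the first candidate class that loads, probing the classpath at most once. */
  synchronized Optional<Class<?>> lookup() {
    if (cached == null) {
      cached = Optional.empty();
      for (String fqn : candidates) {
        try {
          cached = Optional.of(Class.forName(fqn));
          break;
        } catch (ClassNotFoundException | LinkageError e) {
          // Candidate is absent or unusable on this classpath; try the next one.
        }
      }
    }
    return cached;
  }
}
```

Caching the empty result as well as the found class matters for engines without an implementation on the classpath, since they would otherwise repeat the failed lookups for every file.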
… assertions
The inference tests cherry-picked here called the groupContainsField test helper, which the merged apache#18065 inlined to GroupType.containsField. The helper no longer exists, so the calls failed to compile; switch them to the inlined form.
…ose()
On the success path the try already called delegate.close(); if it threw, the catch closed it again, relying on delegate close() being idempotent. Track a delegateClosed flag (set before the try-path close) so the catch only cleans up a delegate that materialize() created but never closed. Applied to both VariantShreddingInferenceFileWriter and its InternalRow sibling.
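A minimal sketch of the delegateClosed pattern this commit describes, under simplifying assumptions: CloseableTarget and CloseOnceSketch are hypothetical names, exceptions are unchecked, and materialize() just creates the delegate and replays a fixed buffer.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical minimal writer abstraction used only for this sketch. */
interface CloseableTarget {
  void write(String record);
  void close();
}

final class CloseOnceSketch {
  private final List<String> buffered;
  private final Supplier<CloseableTarget> factory;
  private CloseableTarget delegate;
  private boolean delegateClosed;

  CloseOnceSketch(List<String> buffered, Supplier<CloseableTarget> factory) {
    this.buffered = buffered;
    this.factory = factory;
  }

  private void materialize() {
    delegate = factory.get();
    for (String record : buffered) {
      delegate.write(record); // a failure here leaves an open delegate behind
    }
  }

  void close() {
    try {
      materialize();
      delegateClosed = true; // set before closing so a throwing close() is not retried
      delegate.close();
    } catch (RuntimeException e) {
      if (delegate != null && !delegateClosed) {
        // Only clean up a delegate that was created but never closed.
        try {
          delegate.close();
        } catch (RuntimeException cleanupFailure) {
          e.addSuppressed(cleanupFailure);
        }
      }
      throw e;
    }
  }
}
```

With the flag in place, a delegate whose close() throws is closed exactly once, and a replay failure still gets its cleanup close, so correctness no longer depends on the delegate's close() being idempotent.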
Force-pushed from 9aa0ac1 to 1593306
Describe the issue this Pull Request addresses
Closes #18937
Closes #18038
Hudi can write and read shredded variants, but typed_value only ever comes from an explicit table schema or the test-only force-shredding DDL, so production tables never shred. Spark 4.1 infers a per-file shredding schema from the data (SPARK-53659, on by default), but that lives in Spark's own writer stack, which Hudi bypasses.
Stacked on #18938 (read-side reconstruction), which stacks on #18065. Do not merge until both land.
Summary and Changelog
When hoodie.parquet.variant.shredding.schema.inference.enabled is set (default false), Hudi infers a shredding schema per base file from a sample of the records written to it, for both record types (SPARK, AVRO) and the bulk-insert row writer. Requires Spark 4.1+ on the writer classpath; Spark 4.0/Flink/Java silently keep writing unshredded.
- hoodie.parquet.variant.shredding.schema.inference.enabled in HoodieStorageConfig.
- VariantShreddingSchemaInferrer SPI in hudi-common, loaded by classpath detection (VariantShreddingRuntime, which also consolidates the duplicated provider-candidate arrays).
- Spark41VariantShreddingSchemaInferrer (hudi-spark4.1.x) delegates to Spark's InferVariantShreddingSchema, so Hudi inherits Spark's heuristics verbatim (no code copied). One call per file covers all variant columns (preserves the global width budget); object keys that are not valid Avro names are dropped and legally fall back to the residual value column.
- VariantShreddingInferenceFileWriter (+ a row-writer sibling): buffers up to 4096 records / 64MB (mirrors Spark's ParquetOutputWriterWithVariantShredding), infers once, creates the real writer with the inferred typed_value spliced in, then replays in order. Inference failures fall back to unshredded (a throwing inference must not fail compaction); writer-creation or replay failures latch and rethrow through close() so buffered records cannot be dropped silently.
- HoodieRowParquetWriteSupport resolves its schema from hoodie.write.schema/hoodie.avro.schema rather than the factory argument.
- Variant.getPlainTypedValueSchema is now recursive (nested objects, arrays, value-only wrappers), Avro "Field already used" in stripVariantShredding/VariantReconstruction, and the table-schema footer fallback now strips typed_value by shape so per-file layouts never leak into the resolved table schema.
- HoodieSchema recursion; functional tests in TestVariantDataType for COW (multi-column with declines, update over a shredded base), MOR inline compaction, and the bulk-insert row writer, all gated on Spark 4.1+.
Impact
No behavior change unless the new config is enabled. When enabled on Spark 4.1+, base files carry a per-file inferred typed_value; readers already handle shredded files (#18938 on the AVRO path, Spark native otherwise). MOR log files always stay unshredded; shredding materializes at compaction. Flipping the default on is a follow-up tracked in #18937.
Risk Level
Low. The feature is off by default and engines without the Spark 4.1 inferrer on the classpath are unaffected even when it is on. Verified with new unit and functional tests across both record types and all three write paths, plus compile checks under the spark3.5, spark4.0 and spark4.1 profiles.
Documentation Update
New config documented via its withDocumentation text (picked up by the generated config reference). Website updates deferred to the default-flip follow-up.
Contributor's checklist