perf(common): Avoid per-record HoodieSchema rebuilds on Avro read/merge paths#18967
perf(common): Avoid per-record HoodieSchema rebuilds on Avro read/merge paths#18967voonhous wants to merge 11 commits into
Conversation
…t field access getFieldValueFromIndexedRecord wrapped record.getSchema() in a fresh HoodieSchema on every call, which rebuilt the full field list and field map (one HoodieSchemaField per column plus a HashMap collect) and split the field path, per record per accessed field in the file group reader merge path. Intern the wrapper through HoodieSchemaCache instead, so the canonical instance's lazily built field list and field map are reused across calls and the per-record cost drops to a cache hit. Single-segment field names, the overwhelmingly common case, also skip the path split. Lookup semantics are unchanged since the traversal still goes through HoodieSchema#getNonNullType and #getField, keeping HoodieSchema as the type system facade. Since interned instances are shared across executor task threads, HoodieSchema's lazily built field list and field map are now published through immutable wrappers (final-field freeze) so a racing reader can never observe a non-null map with invisible entries and silently miss an existing field.
f458f0d to
2e89442
Compare
…ecked locking Addresses review feedback on the safe-publication change: make the cache fields volatile and take the monitor only on the miss path, so reads stay lock-free on the hot path while initialization is lock-guarded (no duplicate builds). Same pattern as org.apache.hudi.common.util.Lazy#get.
String.split already fast-paths the two-character pattern, so after interning the fast path only saved one small array allocation per call; not worth the extra branch.
The cache fields are volatile and write-once, so reading them directly after the null check is safe; the locals only saved a volatile re-read.
…rapper construction HoodieSchema.fromAvroSchema still ran per record to build the intern probe key. HoodieSchemaCache.intern(Schema) is backed by a weak identity-keyed cache: records of one file share the same Avro Schema instance, so the per-record path becomes a single cache hit with no wrapper allocation or type dispatch. Misses convert and value-intern, so equal but distinct Avro schema instances still converge on one canonical HoodieSchema.
…hemaCache Move the Avro Schema -> HoodieSchema cache out of HoodieSchemaCache into a dedicated AvroToHoodieSchemaCache class; misses still value-intern through HoodieSchemaCache. AvroRecordContext now uses the new class. HoodieSchemaCache is back to interning HoodieSchema only.
…cy single-check getFields()/getFieldMap() build an immutable, deterministic view of the schema's fields, so concurrent first-callers can each build it once and converge on equal results. Keep the volatile fields so the unmodifiable collections (wrapping non-final ArrayList/HashMap) are still published safely; drop the synchronized blocks.
…ema sites Audit of all HoodieSchema.fromAvroSchema(...) call sites for per-record rebuilds (follow-up to the AvroRecordContext change). Switch the genuinely per-record sites to AvroToHoodieSchemaCache.intern(...): - SparkFileFormatInternalRecordContext.convertAvroRecord - FlinkRecordContext.convertAvroRecord - RealtimeCompactedRecordReader.mergeRecord (two calls) - HoodieAvroUtils.getRecordColumnValues - HoodieJsonPayload.getInsertValue - ExpressionPayload MERGE-INTO eval paths And hoist the loop-invariant fromAvroSchema(schema) out of the per-record write loop in HoodieAvroDataBlock#getBytes. Interning returns an equal canonical HoodieSchema and improves downstream schema-keyed cache hit rates; cold/one-time and per-block sites are left unchanged.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR avoids per-record HoodieSchema rebuilds on Avro read/merge hot paths by introducing an Avro-schema-keyed intern cache (AvroToHoodieSchemaCache) and making the lazily built fields/fieldMap caches on HoodieSchema safely publishable across threads. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One comment on a misleading field-level comment in HoodieSchema.java; the rest of the changes are clean.
cc @yihua
| // Interning returns the canonical wrapper for this schema, whose lazily built field list and | ||
| // field map survive across calls, so the per-record cost is a cache hit instead of an | ||
| // O(schema width) wrapper rebuild. | ||
| HoodieSchema currentSchema = AvroToHoodieSchemaCache.intern(record.getSchema()); |
There was a problem hiding this comment.
do you think it makes sense we move the cache inside HoodieSchema just for purposes of call HoodieSchema.fromAvroSchema, it is risky to expose it since not everyone can really call this intern explicitly, then we can remove the class AvroToHoodieSchemaCache I think.
There was a problem hiding this comment.
Yeah, this make sense. It makes future maintenance easier too when we want to remove AvroSchema entirely as the conversion is centralise in a location.
HoodieSchema class is a little too big. I will prolly put it in HoodieSchemaCache.
There was a problem hiding this comment.
Done - moved the cache into HoodieSchema.fromAvroSchema (raw conversion split into a private convertFromAvroSchema; the loader value-interns via HoodieSchemaCache) and removed AvroToHoodieSchemaCache. All call sites just use fromAvroSchema now, so the interning is transparent.
There was a problem hiding this comment.
I tried folding the cache into HoodieSchema.fromAvroSchema, but it breaks TestHoodieSchemaUtils#testIllegalPromotionsBetweenPrimitives, so I'm reverting it and keeping AvroToHoodieSchemaCache as the explicit intern at the per-record call sites.
org.opentest4j.AssertionFailedError:
Expected :true
Actual :false
at org.apache.hudi.TestHoodieSchemaUtils.testIllegalPromotionsBetweenPrimitives(TestHoodieSchemaUtils.java:167)
That test deduces the writer schema for a reader record carrying bytes at four field locations (rec.simpleField, rec.arrayField.element, rec.mapField.value, rec.nestedField.nested) against a writer carrying int at the same four, and asserts the resulting SchemaBackwardsCompatibilityException names all four paths.
HoodieSchemaCompatibilityChecker memoizes results in mMemoizeMap keyed by ReaderWriter, which compares the HoodieSchema instances by reference:
public int hashCode() { return System.identityHashCode(mReader) ^ System.identityHashCode(mWriter); }
public boolean equals(Object obj) { ... return (this.mReader == that.mReader) && (this.mWriter == that.mWriter); }With fromAvroSchema interning, all four bytes resolve to one canonical instance and all four int to one, so the four locations become the same (bytes, int) identity pair. The first one (rec.simpleField) is computed and memoized; the other three hit the memo and reuse that result without recording their own locations, so the message only lists rec.simpleField and the assertion fails for the other three. (The debug log shows four "reader bytes with writer int" lines because the log sits above the memo check, but calculateCompatibility only runs once.)
The checker's recursion memo relies on a distinct schema instance per occurrence - that is how it tells genuine recursion from a sibling subschema of the same shape. The interning is only needed for the per-record read path, not this one-shot compatibility check, so keeping it at the explicit call sites avoids the problem. We could instead make the checker track locations independently of instance identity, but that is a riskier change to a core path (the memo also guards against exponential recomputation on shared-subschema DAGs) - happy to go that route if you would prefer it.
2c0796b to
89c508b
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the iteration here — folding the cache into HoodieSchema.fromAvroSchema keeps the public surface area minimal and reads cleanly. However, the latest commit that dropped AvroToHoodieSchemaCache looks like it missed two Scala call sites that still import and call it; flagged inline. Worth a quick build to confirm. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of comments around naming consistency and an inaccurate concurrency note.
danny0405
left a comment
There was a problem hiding this comment.
+1, let's make the CI green.
| if (avroSchema == null) { | ||
| return null; | ||
| } | ||
| return AVRO_SCHEMA_CACHE.get(avroSchema); |
There was a problem hiding this comment.
looks like we got a lot of test failures, not sure if there are some thread-safety issues.
There was a problem hiding this comment.
Test failures from this: #18967 (comment)
I've reverted this suggested improvement, it will introduce a behaviour change.
There was a problem hiding this comment.
what's the difference, lookes like just some code moving around?
There was a problem hiding this comment.
After we shifted the code to use AVRO_SCHEMA_CACHE in HoodieSchema, the test:
org.apache.hudi.TestHoodieSchemaUtils#testIllegalPromotionsBetweenPrimitives is failing with the error:
769 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader {"type":"record","name":"rec","fields":[{"name":"simpleField","type":"bytes"},{"name":"arrayField","type":{"type":"array","items":"bytes"}},{"name":"mapField","type":{"type":"map","values":"bytes"}},{"name":"nestedField","type":{"type":"record","name":"nestedField","fields":[{"name":"nested","type":"bytes"}]}}]} with writer {"type":"record","name":"rec","fields":[{"name":"simpleField","type":"int"},{"name":"arrayField","type":{"type":"array","items":"int"}},{"name":"mapField","type":{"type":"map","values":"int"}},{"name":"nestedField","type":{"type":"record","name":"nestedField","fields":[{"name":"nested","type":"int"}]}}]}
775 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader "bytes" with writer "int"
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader {"type":"array","items":"bytes"} with writer {"type":"array","items":"int"}
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader "bytes" with writer "int"
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader {"type":"map","values":"bytes"} with writer {"type":"map","values":"int"}
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader "bytes" with writer "int"
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader {"type":"record","name":"nestedField","fields":[{"name":"nested","type":"bytes"}]} with writer {"type":"record","name":"nestedField","fields":[{"name":"nested","type":"int"}]}
777 [main] DEBUG org.apache.hudi.common.schema.HoodieSchemaCompatibilityChecker [] - Checking compatibility of reader "bytes" with writer "int"
org.opentest4j.AssertionFailedError:
Expected :true
Actual :false
<Click to see difference>
Why it breaks (it is not just code moving around)
The change flips HoodieSchema.fromAvroSchema from returning a fresh wrapper on every call to returning a shared, interned instance. HoodieSchemaCompatibilityChecker depends on that per-call freshness, so its result changes even though the call sites look the same.
The memo key, ReaderWriter, compares the HoodieSchema instances by pointer identity:
public int hashCode() { return System.identityHashCode(mReader) ^ System.identityHashCode(mWriter); }
public boolean equals(Object obj) { ... return (this.mReader == that.mReader) && (this.mWriter == that.mWriter); }The checker memoizes results in mMemoizeMap keyed by ReaderWriter: it computes a (reader, writer) pair once, stores the result (with the field location captured on that first visit), and returns that same result for any later pair that is ==.
The test builds a reader record with bytes at four field locations (simpleField, arrayField.element, mapField.value, nestedField.nested) and a writer with int at the same four. Each bytes vs int pair is a TYPE_MISMATCH that must be reported with its own field path, and TestHoodieSchemaUtils.java:167 asserts all four paths appear in the exception message.
Before the change: fromAvroSchema returned a fresh HoodieSchema per call, so the bytes at simpleField and the bytes at arrayField.element were distinct instances. Four distinct (bytes, int) identity pairs, so calculateCompatibility runs four times, producing four incompatibilities (one per field path); the message lists all four and the test passes.
After the change: fromAvroSchema interns, so all four bytes collapse to one canonical instance and all four int to one. Now all four locations are the same (canonicalBytes, canonicalInt) identity pair. The first one (rec.simpleField) is computed and memoized; the other three hit the memo and reuse that result without recording their own locations. The message ends up with only rec.simpleField, so the assertion fails for the other three.
That is also why the debug log shows four "reader bytes with writer int" lines while the message is short: log.debug runs at the top of getCompatibility on every call, but calculateCompatibility (which records the location) only runs once; the other three short-circuit on the memo.
In short: the checker's recursion memo assumes a distinct schema instance per occurrence (that is how it tells genuine recursion from a sibling subschema of the same shape). Interning inside fromAvroSchema makes structurally-equal sibling subschemas share identity, so the checker conflates them and under-reports. The interning is only needed on the per-record read path, not this one-shot compatibility check, so keeping it at the explicit AvroToHoodieSchemaCache.intern call sites avoids the problem.
There was a problem hiding this comment.
still feel we could cache the schema just inside HoodieSchema, the behavior is correct right? is it possible we fix the tests.
89c508b to
31824fe
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR interns Avro Schema → HoodieSchema conversions on hot read/merge paths via a new AvroToHoodieSchemaCache, makes HoodieSchema's lazy field list/field map caches thread-safe with volatile publication + unmodifiable wrappers, and hoists schema conversion out of the per-record loop in HoodieAvroDataBlock. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of naming/consistency nits below.
cc @yihua
Switch the hoisted HoodieSchema.fromAvroSchema(schema) in getBytes to AvroToHoodieSchemaCache.intern(schema). This matches every other site touched in this PR and reuses one cached, value-interned instance across getBytes calls, keeping HoodieSchema identity stable for the downstream per-record caches instead of rebuilding per block.
Match the Hoodie-prefix convention used by every other class in the org.apache.hudi.common.schema package (HoodieSchema, HoodieSchemaCache, HoodieSchemaField, ...). Pure rename of the class and its 8 referencing files; no behavior change.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18967 +/- ##
============================================
- Coverage 68.25% 67.63% -0.63%
- Complexity 29509 29780 +271
============================================
Files 2542 2563 +21
Lines 142632 145157 +2525
Branches 17789 18338 +549
============================================
+ Hits 97354 98173 +819
- Misses 37271 38767 +1496
- Partials 8007 8217 +210
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR caches the Avro → HoodieSchema conversion to avoid per-record wrapper rebuilds on hot read/merge paths, with volatile field/fieldMap publication in HoodieSchema for safe lazy initialization. No issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
Describe the issue this Pull Request addresses
Closes #18966
HoodieSchema.fromAvroSchema(record.getSchema())is called on the hottest Avro read/merge paths - once per record (and, for field access, per accessed field). Each call allocates a freshHoodieSchemaand, on first field access, rebuilds the full field list and field map (oneHoodieSchemaFieldper column plus a HashMap collect, i.e. O(schema width) allocations), with nothing cached between calls.AvroRecordContext#getFieldValueFromIndexedRecord(the field accessor behindRecordContext#getValuefor the Avro engine) is the worst case, but the same per-record rebuild shows up on several other read/merge paths.Summary and Changelog
Intern the Avro-schema ->
HoodieSchemaconversion so the canonical wrapper's lazily built field list and field map are reused across calls; the per-record cost drops to a cache hit. This keepsHoodieSchemaas the type-system facade rather than bypassing it with raw Avro traversal.AvroToHoodieSchemaCache(inorg.apache.hudi.common.schema): an Avro-Schema-keyed cache (weakKeys, identity lookups - records of one file share the sameSchemainstance) that on a miss converts and value-interns throughHoodieSchemaCache, so equal-but-distinct Avro schema instances still converge on one canonicalHoodieSchema. Kept separate fromHoodieSchemaCache(which internsHoodieSchema) and distinct from the existingorg.apache.hudi.avro.AvroSchemaCache(Avro -> Avro).HoodieSchema#getFields/#getFieldMap: the lazily built field list/map are cached involatilefields and published with a benign racy single-check. Previously the field map was a plainHashMapon a non-volatile field, so a racing reader could observe a non-null map with invisible entries and silently miss an existing field;volatileplus the immutableCollections.unmodifiable*wrappers fix that, while the harmless duplicate-build race under contention remains by design.fromAvroSchema(...)call sites:AvroRecordContext#getFieldValueFromIndexedRecord,SparkFileFormatInternalRecordContext#convertAvroRecord,FlinkRecordContext#convertAvroRecord,RealtimeCompactedRecordReader#mergeRecord,HoodieAvroUtils#getRecordColumnValues,HoodieJsonPayload#getInsertValue, and theExpressionPayloadMERGE-INTO evaluator / deserializer / serializer / joinRecords paths.HoodieAvroDataBlock#getBytes: thefromAvroSchema(schema)was loop-invariant, so it is hoisted out of the per-record write loop.FileGroupRecordBuffer#composeEvolvedSchemaTransformerare left unchanged.TestAvroRecordContext: top-level and nested access, nullable record unions, missing fields, non-unwrappable unions, the empty-name guard, and intern canonicalization across equal-but-distinct schema instances.Impact
Performance: removes O(schema width) allocations per record from the hottest Avro read/merge paths; for a 200-column table this was hundreds of allocations per record (per accessed field in the
AvroRecordContextcase). Interning also makes the resultingHoodieSchemacanonical, improving hit rates for the downstream schema-keyed caches (deserializer / serializer / evaluator maps). Results and exceptions are unchanged. No public API change.Risk Level
Low. The lookup paths are the same
HoodieSchemacode as before, with caching layered on via interning; interning returns an equal canonical instance, and the safe-publication change is covered by the existingTestHoodieSchemasuite and the extendedTestAvroRecordContext.Documentation Update
None.
Contributor's checklist