From 2e89442dbe084ab903abb1b4741433ba2d460830 Mon Sep 17 00:00:00 2001 From: voon Date: Wed, 10 Jun 2026 22:43:00 +0800 Subject: [PATCH 01/11] perf(common): Avoid per-call HoodieSchema rebuild in AvroRecordContext field access getFieldValueFromIndexedRecord wrapped record.getSchema() in a fresh HoodieSchema on every call, which rebuilt the full field list and field map (one HoodieSchemaField per column plus a HashMap collect) and split the field path, per record per accessed field in the file group reader merge path. Intern the wrapper through HoodieSchemaCache instead, so the canonical instance's lazily built field list and field map are reused across calls and the per-record cost drops to a cache hit. Single-segment field names, the overwhelmingly common case, also skip the path split. Lookup semantics are unchanged since the traversal still goes through HoodieSchema#getNonNullType and #getField, keeping HoodieSchema as the type system facade. Since interned instances are shared across executor task threads, HoodieSchema's lazily built field list and field map are now published through immutable wrappers (final-field freeze) so a racing reader can never observe a non-null map with invisible entries and silently miss an existing field. --- .../apache/hudi/avro/AvroRecordContext.java | 11 ++- .../hudi/common/schema/HoodieSchema.java | 26 ++++--- .../hudi/avro/TestAvroRecordContext.java | 68 +++++++++++++++++++ 3 files changed, 96 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index def6e6a7003fc..3d34e5f2d5f1e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.read.BufferedRecord; @@ -70,8 +71,16 @@ public AvroRecordContext() { public static Object getFieldValueFromIndexedRecord( IndexedRecord record, String fieldName) { - HoodieSchema currentSchema = HoodieSchema.fromAvroSchema(record.getSchema()); + // Interning returns the canonical wrapper for this schema, whose lazily built field list and + // field map survive across calls, so the per-record cost is a cache hit instead of an + // O(schema width) wrapper rebuild. + HoodieSchema currentSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(record.getSchema())); IndexedRecord currentRecord = record; + if (fieldName.indexOf('.') < 0) { + // single-segment field names are the overwhelmingly common case; skip the path split + Option fieldOpt = currentSchema.getNonNullType().getField(fieldName); + return fieldOpt.isEmpty() ? null : currentRecord.get(fieldOpt.get().pos()); + } String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { currentSchema = currentSchema.getNonNullType(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 0b0978d552a23..d793b285fa212 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -1152,10 +1152,15 @@ public List getFields() { if (!hasFields()) { throw new IllegalStateException("Cannot get fields from schema type: " + type); } - if (fields == null) { - fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); + // interned instances are shared across threads, so publish through an immutable wrapper + // (final-field freeze) and read/write the non-volatile cache field exactly once; a racy + // duplicate build is benign + List localFields = fields; + if (localFields == null) { + localFields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); + fields = localFields; } - return fields; + return localFields; } /** @@ -1195,11 +1200,16 @@ public Option getField(String name) { } private Map getFieldMap() { - if (fieldMap == null) { - fieldMap = getFields().stream() - .collect(Collectors.toMap(HoodieSchemaField::name, field -> field)); - } - return fieldMap; + // same publication pattern as getFields(): without the immutable wrapper, a thread racing on + // the plain HashMap could see a non-null map whose entries are not yet visible and miss a + // field that exists + Map localFieldMap = fieldMap; + if (localFieldMap == null) { + localFieldMap = Collections.unmodifiableMap(getFields().stream() + .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); + fieldMap = localFieldMap; + } + return localFieldMap; } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java index b84738684b16c..04c7ae3c2bda7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java @@ -19,14 +19,21 @@ package org.apache.hudi.avro; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.stream.Stream; +import static org.apache.hudi.avro.AvroRecordContext.getFieldValueFromIndexedRecord; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; class TestAvroRecordContext { @@ -44,4 +51,65 @@ void testConvertValueToEngineType(Comparable input, Comparable expected) { Comparable actual = AvroRecordContext.getFieldAccessorInstance().convertValueToEngineType(input); assertEquals(expected, actual); } + + private static final Schema RECORD_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"top\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"int\"}," + + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"address\",\"fields\":[" + + "{\"name\":\"city\",\"type\":\"string\"}," + + "{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}," + + "{\"name\":\"multi\",\"type\":[\"null\",\"string\",\"int\"],\"default\":null}]}"); + + private static GenericRecord buildRecord() { + GenericRecord address = new GenericData.Record(RECORD_SCHEMA.getField("address").schema().getTypes().get(1)); + address.put("city", new Utf8("sf")); + address.put("zip", 94105); + GenericRecord record = new GenericData.Record(RECORD_SCHEMA); + record.put("id", 1); + record.put("name", new Utf8("alice")); + record.put("address", address); + return record; + } + + @Test + void testGetFieldValueTopLevel() { + GenericRecord record = buildRecord(); + assertEquals(1, getFieldValueFromIndexedRecord(record, "id")); + assertEquals(new Utf8("alice"), getFieldValueFromIndexedRecord(record, "name")); + assertNull(getFieldValueFromIndexedRecord(record, "multi")); + assertNull(getFieldValueFromIndexedRecord(record, "missing")); + } + + @Test + void testGetFieldValueNested() { + GenericRecord record = buildRecord(); + // intermediate segment unwraps the [null, record] union + assertEquals(new Utf8("sf"), getFieldValueFromIndexedRecord(record, "address.city")); + assertEquals(94105, getFieldValueFromIndexedRecord(record, "address.zip")); + assertNull(getFieldValueFromIndexedRecord(record, "address.missing")); + assertNull(getFieldValueFromIndexedRecord(record, "missing.nested")); + } + + @Test + void testGetFieldValueErrorCases() { + GenericRecord record = buildRecord(); + // a union that is not [null, T] does not support field lookups + assertThrows(IllegalStateException.class, () -> getFieldValueFromIndexedRecord(record, "multi.sub")); + assertThrows(IllegalArgumentException.class, () -> getFieldValueFromIndexedRecord(record, "")); + } + + @Test + void testGetFieldValueAcrossEqualSchemaInstances() { + // records from different files carry equal but distinct schema instances; both must intern to + // the same canonical wrapper and resolve identically + Schema schemaCopy = new Schema.Parser().parse(RECORD_SCHEMA.toString()); + GenericRecord record = buildRecord(); + GenericRecord recordWithCopy = new GenericData.Record(schemaCopy); + for (Schema.Field field : RECORD_SCHEMA.getFields()) { + recordWithCopy.put(field.pos(), record.get(field.pos())); + } + assertEquals(getFieldValueFromIndexedRecord(record, "id"), getFieldValueFromIndexedRecord(recordWithCopy, "id")); + assertEquals(getFieldValueFromIndexedRecord(record, "address.city"), getFieldValueFromIndexedRecord(recordWithCopy, "address.city")); + } } From fe685b1f13d04909bb665b33ca7c58ef7048bd89 Mon Sep 17 00:00:00 2001 From: voon Date: Wed, 10 Jun 2026 23:56:06 +0800 Subject: [PATCH 02/11] review: Guard lazy field list/field map initialization with double-checked locking Addresses review feedback on the safe-publication change: make the cache fields volatile and take the monitor only on the miss path, so reads stay lock-free on the hot path while initialization is lock-guarded (no duplicate builds). Same pattern as org.apache.hudi.common.util.Lazy#get. --- .../hudi/common/schema/HoodieSchema.java | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index d793b285fa212..1c998bf960fad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -313,8 +313,10 @@ private static void addVectorColumnName(String s, int start, int end, Set fields; - private transient Map fieldMap; + // interned instances are shared across threads, so the lazily built caches use double-checked + // locking: lock-free volatile reads on the hot path, initialization guarded by the monitor + private transient volatile List fields; + private transient volatile Map fieldMap; // Register the Variant logical type with Avro static { @@ -1152,13 +1154,15 @@ public List getFields() { if (!hasFields()) { throw new IllegalStateException("Cannot get fields from schema type: " + type); } - // interned instances are shared across threads, so publish through an immutable wrapper - // (final-field freeze) and read/write the non-volatile cache field exactly once; a racy - // duplicate build is benign List localFields = fields; if (localFields == null) { - localFields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); - fields = localFields; + synchronized (this) { + localFields = fields; + if (localFields == null) { + localFields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); + fields = localFields; + } + } } return localFields; } @@ -1200,14 +1204,16 @@ public Option getField(String name) { } private Map getFieldMap() { - // same publication pattern as getFields(): without the immutable wrapper, a thread racing on - // the plain HashMap could see a non-null map whose entries are not yet visible and miss a - // field that exists Map localFieldMap = fieldMap; if (localFieldMap == null) { - localFieldMap = Collections.unmodifiableMap(getFields().stream() - .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); - fieldMap = localFieldMap; + synchronized (this) { + localFieldMap = fieldMap; + if (localFieldMap == null) { + localFieldMap = Collections.unmodifiableMap(getFields().stream() + .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); + fieldMap = localFieldMap; + } + } } return localFieldMap; } From 87e1dc28ae79e634bbcea718a31608967f68a00d Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 11 Jun 2026 13:16:19 +0800 Subject: [PATCH 03/11] review: Drop the single-segment fast path String.split already fast-paths the two-character pattern, so after interning the fast path only saved one small array allocation per call; not worth the extra branch. --- .../main/java/org/apache/hudi/avro/AvroRecordContext.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 3d34e5f2d5f1e..96a243fbf9c0e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -76,11 +76,6 @@ public static Object getFieldValueFromIndexedRecord( // O(schema width) wrapper rebuild. HoodieSchema currentSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(record.getSchema())); IndexedRecord currentRecord = record; - if (fieldName.indexOf('.') < 0) { - // single-segment field names are the overwhelmingly common case; skip the path split - Option fieldOpt = currentSchema.getNonNullType().getField(fieldName); - return fieldOpt.isEmpty() ? null : currentRecord.get(fieldOpt.get().pos()); - } String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { currentSchema = currentSchema.getNonNullType(); From 41c7d36ac8a73f88aa148780cc06fd591f841df3 Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 11 Jun 2026 13:20:47 +0800 Subject: [PATCH 04/11] review: Drop the DCL local variables The cache fields are volatile and write-once, so reading them directly after the null check is safe; the locals only saved a volatile re-read. --- .../hudi/common/schema/HoodieSchema.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 1c998bf960fad..f1c7726e74c91 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -1154,17 +1154,14 @@ public List getFields() { if (!hasFields()) { throw new IllegalStateException("Cannot get fields from schema type: " + type); } - List localFields = fields; - if (localFields == null) { + if (fields == null) { synchronized (this) { - localFields = fields; - if (localFields == null) { - localFields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); - fields = localFields; + if (fields == null) { + fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); } } } - return localFields; + return fields; } /** @@ -1204,18 +1201,15 @@ public Option getField(String name) { } private Map getFieldMap() { - Map localFieldMap = fieldMap; - if (localFieldMap == null) { + if (fieldMap == null) { synchronized (this) { - localFieldMap = fieldMap; - if (localFieldMap == null) { - localFieldMap = Collections.unmodifiableMap(getFields().stream() + if (fieldMap == null) { + fieldMap = Collections.unmodifiableMap(getFields().stream() .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); - fieldMap = localFieldMap; } } } - return localFieldMap; + return fieldMap; } /** From b7d8a13f39378f6e163f96b29f608fdd304c6985 Mon Sep 17 00:00:00 2001 From: voon Date: Fri, 12 Jun 2026 13:35:38 +0800 Subject: [PATCH 05/11] review: Add Avro-schema-keyed intern overload so the hot path skips wrapper construction HoodieSchema.fromAvroSchema still ran per record to build the intern probe key. HoodieSchemaCache.intern(Schema) is backed by a weak identity-keyed cache: records of one file share the same Avro Schema instance, so the per-record path becomes a single cache hit with no wrapper allocation or type dispatch. Misses convert and value-intern, so equal but distinct Avro schema instances still converge on one canonical HoodieSchema. --- .../apache/hudi/avro/AvroRecordContext.java | 2 +- .../hudi/common/schema/HoodieSchemaCache.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 96a243fbf9c0e..42e9f413d8ec2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -74,7 +74,7 @@ public static Object getFieldValueFromIndexedRecord( // Interning returns the canonical wrapper for this schema, whose lazily built field list and // field map survive across calls, so the per-record cost is a cache hit instead of an // O(schema width) wrapper rebuild. - HoodieSchema currentSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(record.getSchema())); + HoodieSchema currentSchema = HoodieSchemaCache.intern(record.getSchema()); IndexedRecord currentRecord = record; String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java index d96710bf52caf..96f4fe93f694a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java @@ -20,6 +20,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.avro.Schema; /** * A global cache for HoodieSchema instances to ensure that there is only one @@ -36,6 +37,14 @@ public class HoodieSchemaCache { private static final LoadingCache SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k); + // Avro-schema-keyed view onto the cache above for per-record call sites: weakKeys gives + // identity-based lookups (records of one file share the same Schema instance), so the hot path + // is a single cache hit with no wrapper allocation or type dispatch. Misses convert and then + // value-intern, so equal but distinct Avro schema instances still converge on one canonical + // HoodieSchema. + private static final LoadingCache AVRO_SCHEMA_CACHE = + Caffeine.newBuilder().weakKeys().maximumSize(1024).build(avroSchema -> intern(HoodieSchema.fromAvroSchema(avroSchema))); + /** * Get schema variable from global cache. If not found, put it into the cache and then return it. * @@ -45,4 +54,15 @@ public class HoodieSchemaCache { public static HoodieSchema intern(HoodieSchema schema) { return SCHEMA_CACHE.get(schema); } + + /** + * Returns the canonical {@link HoodieSchema} wrapping the given Avro schema, converting and + * interning it on first use. + * + * @param avroSchema Avro schema to look up + * @return the canonical HoodieSchema for the given Avro schema + */ + public static HoodieSchema intern(Schema avroSchema) { + return AVRO_SCHEMA_CACHE.get(avroSchema); + } } From 462a91a92898904d89e32dc2112006c926432f73 Mon Sep 17 00:00:00 2001 From: voon Date: Sat, 13 Jun 2026 16:19:35 +0800 Subject: [PATCH 06/11] refactor(common): Extract Avro-schema-keyed cache into AvroToHoodieSchemaCache Move the Avro Schema -> HoodieSchema cache out of HoodieSchemaCache into a dedicated AvroToHoodieSchemaCache class; misses still value-intern through HoodieSchemaCache. AvroRecordContext now uses the new class. HoodieSchemaCache is back to interning HoodieSchema only. --- .../apache/hudi/avro/AvroRecordContext.java | 4 +- .../schema/AvroToHoodieSchemaCache.java | 52 +++++++++++++++++++ .../hudi/common/schema/HoodieSchemaCache.java | 20 ------- 3 files changed, 54 insertions(+), 22 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 42e9f413d8ec2..002483a5f4184 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -24,8 +24,8 @@ import org.apache.hudi.common.model.HoodieEmptyRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; -import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.read.BufferedRecord; @@ -74,7 +74,7 @@ public static Object getFieldValueFromIndexedRecord( // Interning returns the canonical wrapper for this schema, whose lazily built field list and // field map survive across calls, so the per-record cost is a cache hit instead of an // O(schema width) wrapper rebuild. - HoodieSchema currentSchema = HoodieSchemaCache.intern(record.getSchema()); + HoodieSchema currentSchema = AvroToHoodieSchemaCache.intern(record.getSchema()); IndexedRecord currentRecord = record; String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java new file mode 100644 index 0000000000000..bc939eacd6951 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.avro.Schema; + +/** + * A global cache mapping Avro {@link Schema} instances to their canonical {@link HoodieSchema}. + * + *

This is an Avro-schema-keyed view onto {@link HoodieSchemaCache} for per-record call sites: + * {@code weakKeys} gives identity-based lookups (records of one file share the same {@link Schema} + * instance), so the hot path is a single cache hit with no wrapper allocation or type dispatch. + * Misses convert and then value-intern through {@link HoodieSchemaCache}, so equal but distinct Avro + * schema instances still converge on one canonical {@link HoodieSchema}. + * + *

This is a global cache which works for a JVM lifecycle. + */ +public class AvroToHoodieSchemaCache { + + private static final LoadingCache AVRO_SCHEMA_CACHE = + Caffeine.newBuilder().weakKeys().maximumSize(1024) + .build(avroSchema -> HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(avroSchema))); + + /** + * Returns the canonical {@link HoodieSchema} wrapping the given Avro schema, converting and + * interning it on first use. + * + * @param avroSchema Avro schema to look up + * @return the canonical HoodieSchema for the given Avro schema + */ + public static HoodieSchema intern(Schema avroSchema) { + return AVRO_SCHEMA_CACHE.get(avroSchema); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java index 96f4fe93f694a..d96710bf52caf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java @@ -20,7 +20,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; -import org.apache.avro.Schema; /** * A global cache for HoodieSchema instances to ensure that there is only one @@ -37,14 +36,6 @@ public class HoodieSchemaCache { private static final LoadingCache SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k); - // Avro-schema-keyed view onto the cache above for per-record call sites: weakKeys gives - // identity-based lookups (records of one file share the same Schema instance), so the hot path - // is a single cache hit with no wrapper allocation or type dispatch. Misses convert and then - // value-intern, so equal but distinct Avro schema instances still converge on one canonical - // HoodieSchema. - private static final LoadingCache AVRO_SCHEMA_CACHE = - Caffeine.newBuilder().weakKeys().maximumSize(1024).build(avroSchema -> intern(HoodieSchema.fromAvroSchema(avroSchema))); - /** * Get schema variable from global cache. If not found, put it into the cache and then return it. * @@ -54,15 +45,4 @@ public class HoodieSchemaCache { public static HoodieSchema intern(HoodieSchema schema) { return SCHEMA_CACHE.get(schema); } - - /** - * Returns the canonical {@link HoodieSchema} wrapping the given Avro schema, converting and - * interning it on first use. - * - * @param avroSchema Avro schema to look up - * @return the canonical HoodieSchema for the given Avro schema - */ - public static HoodieSchema intern(Schema avroSchema) { - return AVRO_SCHEMA_CACHE.get(avroSchema); - } } From 8780fc8cc1c8e5e85169686c724e50393b393aa1 Mon Sep 17 00:00:00 2001 From: voon Date: Sat, 13 Jun 2026 18:11:52 +0800 Subject: [PATCH 07/11] review: Drop synchronized DCL for lazy fields in favor of a benign racy single-check getFields()/getFieldMap() build an immutable, deterministic view of the schema's fields, so concurrent first-callers can each build it once and converge on equal results. Keep the volatile fields so the unmodifiable collections (wrapping non-final ArrayList/HashMap) are still published safely; drop the synchronized blocks. --- .../apache/hudi/common/schema/HoodieSchema.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index f1c7726e74c91..781816a40a49c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -1154,12 +1154,10 @@ public List getFields() { if (!hasFields()) { throw new IllegalStateException("Cannot get fields from schema type: " + type); } + // Benign race: the result is an immutable, deterministic view of avroSchema's fields, so concurrent + // callers may each build it once but converge on equal lists; the volatile field makes publication safe. if (fields == null) { - synchronized (this) { - if (fields == null) { - fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); - } - } + fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); } return fields; } @@ -1201,13 +1199,10 @@ public Option getField(String name) { } private Map getFieldMap() { + // Benign race, same rationale as getFields(): deterministic immutable result, volatile for safe publication. if (fieldMap == null) { - synchronized (this) { - if (fieldMap == null) { - fieldMap = Collections.unmodifiableMap(getFields().stream() - .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); - } - } + fieldMap = Collections.unmodifiableMap(getFields().stream() + .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); } return fieldMap; } From 31824fe9dfb4c0fdee72ff0ddff327db5565a98b Mon Sep 17 00:00:00 2001 From: voon Date: Sat, 13 Jun 2026 18:27:00 +0800 Subject: [PATCH 08/11] perf(common): Intern HoodieSchema at remaining per-record fromAvroSchema sites Audit of all HoodieSchema.fromAvroSchema(...) call sites for per-record rebuilds (follow-up to the AvroRecordContext change). Switch the genuinely per-record sites to AvroToHoodieSchemaCache.intern(...): - SparkFileFormatInternalRecordContext.convertAvroRecord - FlinkRecordContext.convertAvroRecord - RealtimeCompactedRecordReader.mergeRecord (two calls) - HoodieAvroUtils.getRecordColumnValues - HoodieJsonPayload.getInsertValue - ExpressionPayload MERGE-INTO eval paths And hoist the loop-invariant fromAvroSchema(schema) out of the per-record write loop in HoodieAvroDataBlock#getBytes. Interning returns an equal canonical HoodieSchema and improves downstream schema-keyed cache hit rates; cold/one-time and per-block sites are left unchanged. --- .../hudi/table/format/FlinkRecordContext.java | 3 ++- .../SparkFileFormatInternalRecordContext.scala | 4 ++-- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 3 ++- .../org/apache/hudi/common/HoodieJsonPayload.java | 4 ++-- .../table/log/block/HoodieAvroDataBlock.java | 4 +++- .../realtime/RealtimeCompactedRecordReader.java | 5 +++-- .../hudi/command/payload/ExpressionPayload.scala | 14 +++++++------- 7 files changed, 21 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java index d242dcbfb9d60..a04f2ca3ba609 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; @@ -125,7 +126,7 @@ public RowData getDeleteRow(String recordKey) { @Override public RowData convertAvroRecord(IndexedRecord avroRecord) { Schema recordSchema = avroRecord.getSchema(); - AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter(); + AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(AvroToHoodieSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter(); RowData rowData = (RowData) converter.convert(avroRecord); Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD); if (operationField != null) { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala index 3a4cf4642bb8e..9b40dcf633159 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala @@ -21,7 +21,7 @@ package org.apache.hudi import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.common.engine.RecordContext -import org.apache.hudi.common.schema.HoodieSchema +import org.apache.hudi.common.schema.{AvroToHoodieSchemaCache, HoodieSchema} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.spark.sql.HoodieInternalRowUtils import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} @@ -47,7 +47,7 @@ trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContex * @return An [[InternalRow]]. */ override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = { - val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema) + val schema = AvroToHoodieSchemaCache.intern(avroRecord.getSchema) val structType = HoodieInternalRowUtils.getCachedSchema(schema) val deserializer = deserializerMap.getOrElseUpdate(schema, { sparkAdapter.createAvroDeserializer(schema, structType) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 7c55f441d2009..ccfff0c2795d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.avro; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.DateTimeUtils; @@ -834,7 +835,7 @@ public static Object[] getRecordColumnValues(HoodieRecord record, Schema schema, boolean consistentLogicalTimestampEnabled) { try { - GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get()).getData(); + GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(AvroToHoodieSchemaCache.intern(schema), new Properties()).get()).getData(); List list = new ArrayList<>(); for (String col : columns) { list.add(HoodieAvroUtils.getNestedFieldVal(genericRecord, col, true, consistentLogicalTimestampEnabled)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 7f6a68310be68..60c3e2d4d0256 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -20,7 +20,7 @@ import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.util.FileIOUtils; @@ -65,7 +65,7 @@ public Option combineAndGetUpdateValue(IndexedRecord oldRec, Sche @Override public Option getInsertValue(Schema schema) throws IOException { MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return Option.of(jsonConverter.convert(getJsonData(), HoodieSchema.fromAvroSchema(schema))); + return Option.of(jsonConverter.convert(getJsonData(), AvroToHoodieSchemaCache.intern(schema))); } private String getJsonData() throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 9f265d4f26878..cce7814992b9f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -507,9 +507,11 @@ public byte[] getBytes(Schema schema) throws IOException { output.writeInt(records.size()); // 3. Write the records + // schema is loop-invariant; wrap it once instead of rebuilding the HoodieSchema per record + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema); Iterator> itr = records.iterator(); while (itr.hasNext()) { - IndexedRecord s = itr.next().toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get().getData(); + IndexedRecord s = itr.next().toIndexedRecord(hoodieSchema, new Properties()).get().getData(); ByteArrayOutputStream temp = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get()); encoderCache.set(encoder); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 9f598b507b797..e0797ab882d51 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.read.BufferedRecord; @@ -204,8 +205,8 @@ private Option mergeRecord(HoodieRecord newRecord, A // once presto on hudi have its own mor reader, we can remove the rewrite logical. GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema()); RecordContext recordContext = AvroRecordContext.getFieldAccessorInstance(); - BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); - BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()), + BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, AvroToHoodieSchemaCache.intern(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); + BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, AvroToHoodieSchemaCache.intern(getLogScannerReaderSchema().toAvroSchema()), recordContext, payloadProps, orderingFields, deleteContext); BufferedRecord mergeResult = merger.merge(record, newBufferedRecord, recordContext, payloadProps); if (mergeResult.isDelete()) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 7fab437b294f9..a31ef298d7756 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -22,7 +22,7 @@ import org.apache.hudi.HoodieSchemaConversionUtils.{convertHoodieSchemaToDataTyp import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} -import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils} +import org.apache.hudi.common.schema.{AvroToHoodieSchemaCache, HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, HoodieRecordUtils, Option => HOption, OrderingValues, StringUtils, ValidationUtils} import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieWriteConfig @@ -116,7 +116,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Get the Evaluator for each condition and update assignments. val updateConditionAndAssignments = - getEvaluator(updateConditionAndAssignmentsText.toString, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)) + getEvaluator(updateConditionAndAssignmentsText.toString, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)) for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments if resultRecordOpt == null) { @@ -145,7 +145,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Process delete val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) if (deleteConditionText != null) { - val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)).head + val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)).head val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow) .get(0, BooleanType) .asInstanceOf[Boolean] @@ -206,7 +206,7 @@ class ExpressionPayload(@transient record: GenericRecord, * multiple times for different expression evaluation invocations */ case class ConvertibleRecord(private val avro: GenericRecord) extends Logging { - private lazy val row: InternalRow = getAvroDeserializerFor(HoodieSchema.fromAvroSchema(avro.getSchema)).deserialize(avro) match { + private lazy val row: InternalRow = getAvroDeserializerFor(AvroToHoodieSchemaCache.intern(avro.getSchema)).deserialize(avro) match { case Some(row) => row.asInstanceOf[InternalRow] case None => logError(s"Failed to deserialize Avro record `${avro.toString}` as Catalyst row") @@ -231,7 +231,7 @@ class ExpressionPayload(@transient record: GenericRecord, properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS).toString // Get the evaluator for each condition and insert assignment. val insertConditionAndAssignments = - ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)) + ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)) var resultRecordOpt: HOption[IndexedRecord] = null for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments if resultRecordOpt == null) { @@ -243,7 +243,7 @@ class ExpressionPayload(@transient record: GenericRecord, if (conditionEvalResult) { val writerSchema = getWriterSchema(properties, false) val resultingRow = assignmentEvaluator.apply(inputRecord.asRow) - val resultingAvroRecord = getAvroSerializerFor(HoodieSchema.fromAvroSchema(writerSchema.getAvroSchema)) + val resultingAvroRecord = getAvroSerializerFor(AvroToHoodieSchemaCache.intern(writerSchema.getAvroSchema)) .serialize(resultingRow) .asInstanceOf[GenericRecord] @@ -315,7 +315,7 @@ class ExpressionPayload(@transient record: GenericRecord, */ private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord, props: Properties): GenericRecord = { val leftSchema = sourceRecord.getSchema - val joinSchema = getMergedSchema(HoodieSchema.fromAvroSchema(leftSchema), HoodieSchema.fromAvroSchema(targetRecord.getSchema)) + val joinSchema = getMergedSchema(AvroToHoodieSchemaCache.intern(leftSchema), AvroToHoodieSchemaCache.intern(targetRecord.getSchema)) // TODO rebase onto JoinRecord val values = new Array[AnyRef](joinSchema.getFields.size()) From 552f726ea47ede26c68c3c3e392f0f0273431702 Mon Sep 17 00:00:00 2001 From: voon Date: Wed, 17 Jun 2026 16:00:46 +0800 Subject: [PATCH 09/11] review(common): correct lazy field-cache comment (benign race, not DCL) --- .../java/org/apache/hudi/common/schema/HoodieSchema.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 781816a40a49c..f38b9e2c93ed1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -313,8 +313,9 @@ private static void addVectorColumnName(String s, int start, int end, Set fields; private transient volatile Map fieldMap; From e109ba986b2bfa2a76c6ca820f9c5fc126bd114c Mon Sep 17 00:00:00 2001 From: voon Date: Wed, 17 Jun 2026 17:08:48 +0800 Subject: [PATCH 10/11] review(common): intern the loop-invariant HoodieSchema in getBytes Switch the hoisted HoodieSchema.fromAvroSchema(schema) in getBytes to AvroToHoodieSchemaCache.intern(schema). This matches every other site touched in this PR and reuses one cached, value-interned instance across getBytes calls, keeping HoodieSchema identity stable for the downstream per-record caches instead of rebuilding per block. --- .../hudi/common/table/log/block/HoodieAvroDataBlock.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index cce7814992b9f..3037889572070 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.CollectionUtils; @@ -507,8 +508,8 @@ public byte[] getBytes(Schema schema) throws IOException { output.writeInt(records.size()); // 3. Write the records - // schema is loop-invariant; wrap it once instead of rebuilding the HoodieSchema per record - HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema); + // schema is loop-invariant; intern it once (shared, cached) instead of rebuilding the HoodieSchema per record + HoodieSchema hoodieSchema = AvroToHoodieSchemaCache.intern(schema); Iterator> itr = records.iterator(); while (itr.hasNext()) { IndexedRecord s = itr.next().toIndexedRecord(hoodieSchema, new Properties()).get().getData(); From b218b8ff774bd18894edce43c6b4e6b4d19309b7 Mon Sep 17 00:00:00 2001 From: voon Date: Wed, 17 Jun 2026 17:19:05 +0800 Subject: [PATCH 11/11] review(common): rename AvroToHoodieSchemaCache to HoodieAvroSchemaCache Match the Hoodie-prefix convention used by every other class in the org.apache.hudi.common.schema package (HoodieSchema, HoodieSchemaCache, HoodieSchemaField, ...). Pure rename of the class and its 8 referencing files; no behavior change. --- .../hudi/table/format/FlinkRecordContext.java | 4 ++-- .../SparkFileFormatInternalRecordContext.scala | 4 ++-- .../org/apache/hudi/avro/AvroRecordContext.java | 4 ++-- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 ++-- .../org/apache/hudi/common/HoodieJsonPayload.java | 4 ++-- ...SchemaCache.java => HoodieAvroSchemaCache.java} | 2 +- .../table/log/block/HoodieAvroDataBlock.java | 4 ++-- .../realtime/RealtimeCompactedRecordReader.java | 6 +++--- .../hudi/command/payload/ExpressionPayload.scala | 14 +++++++------- 9 files changed, 23 insertions(+), 23 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/common/schema/{AvroToHoodieSchemaCache.java => HoodieAvroSchemaCache.java} (98%) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java index a04f2ca3ba609..e2406b829693e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; @@ -126,7 +126,7 @@ public RowData getDeleteRow(String recordKey) { @Override public RowData convertAvroRecord(IndexedRecord avroRecord) { Schema recordSchema = avroRecord.getSchema(); - AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(AvroToHoodieSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter(); + AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieAvroSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter(); RowData rowData = (RowData) converter.convert(avroRecord); Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD); if (operationField != null) { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala index 9b40dcf633159..edf0a4eee3dbd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala @@ -21,7 +21,7 @@ package org.apache.hudi import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.common.engine.RecordContext -import org.apache.hudi.common.schema.{AvroToHoodieSchemaCache, HoodieSchema} +import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.spark.sql.HoodieInternalRowUtils import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} @@ -47,7 +47,7 @@ trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContex * @return An [[InternalRow]]. */ override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = { - val schema = AvroToHoodieSchemaCache.intern(avroRecord.getSchema) + val schema = HoodieAvroSchemaCache.intern(avroRecord.getSchema) val structType = HoodieInternalRowUtils.getCachedSchema(schema) val deserializer = deserializerMap.getOrElseUpdate(schema, { sparkAdapter.createAvroDeserializer(schema, structType) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 002483a5f4184..fceeeaf84b97a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; @@ -74,7 +74,7 @@ public static Object getFieldValueFromIndexedRecord( // Interning returns the canonical wrapper for this schema, whose lazily built field list and // field map survive across calls, so the per-record cost is a cache hit instead of an // O(schema width) wrapper rebuild. - HoodieSchema currentSchema = AvroToHoodieSchemaCache.intern(record.getSchema()); + HoodieSchema currentSchema = HoodieAvroSchemaCache.intern(record.getSchema()); IndexedRecord currentRecord = record; String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index ccfff0c2795d0..c52960c4abb06 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -19,7 +19,7 @@ package org.apache.hudi.avro; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.DateTimeUtils; @@ -835,7 +835,7 @@ public static Object[] getRecordColumnValues(HoodieRecord record, Schema schema, boolean consistentLogicalTimestampEnabled) { try { - GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(AvroToHoodieSchemaCache.intern(schema), new Properties()).get()).getData(); + GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieAvroSchemaCache.intern(schema), new Properties()).get()).getData(); List list = new ArrayList<>(); for (String col : columns) { list.add(HoodieAvroUtils.getNestedFieldVal(genericRecord, col, true, consistentLogicalTimestampEnabled)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 60c3e2d4d0256..463f773b82150 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -20,7 +20,7 @@ import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.util.FileIOUtils; @@ -65,7 +65,7 @@ public Option combineAndGetUpdateValue(IndexedRecord oldRec, Sche @Override public Option getInsertValue(Schema schema) throws IOException { MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return Option.of(jsonConverter.convert(getJsonData(), AvroToHoodieSchemaCache.intern(schema))); + return Option.of(jsonConverter.convert(getJsonData(), HoodieAvroSchemaCache.intern(schema))); } private String getJsonData() throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java similarity index 98% rename from hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java rename to hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java index bc939eacd6951..8b153eb4487e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/AvroToHoodieSchemaCache.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java @@ -33,7 +33,7 @@ * *

This is a global cache which works for a JVM lifecycle. */ -public class AvroToHoodieSchemaCache { +public class HoodieAvroSchemaCache { private static final LoadingCache AVRO_SCHEMA_CACHE = Caffeine.newBuilder().weakKeys().maximumSize(1024) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 3037889572070..764528bfc3f06 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.CollectionUtils; @@ -509,7 +509,7 @@ public byte[] getBytes(Schema schema) throws IOException { // 3. Write the records // schema is loop-invariant; intern it once (shared, cached) instead of rebuilding the HoodieSchema per record - HoodieSchema hoodieSchema = AvroToHoodieSchemaCache.intern(schema); + HoodieSchema hoodieSchema = HoodieAvroSchemaCache.intern(schema); Iterator> itr = records.iterator(); while (itr.hasNext()) { IndexedRecord s = itr.next().toIndexedRecord(hoodieSchema, new Properties()).get().getData(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index e0797ab882d51..74ee3f8491846 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; -import org.apache.hudi.common.schema.AvroToHoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.read.BufferedRecord; @@ -205,8 +205,8 @@ private Option mergeRecord(HoodieRecord newRecord, A // once presto on hudi have its own mor reader, we can remove the rewrite logical. GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema()); RecordContext recordContext = AvroRecordContext.getFieldAccessorInstance(); - BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, AvroToHoodieSchemaCache.intern(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); - BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, AvroToHoodieSchemaCache.intern(getLogScannerReaderSchema().toAvroSchema()), + BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieAvroSchemaCache.intern(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); + BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieAvroSchemaCache.intern(getLogScannerReaderSchema().toAvroSchema()), recordContext, payloadProps, orderingFields, deleteContext); BufferedRecord mergeResult = merger.merge(record, newBufferedRecord, recordContext, payloadProps); if (mergeResult.isDelete()) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index a31ef298d7756..a33fe583a6dbf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -22,7 +22,7 @@ import org.apache.hudi.HoodieSchemaConversionUtils.{convertHoodieSchemaToDataTyp import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} -import org.apache.hudi.common.schema.{AvroToHoodieSchemaCache, HoodieSchema, HoodieSchemaUtils} +import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, HoodieRecordUtils, Option => HOption, OrderingValues, StringUtils, ValidationUtils} import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieWriteConfig @@ -116,7 +116,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Get the Evaluator for each condition and update assignments. val updateConditionAndAssignments = - getEvaluator(updateConditionAndAssignmentsText.toString, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)) + getEvaluator(updateConditionAndAssignmentsText.toString, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)) for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments if resultRecordOpt == null) { @@ -145,7 +145,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Process delete val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) if (deleteConditionText != null) { - val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)).head + val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)).head val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow) .get(0, BooleanType) .asInstanceOf[Boolean] @@ -206,7 +206,7 @@ class ExpressionPayload(@transient record: GenericRecord, * multiple times for different expression evaluation invocations */ case class ConvertibleRecord(private val avro: GenericRecord) extends Logging { - private lazy val row: InternalRow = getAvroDeserializerFor(AvroToHoodieSchemaCache.intern(avro.getSchema)).deserialize(avro) match { + private lazy val row: InternalRow = getAvroDeserializerFor(HoodieAvroSchemaCache.intern(avro.getSchema)).deserialize(avro) match { case Some(row) => row.asInstanceOf[InternalRow] case None => logError(s"Failed to deserialize Avro record `${avro.toString}` as Catalyst row") @@ -231,7 +231,7 @@ class ExpressionPayload(@transient record: GenericRecord, properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS).toString // Get the evaluator for each condition and insert assignment. val insertConditionAndAssignments = - ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, AvroToHoodieSchemaCache.intern(inputRecord.asAvro.getSchema)) + ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)) var resultRecordOpt: HOption[IndexedRecord] = null for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments if resultRecordOpt == null) { @@ -243,7 +243,7 @@ class ExpressionPayload(@transient record: GenericRecord, if (conditionEvalResult) { val writerSchema = getWriterSchema(properties, false) val resultingRow = assignmentEvaluator.apply(inputRecord.asRow) - val resultingAvroRecord = getAvroSerializerFor(AvroToHoodieSchemaCache.intern(writerSchema.getAvroSchema)) + val resultingAvroRecord = getAvroSerializerFor(HoodieAvroSchemaCache.intern(writerSchema.getAvroSchema)) .serialize(resultingRow) .asInstanceOf[GenericRecord] @@ -315,7 +315,7 @@ class ExpressionPayload(@transient record: GenericRecord, */ private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord, props: Properties): GenericRecord = { val leftSchema = sourceRecord.getSchema - val joinSchema = getMergedSchema(AvroToHoodieSchemaCache.intern(leftSchema), AvroToHoodieSchemaCache.intern(targetRecord.getSchema)) + val joinSchema = getMergedSchema(HoodieAvroSchemaCache.intern(leftSchema), HoodieAvroSchemaCache.intern(targetRecord.getSchema)) // TODO rebase onto JoinRecord val values = new Array[AnyRef](joinSchema.getFields.size())