diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java index d242dcbfb9d60..e2406b829693e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; @@ -125,7 +126,7 @@ public RowData getDeleteRow(String recordKey) { @Override public RowData convertAvroRecord(IndexedRecord avroRecord) { Schema recordSchema = avroRecord.getSchema(); - AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter(); + AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieAvroSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter(); RowData rowData = (RowData) converter.convert(avroRecord); Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD); if (operationField != null) { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala index 3a4cf4642bb8e..edf0a4eee3dbd 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala +++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala @@ -21,7 +21,7 @@ package org.apache.hudi import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.common.engine.RecordContext -import org.apache.hudi.common.schema.HoodieSchema +import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.spark.sql.HoodieInternalRowUtils import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} @@ -47,7 +47,7 @@ trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContex * @return An [[InternalRow]]. */ override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = { - val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema) + val schema = HoodieAvroSchemaCache.intern(avroRecord.getSchema) val structType = HoodieInternalRowUtils.getCachedSchema(schema) val deserializer = deserializerMap.getOrElseUpdate(schema, { sparkAdapter.createAvroDeserializer(schema, structType) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index def6e6a7003fc..fceeeaf84b97a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.table.HoodieTableConfig; @@ -70,7 +71,10 @@ public AvroRecordContext() { public static Object getFieldValueFromIndexedRecord( IndexedRecord record, String fieldName) { - HoodieSchema 
currentSchema = HoodieSchema.fromAvroSchema(record.getSchema()); + // Interning returns the canonical wrapper for this schema, whose lazily built field list and + // field map survive across calls, so the per-record cost is a cache hit instead of an + // O(schema width) wrapper rebuild. + HoodieSchema currentSchema = HoodieAvroSchemaCache.intern(record.getSchema()); IndexedRecord currentRecord = record; String[] path = fieldName.split("\\."); for (int i = 0; i < path.length; i++) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 7c55f441d2009..c52960c4abb06 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.avro; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.util.DateTimeUtils; @@ -834,7 +835,7 @@ public static Object[] getRecordColumnValues(HoodieRecord record, Schema schema, boolean consistentLogicalTimestampEnabled) { try { - GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get()).getData(); + GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieAvroSchemaCache.intern(schema), new Properties()).get()).getData(); List list = new ArrayList<>(); for (String col : columns) { list.add(HoodieAvroUtils.getNestedFieldVal(genericRecord, col, true, consistentLogicalTimestampEnabled)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 7f6a68310be68..463f773b82150 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -20,7 +20,7 @@ import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.util.FileIOUtils; @@ -65,7 +65,7 @@ public Option combineAndGetUpdateValue(IndexedRecord oldRec, Sche @Override public Option getInsertValue(Schema schema) throws IOException { MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return Option.of(jsonConverter.convert(getJsonData(), HoodieSchema.fromAvroSchema(schema))); + return Option.of(jsonConverter.convert(getJsonData(), HoodieAvroSchemaCache.intern(schema))); } private String getJsonData() throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java new file mode 100644 index 0000000000000..8b153eb4487e4 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieAvroSchemaCache.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.avro.Schema; + +/** + * A global cache mapping Avro {@link Schema} instances to their canonical {@link HoodieSchema}. + * + * <p>This is an Avro-schema-keyed view onto {@link HoodieSchemaCache} for per-record call sites: + * {@code weakKeys} gives identity-based lookups (records of one file share the same {@link Schema} + * instance), so the hot path is a single cache hit with no wrapper allocation or type dispatch. + * Misses convert and then value-intern through {@link HoodieSchemaCache}, so equal but distinct Avro + * schema instances still converge on one canonical {@link HoodieSchema}. + * + * <p>This is a global cache which works for a JVM lifecycle. + */ +public final class HoodieAvroSchemaCache { + + // Keys are compared by identity (weakKeys) and the cache is bounded to 1024 entries. + // NOTE(review): a cached HoodieSchema presumably keeps a strong reference to the Avro schema it wraps, + // so an entry may only leave through size eviction rather than key collection - confirm. + private static final LoadingCache<Schema, HoodieSchema> AVRO_SCHEMA_CACHE = + Caffeine.newBuilder().weakKeys().maximumSize(1024) + .build(avroSchema -> HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(avroSchema))); + + private HoodieAvroSchemaCache() { + // static utility holder, never instantiated + } + + /** + * Returns the canonical {@link HoodieSchema} wrapping the given Avro schema, converting and + * interning it on first use. + * + * <p>A {@code null} schema is not cached (Caffeine rejects null keys); it is passed straight to + * {@link HoodieSchema#fromAvroSchema(Schema)}, so the result is whatever a direct conversion yields. + * + * @param avroSchema Avro schema to look up, may be {@code null} + * @return the canonical HoodieSchema for the given Avro schema + */ + public static HoodieSchema intern(Schema avroSchema) { + if (avroSchema == null) { + return HoodieSchema.fromAvroSchema(avroSchema); + } + return AVRO_SCHEMA_CACHE.get(avroSchema); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 0b0978d552a23..f38b9e2c93ed1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -313,8 +313,11 @@ private static void addVectorColumnName(String s, int start, int end, Set fields; - private transient Map fieldMap; + // interned instances are shared across threads, so the lazily built caches use a benign racy + // single-check (see getFields()/getFieldMap()): lock-free volatile reads, and volatile gives + // safe publication of the immutable, deterministic result + private transient volatile List fields; + private transient volatile Map fieldMap; // Register the Variant logical type with Avro static { @@ -1152,6 +1155,8 @@ public List getFields() { if (!hasFields()) { throw new IllegalStateException("Cannot get fields from schema type: " + type); } + // Benign race: the result is an immutable, deterministic view of avroSchema's fields, so concurrent + // callers may each build it once but converge on equal lists; the volatile field makes publication safe. 
if (fields == null) { fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList())); } @@ -1195,9 +1200,14 @@ public Option getField(String name) { } private Map getFieldMap() { - if (fieldMap == null) { - fieldMap = getFields().stream() - .collect(Collectors.toMap(HoodieSchemaField::name, field -> field)); - } - return fieldMap; + // Racy single-check: the map is an immutable, deterministic function of getFields(), so concurrent + // first callers may each build one and whichever write lands last is equally valid. + // The volatile field is read once into a local, so the common path costs a single volatile read. + Map<String, HoodieSchemaField> result = fieldMap; + if (result == null) { + result = Collections.unmodifiableMap(getFields().stream() + .collect(Collectors.toMap(HoodieSchemaField::name, field -> field))); + fieldMap = result; + } + return result; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 9f265d4f26878..764528bfc3f06 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.CollectionUtils; @@ -507,9 +508,11 @@ public byte[] getBytes(Schema schema) throws IOException { output.writeInt(records.size()); // 3. 
Write the records + // schema is loop-invariant; intern it once (shared, cached) instead of rebuilding the HoodieSchema per record + HoodieSchema hoodieSchema = HoodieAvroSchemaCache.intern(schema); Iterator> itr = records.iterator(); while (itr.hasNext()) { - IndexedRecord s = itr.next().toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get().getData(); + IndexedRecord s = itr.next().toIndexedRecord(hoodieSchema, new Properties()).get().getData(); ByteArrayOutputStream temp = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get()); encoderCache.set(encoder); diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java index b84738684b16c..04c7ae3c2bda7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestAvroRecordContext.java @@ -19,14 +19,24 @@ package org.apache.hudi.avro; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.stream.Stream; +import static org.apache.hudi.avro.AvroRecordContext.getFieldValueFromIndexedRecord; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; class TestAvroRecordContext { @@ -44,4 +54,67 @@ void testConvertValueToEngineType(Comparable input, Comparable expected) { Comparable actual = AvroRecordContext.getFieldAccessorInstance().convertValueToEngineType(input); assertEquals(expected, actual); } + + private static final 
Schema RECORD_SCHEMA = new Schema.Parser().parse( + "{\"type\":\"record\",\"name\":\"top\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"int\"}," + + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"address\",\"fields\":[" + + "{\"name\":\"city\",\"type\":\"string\"}," + + "{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null}," + + "{\"name\":\"multi\",\"type\":[\"null\",\"string\",\"int\"],\"default\":null}]}"); + + private static GenericRecord buildRecord() { + GenericRecord address = new GenericData.Record(RECORD_SCHEMA.getField("address").schema().getTypes().get(1)); + address.put("city", new Utf8("sf")); + address.put("zip", 94105); + GenericRecord record = new GenericData.Record(RECORD_SCHEMA); + record.put("id", 1); + record.put("name", new Utf8("alice")); + record.put("address", address); + return record; + } + + @Test + void testGetFieldValueTopLevel() { + GenericRecord record = buildRecord(); + assertEquals(1, getFieldValueFromIndexedRecord(record, "id")); + assertEquals(new Utf8("alice"), getFieldValueFromIndexedRecord(record, "name")); + assertNull(getFieldValueFromIndexedRecord(record, "multi")); + assertNull(getFieldValueFromIndexedRecord(record, "missing")); + } + + @Test + void testGetFieldValueNested() { + GenericRecord record = buildRecord(); + // intermediate segment unwraps the [null, record] union + assertEquals(new Utf8("sf"), getFieldValueFromIndexedRecord(record, "address.city")); + assertEquals(94105, getFieldValueFromIndexedRecord(record, "address.zip")); + assertNull(getFieldValueFromIndexedRecord(record, "address.missing")); + assertNull(getFieldValueFromIndexedRecord(record, "missing.nested")); + } + + @Test + void testGetFieldValueErrorCases() { + GenericRecord record = buildRecord(); + // a union that is not [null, T] does not support field lookups + assertThrows(IllegalStateException.class, () -> 
getFieldValueFromIndexedRecord(record, "multi.sub")); + assertThrows(IllegalArgumentException.class, () -> getFieldValueFromIndexedRecord(record, "")); + } + + @Test + void testGetFieldValueAcrossEqualSchemaInstances() { + // records from different files carry equal but distinct schema instances; both must intern to + // the same canonical wrapper and resolve identically + Schema schemaCopy = new Schema.Parser().parse(RECORD_SCHEMA.toString()); + GenericRecord record = buildRecord(); + GenericRecord recordWithCopy = new GenericData.Record(schemaCopy); + for (Schema.Field field : RECORD_SCHEMA.getFields()) { + recordWithCopy.put(field.pos(), record.get(field.pos())); + } + assertEquals(getFieldValueFromIndexedRecord(record, "id"), getFieldValueFromIndexedRecord(recordWithCopy, "id")); + assertEquals(getFieldValueFromIndexedRecord(record, "address.city"), getFieldValueFromIndexedRecord(recordWithCopy, "address.city")); + // the value lookups above would agree even without interning, so pin the cache contract directly + assertSame(HoodieAvroSchemaCache.intern(RECORD_SCHEMA), HoodieAvroSchemaCache.intern(schemaCopy)); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 9f598b507b797..74ee3f8491846 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.schema.HoodieAvroSchemaCache; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.read.BufferedRecord; @@ -204,8 +205,8 @@ private Option mergeRecord(HoodieRecord newRecord, A // once presto on hudi have its own mor reader, we can remove the rewrite logical. 
GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema()); RecordContext recordContext = AvroRecordContext.getFieldAccessorInstance(); - BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); - BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()), + BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieAvroSchemaCache.intern(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false); + BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieAvroSchemaCache.intern(getLogScannerReaderSchema().toAvroSchema()), recordContext, payloadProps, orderingFields, deleteContext); BufferedRecord mergeResult = merger.merge(record, newBufferedRecord, recordContext, payloadProps); if (mergeResult.isDelete()) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 7fab437b294f9..a33fe583a6dbf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -22,7 +22,7 @@ import org.apache.hudi.HoodieSchemaConversionUtils.{convertHoodieSchemaToDataTyp import org.apache.hudi.SparkAdapterSupport.sparkAdapter import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodiePayloadProps, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} -import 
org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils} +import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, HoodieRecordUtils, Option => HOption, OrderingValues, StringUtils, ValidationUtils} import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieWriteConfig @@ -116,7 +116,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Get the Evaluator for each condition and update assignments. val updateConditionAndAssignments = - getEvaluator(updateConditionAndAssignmentsText.toString, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)) + getEvaluator(updateConditionAndAssignmentsText.toString, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)) for ((conditionEvaluator, assignmentEvaluator) <- updateConditionAndAssignments if resultRecordOpt == null) { @@ -145,7 +145,7 @@ class ExpressionPayload(@transient record: GenericRecord, // Process delete val deleteConditionText = properties.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION) if (deleteConditionText != null) { - val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)).head + val (deleteConditionEvaluator, _) = getEvaluator(deleteConditionText.toString, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)).head val deleteConditionEvalResult = deleteConditionEvaluator.apply(inputRecord.asRow) .get(0, BooleanType) .asInstanceOf[Boolean] @@ -206,7 +206,7 @@ class ExpressionPayload(@transient record: GenericRecord, * multiple times for different expression evaluation invocations */ case class ConvertibleRecord(private val avro: GenericRecord) extends Logging { - private lazy val row: InternalRow = getAvroDeserializerFor(HoodieSchema.fromAvroSchema(avro.getSchema)).deserialize(avro) match { + private lazy val row: InternalRow = 
getAvroDeserializerFor(HoodieAvroSchemaCache.intern(avro.getSchema)).deserialize(avro) match { case Some(row) => row.asInstanceOf[InternalRow] case None => logError(s"Failed to deserialize Avro record `${avro.toString}` as Catalyst row") @@ -231,7 +231,7 @@ class ExpressionPayload(@transient record: GenericRecord, properties.get(ExpressionPayload.PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS).toString // Get the evaluator for each condition and insert assignment. val insertConditionAndAssignments = - ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, HoodieSchema.fromAvroSchema(inputRecord.asAvro.getSchema)) + ExpressionPayload.getEvaluator(insertConditionAndAssignmentsText, HoodieAvroSchemaCache.intern(inputRecord.asAvro.getSchema)) var resultRecordOpt: HOption[IndexedRecord] = null for ((conditionEvaluator, assignmentEvaluator) <- insertConditionAndAssignments if resultRecordOpt == null) { @@ -243,7 +243,7 @@ class ExpressionPayload(@transient record: GenericRecord, if (conditionEvalResult) { val writerSchema = getWriterSchema(properties, false) val resultingRow = assignmentEvaluator.apply(inputRecord.asRow) - val resultingAvroRecord = getAvroSerializerFor(HoodieSchema.fromAvroSchema(writerSchema.getAvroSchema)) + val resultingAvroRecord = getAvroSerializerFor(HoodieAvroSchemaCache.intern(writerSchema.getAvroSchema)) .serialize(resultingRow) .asInstanceOf[GenericRecord] @@ -315,7 +315,7 @@ class ExpressionPayload(@transient record: GenericRecord, */ private def joinRecord(sourceRecord: IndexedRecord, targetRecord: IndexedRecord, props: Properties): GenericRecord = { val leftSchema = sourceRecord.getSchema - val joinSchema = getMergedSchema(HoodieSchema.fromAvroSchema(leftSchema), HoodieSchema.fromAvroSchema(targetRecord.getSchema)) + val joinSchema = getMergedSchema(HoodieAvroSchemaCache.intern(leftSchema), HoodieAvroSchemaCache.intern(targetRecord.getSchema)) // TODO rebase onto JoinRecord val values = new Array[AnyRef](joinSchema.getFields.size())