Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -125,7 +126,7 @@ public RowData getDeleteRow(String recordKey) {
@Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter();
AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieAvroSchemaCache.intern(recordSchema), utcTimezone).getAvroToRowDataConverter();
RowData rowData = (RowData) converter.convert(avroRecord);
Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
if (operationField != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.hudi

import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.schema.{HoodieAvroSchemaCache, HoodieSchema}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
Expand All @@ -47,7 +47,7 @@ trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContex
* @return An [[InternalRow]].
*/
override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema)
val schema = HoodieAvroSchemaCache.intern(avroRecord.getSchema)
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
val deserializer = deserializerMap.getOrElseUpdate(schema, {
sparkAdapter.createAvroDeserializer(schema, structType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.table.HoodieTableConfig;
Expand Down Expand Up @@ -70,7 +71,10 @@ public AvroRecordContext() {
public static Object getFieldValueFromIndexedRecord(
IndexedRecord record,
String fieldName) {
HoodieSchema currentSchema = HoodieSchema.fromAvroSchema(record.getSchema());
// Interning returns the canonical wrapper for this schema, whose lazily built field list and
// field map survive across calls, so the per-record cost is a cache hit instead of an
// O(schema width) wrapper rebuild.
HoodieSchema currentSchema = HoodieAvroSchemaCache.intern(record.getSchema());
IndexedRecord currentRecord = record;
String[] path = fieldName.split("\\.");
for (int i = 0; i < path.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.avro;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.DateTimeUtils;
Expand Down Expand Up @@ -834,7 +835,7 @@ public static Object[] getRecordColumnValues(HoodieRecord record,
Schema schema,
boolean consistentLogicalTimestampEnabled) {
try {
GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get()).getData();
GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(HoodieAvroSchemaCache.intern(schema), new Properties()).get()).getData();
List<Object> list = new ArrayList<>();
for (String col : columns) {
list.add(HoodieAvroUtils.getNestedFieldVal(genericRecord, col, true, consistentLogicalTimestampEnabled));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.util.FileIOUtils;
Expand Down Expand Up @@ -65,7 +65,7 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Sche
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return Option.of(jsonConverter.convert(getJsonData(), HoodieSchema.fromAvroSchema(schema)));
return Option.of(jsonConverter.convert(getJsonData(), HoodieAvroSchemaCache.intern(schema)));
}

private String getJsonData() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.schema;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.avro.Schema;

/**
* A global cache mapping Avro {@link Schema} instances to their canonical {@link HoodieSchema}.
*
* <p>This is an Avro-schema-keyed view onto {@link HoodieSchemaCache} for per-record call sites:
* {@code weakKeys} gives identity-based lookups (records of one file share the same {@link Schema}
* instance), so the hot path is a single cache hit with no wrapper allocation or type dispatch.
* Misses convert and then value-intern through {@link HoodieSchemaCache}, so equal but distinct Avro
* schema instances still converge on one canonical {@link HoodieSchema}.
*
* <p>This is a global cache which works for a JVM lifecycle.
*/
public class HoodieAvroSchemaCache {

private static final LoadingCache<Schema, HoodieSchema> AVRO_SCHEMA_CACHE =
Comment thread
voonhous marked this conversation as resolved.
Caffeine.newBuilder().weakKeys().maximumSize(1024)
.build(avroSchema -> HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(avroSchema)));

/**
* Returns the canonical {@link HoodieSchema} wrapping the given Avro schema, converting and
* interning it on first use.
*
* @param avroSchema Avro schema to look up
* @return the canonical HoodieSchema for the given Avro schema
*/
public static HoodieSchema intern(Schema avroSchema) {
return AVRO_SCHEMA_CACHE.get(avroSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,11 @@ private static void addVectorColumnName(String s, int start, int end, Set<String

private Schema avroSchema;
private HoodieSchemaType type;
private transient List<HoodieSchemaField> fields;
private transient Map<String, HoodieSchemaField> fieldMap;
// interned instances are shared across threads, so the lazily built caches use a benign racy
// single-check (see getFields()/getFieldMap()): lock-free volatile reads, and volatile gives
// safe publication of the immutable, deterministic result
private transient volatile List<HoodieSchemaField> fields;
private transient volatile Map<String, HoodieSchemaField> fieldMap;

// Register the Variant logical type with Avro
static {
Expand Down Expand Up @@ -1152,6 +1155,8 @@ public List<HoodieSchemaField> getFields() {
if (!hasFields()) {
throw new IllegalStateException("Cannot get fields from schema type: " + type);
}
// Benign race: the result is an immutable, deterministic view of avroSchema's fields, so concurrent
// callers may each build it once but converge on equal lists; the volatile field makes publication safe.
if (fields == null) {
fields = Collections.unmodifiableList(avroSchema.getFields().stream().map(HoodieSchemaField::new).collect(Collectors.toList()));
}
Expand Down Expand Up @@ -1195,9 +1200,10 @@ public Option<HoodieSchemaField> getField(String name) {
}

private Map<String, HoodieSchemaField> getFieldMap() {
// Benign race, same rationale as getFields(): deterministic immutable result, volatile for safe publication.
if (fieldMap == null) {
fieldMap = getFields().stream()
.collect(Collectors.toMap(HoodieSchemaField::name, field -> field));
fieldMap = Collections.unmodifiableMap(getFields().stream()
.collect(Collectors.toMap(HoodieSchemaField::name, field -> field)));
}
return fieldMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.util.CollectionUtils;
Expand Down Expand Up @@ -507,9 +508,11 @@ public byte[] getBytes(Schema schema) throws IOException {
output.writeInt(records.size());

// 3. Write the records
// schema is loop-invariant; intern it once (shared, cached) instead of rebuilding the HoodieSchema per record
HoodieSchema hoodieSchema = HoodieAvroSchemaCache.intern(schema);
Iterator<HoodieRecord<?>> itr = records.iterator();
while (itr.hasNext()) {
IndexedRecord s = itr.next().toIndexedRecord(HoodieSchema.fromAvroSchema(schema), new Properties()).get().getData();
IndexedRecord s = itr.next().toIndexedRecord(hoodieSchema, new Properties()).get().getData();
ByteArrayOutputStream temp = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
encoderCache.set(encoder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@

package org.apache.hudi.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroRecordContext.getFieldValueFromIndexedRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

class TestAvroRecordContext {

Expand All @@ -44,4 +51,65 @@ void testConvertValueToEngineType(Comparable input, Comparable expected) {
Comparable actual = AvroRecordContext.getFieldAccessorInstance().convertValueToEngineType(input);
assertEquals(expected, actual);
}

private static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"top\",\"fields\":["
+ "{\"name\":\"id\",\"type\":\"int\"},"
+ "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ "{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"address\",\"fields\":["
+ "{\"name\":\"city\",\"type\":\"string\"},"
+ "{\"name\":\"zip\",\"type\":[\"null\",\"int\"],\"default\":null}]}],\"default\":null},"
+ "{\"name\":\"multi\",\"type\":[\"null\",\"string\",\"int\"],\"default\":null}]}");

private static GenericRecord buildRecord() {
GenericRecord address = new GenericData.Record(RECORD_SCHEMA.getField("address").schema().getTypes().get(1));
address.put("city", new Utf8("sf"));
address.put("zip", 94105);
GenericRecord record = new GenericData.Record(RECORD_SCHEMA);
record.put("id", 1);
record.put("name", new Utf8("alice"));
record.put("address", address);
return record;
}

@Test
void testGetFieldValueTopLevel() {
GenericRecord record = buildRecord();
assertEquals(1, getFieldValueFromIndexedRecord(record, "id"));
assertEquals(new Utf8("alice"), getFieldValueFromIndexedRecord(record, "name"));
assertNull(getFieldValueFromIndexedRecord(record, "multi"));
assertNull(getFieldValueFromIndexedRecord(record, "missing"));
}

@Test
void testGetFieldValueNested() {
GenericRecord record = buildRecord();
// intermediate segment unwraps the [null, record] union
assertEquals(new Utf8("sf"), getFieldValueFromIndexedRecord(record, "address.city"));
assertEquals(94105, getFieldValueFromIndexedRecord(record, "address.zip"));
assertNull(getFieldValueFromIndexedRecord(record, "address.missing"));
assertNull(getFieldValueFromIndexedRecord(record, "missing.nested"));
}

@Test
void testGetFieldValueErrorCases() {
GenericRecord record = buildRecord();
// a union that is not [null, T] does not support field lookups
assertThrows(IllegalStateException.class, () -> getFieldValueFromIndexedRecord(record, "multi.sub"));
assertThrows(IllegalArgumentException.class, () -> getFieldValueFromIndexedRecord(record, ""));
}

@Test
void testGetFieldValueAcrossEqualSchemaInstances() {
// records from different files carry equal but distinct schema instances; both must intern to
// the same canonical wrapper and resolve identically
Schema schemaCopy = new Schema.Parser().parse(RECORD_SCHEMA.toString());
GenericRecord record = buildRecord();
GenericRecord recordWithCopy = new GenericData.Record(schemaCopy);
for (Schema.Field field : RECORD_SCHEMA.getFields()) {
recordWithCopy.put(field.pos(), record.get(field.pos()));
}
assertEquals(getFieldValueFromIndexedRecord(record, "id"), getFieldValueFromIndexedRecord(recordWithCopy, "id"));
assertEquals(getFieldValueFromIndexedRecord(record, "address.city"), getFieldValueFromIndexedRecord(recordWithCopy, "address.city"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.schema.HoodieAvroSchemaCache;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.read.BufferedRecord;
Expand Down Expand Up @@ -204,8 +205,8 @@ private Option<HoodieAvroIndexedRecord> mergeRecord(HoodieRecord<?> newRecord, A
// once presto on hudi have its own mor reader, we can remove the rewrite logical.
GenericRecord genericRecord = HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, getLogScannerReaderSchema());
RecordContext<IndexedRecord> recordContext = AvroRecordContext.getFieldAccessorInstance();
BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieSchema.fromAvroSchema(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false);
BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(getLogScannerReaderSchema().toAvroSchema()),
BufferedRecord record = BufferedRecords.fromEngineRecord(genericRecord, HoodieAvroSchemaCache.intern(genericRecord.getSchema()), recordContext, orderingFields, newRecord.getRecordKey(), false);
BufferedRecord newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieAvroSchemaCache.intern(getLogScannerReaderSchema().toAvroSchema()),
recordContext, payloadProps, orderingFields, deleteContext);
BufferedRecord mergeResult = merger.merge(record, newBufferedRecord, recordContext, payloadProps);
if (mergeResult.isDelete()) {
Expand Down
Loading
Loading