Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema,
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION));
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public void doMerge() {
boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
.map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields.toAvroSchema(), internalSchema,
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION)));
long maxMemoryPerCompaction = getMaxMemoryForMerge();
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
Option<Stream<HoodieLogFile>> logFilesStreamOpt = compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
writeConfig.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION));
long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " operation will fail schema compatibility check. Set this option to true will make the missing "
+ " column be filled with null values to successfully complete the write operation.");

public static final ConfigProperty<Boolean> ALLOW_TIMESTAMP_PRECISION_EVOLUTION = ConfigProperty
.key("hoodie.write.schema.allow.timestamp.precision.evolution")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("Controls whether schema evolution may change a column between timestamp-millis and "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This omits the third behavior the flag gates: attaching a logical type to a bare long (long -> local-timestamp-millis/micros), the logical-type-loss repair the PR description calls a primary motivation. A user whose 0.x table stored the column as bare long would not learn from "precision-only evolution between these logical types" that this flag applies. Suggest documenting the long -> local-timestamp attach case explicitly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long -> local-timestamp is backward compatible for readers. The logical type local-timestamp is now supported on master, so we can keep this part out for simplicity.

+ "timestamp-micros (and between the local-timestamp variants). Default false rejects such changes. "
+ "Set to true to permit precision-only evolution between these logical types.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ public static class ColumnUpdateChange extends TableChange.BaseColumnChange {

@Getter
private final Map<Integer, Types.Field> updates = new HashMap<>();
private final boolean allowTimestampPrecisionEvolution;

private ColumnUpdateChange(InternalSchema schema) {
super(schema, false);
}

private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive) {
private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) {
super(schema, caseSensitive);
this.allowTimestampPrecisionEvolution = allowTimestampPrecisionEvolution;
}

@Override
Expand Down Expand Up @@ -96,7 +94,7 @@ public ColumnUpdateChange updateColumnType(String name, Type newType) {
throw new SchemaCompatibilityException(String.format("Cannot update type for column '%s' because it does not exist in the schema", name));
}

if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) {
if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType, allowTimestampPrecisionEvolution)) {
throw new SchemaCompatibilityException(String.format(
"Cannot update column '%s' from type '%s' to incompatible type '%s'.", name, field.type(), newType));
}
Expand Down Expand Up @@ -232,11 +230,11 @@ protected Integer findIdByFullName(String fullName) {
}

public static ColumnUpdateChange get(InternalSchema schema) {
return new ColumnUpdateChange(schema);
return new ColumnUpdateChange(schema, false, false);
}

public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive) {
return new ColumnUpdateChange(schema, caseSensitive);
public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) {
return new ColumnUpdateChange(schema, caseSensitive, allowTimestampPrecisionEvolution);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class AvroSchemaEvolutionUtils {
* nullable in the result. Otherwise, no updates will be made to those fields.
* @return reconcile Schema
*/
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema, boolean makeMissingFieldsNullable) {
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema,
boolean makeMissingFieldsNullable, boolean allowTimestampPrecisionEvolution) {
/* If incoming schema is null, we fall back on table schema. */
if (incomingSchema.getType() == Schema.Type.NULL) {
return oldTableSchema;
Expand Down Expand Up @@ -119,7 +120,8 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche

// do type evolution.
InternalSchema internalSchemaAfterAddColumns = SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(internalSchemaAfterAddColumns);
TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(
internalSchemaAfterAddColumns, false, allowTimestampPrecisionEvolution);
typeChangeColumns.stream().filter(f -> !inComingInternalSchema.findType(f).isNestedType()).forEach(col -> {
typeChange.updateColumnType(col, inComingInternalSchema.findType(col));
});
Expand Down Expand Up @@ -149,8 +151,10 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
return evolvedSchema;
}

public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable) {
return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable), oldTableSchema.getFullName()).toAvroSchema();
public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable,
boolean allowTimestampPrecisionEvolution) {
return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable, allowTimestampPrecisionEvolution),
oldTableSchema.getFullName()).toAvroSchema();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,35 @@ public class SchemaChangeUtils {
* @param dst new column type.
* @return whether to allow the column type to be updated.
*/
public static boolean isTypeUpdateAllow(Type src, Type dst) {
public static boolean isTypeUpdateAllow(Type src, Type dst, boolean allowTimestampPrecisionEvolution) {
if (src.isNestedType() || dst.isNestedType()) {
throw new IllegalArgumentException("only support update primitive type");
}
if (src.equals(dst)) {
return true;
}
return isTypeUpdateAllowInternal(src, dst);
return isTypeUpdateAllowInternal(src, dst, allowTimestampPrecisionEvolution);
}

public static boolean shouldPromoteType(Type src, Type dst) {
if (src.equals(dst) || src.isNestedType() || dst.isNestedType()) {
return false;
}
return isTypeUpdateAllowInternal(src, dst);
return isTypeUpdateAllowInternal(src, dst, false);
}

private static boolean isTypeUpdateAllowInternal(Type src, Type dst) {
private static boolean isTypeUpdateAllowInternal(Type src, Type dst, boolean allowTimestampPrecisionEvolution) {
switch (src.typeId()) {
case INT:
return dst == Types.LongType.get() || dst == Types.FloatType.get()
|| dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
case LONG:
if (allowTimestampPrecisionEvolution
&& (dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS)) {
// Forward-fix path: 0.x stored local-timestamp columns as bare long because its converter
// did not recognize the logical type. Allow attaching the logical type when the gate is open.
return true;
}
return dst == Types.FloatType.get() || dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
case FLOAT:
return dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
Expand All @@ -90,6 +96,18 @@ private static boolean isTypeUpdateAllowInternal(Type src, Type dst) {
return isDecimalFixedUpdateAllowInternal(src, dst);
case STRING:
return dst == Types.DateType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED || dst == Types.BinaryType.get();
case TIMESTAMP:
case TIMESTAMP_MILLIS:
if (!allowTimestampPrecisionEvolution) {
return false;
}
return dst.typeId() == Type.TypeID.TIMESTAMP || dst.typeId() == Type.TypeID.TIMESTAMP_MILLIS;
case LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_MICROS:
if (!allowTimestampPrecisionEvolution) {
return false;
}
return dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS;
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public void testEvolutionSchemaFromNewAvroSchema() {
);
evolvedRecord = (Types.RecordType)InternalSchemaBuilder.getBuilder().refreshNewId(evolvedRecord, new AtomicInteger(0));
HoodieSchema evolvedSchema = InternalSchemaConverter.convert(evolvedRecord, "test1");
InternalSchema result = AvroSchemaEvolutionUtils.reconcileSchema(evolvedSchema.getAvroSchema(), oldSchema, false);
InternalSchema result = AvroSchemaEvolutionUtils.reconcileSchema(evolvedSchema.getAvroSchema(), oldSchema, false, false);
Types.RecordType checkedRecord = Types.RecordType.get(
Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
Expand Down Expand Up @@ -541,7 +541,7 @@ public void testReconcileSchema() {
+ "{\"name\":\"d2\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}");

HoodieSchema simpleReconcileSchema = InternalSchemaConverter.convert(AvroSchemaEvolutionUtils
.reconcileSchema(incomingSchema.getAvroSchema(), InternalSchemaConverter.convert(schema), false), "schemaNameFallback");
.reconcileSchema(incomingSchema.getAvroSchema(), InternalSchemaConverter.convert(schema), false, false), "schemaNameFallback");
Assertions.assertEquals(simpleCheckSchema, simpleReconcileSchema);
}

Expand All @@ -563,8 +563,68 @@ public void testNotEvolveSchemaIfReconciledSchemaUnchanged() {
InternalSchema oldInternalSchema = InternalSchemaConverter.convert(oldSchema);
// set a non-default schema id for old table schema, e.g., 2.
oldInternalSchema.setSchemaId(2);
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(incomingSchema.getAvroSchema(), oldInternalSchema, false);
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(incomingSchema.getAvroSchema(), oldInternalSchema, false, false);
// the evolved schema should be the old table schema, since there is no type change at all.
Assertions.assertEquals(oldInternalSchema, evolvedSchema);
}

@Test
public void testReconcileSchemaTimestampPrecisionEvolution() {
  // Two "trip" records that differ only in the precision of the "ts" timestamp logical type.
  // With the flag passed as false the precision change must be rejected; passing true permits it.
  Schema microsRecord = tripSchemaWithTsType(
      LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
  Schema millisRecord = tripSchemaWithTsType(
      LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));

  // Gate closed: micros -> millis and millis -> micros are both refused as incompatible.
  Throwable microsToMillisFailure = assertThrows(Exception.class,
      () -> AvroSchemaEvolutionUtils.reconcileSchema(millisRecord, microsRecord, false, false));
  assertTrue(microsToMillisFailure.getMessage().contains("incompatible type"));
  Throwable millisToMicrosFailure = assertThrows(Exception.class,
      () -> AvroSchemaEvolutionUtils.reconcileSchema(microsRecord, millisRecord, false, false));
  assertTrue(millisToMicrosFailure.getMessage().contains("incompatible type"));

  // Gate open: each direction reconciles, and the result carries the precision of the first (incoming) argument.
  Schema evolvedToMillis = AvroSchemaEvolutionUtils.reconcileSchema(millisRecord, microsRecord, false, true);
  Assertions.assertEquals("timestamp-millis", tsLogicalTypeNameOf(evolvedToMillis));

  Schema evolvedToMicros = AvroSchemaEvolutionUtils.reconcileSchema(microsRecord, millisRecord, false, true);
  Assertions.assertEquals("timestamp-micros", tsLogicalTypeNameOf(evolvedToMicros));

  // Local-timestamp variants are governed by the same flag.
  Schema localMicrosRecord = tripSchemaWithTsType(
      LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
  Schema localMillisRecord = tripSchemaWithTsType(
      LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)));
  assertThrows(Exception.class,
      () -> AvroSchemaEvolutionUtils.reconcileSchema(localMillisRecord, localMicrosRecord, false, false));
  Schema evolvedLocal = AvroSchemaEvolutionUtils.reconcileSchema(localMillisRecord, localMicrosRecord, false, true);
  Assertions.assertEquals("local-timestamp-millis", tsLogicalTypeNameOf(evolvedLocal));

  // Forward-fix scenario: the existing column is a plain long with no logical type attached.
  // Attaching a local-timestamp logical type is refused with the gate closed and accepted with it open.
  Schema plainLongRecord = tripSchemaWithTsType(Schema.create(Schema.Type.LONG));
  assertThrows(Exception.class,
      () -> AvroSchemaEvolutionUtils.reconcileSchema(localMillisRecord, plainLongRecord, false, false));
  Schema attachedLocalMillis = AvroSchemaEvolutionUtils.reconcileSchema(localMillisRecord, plainLongRecord, false, true);
  Assertions.assertEquals("local-timestamp-millis", tsLogicalTypeNameOf(attachedLocalMillis));

  Schema freshLocalMicrosRecord = tripSchemaWithTsType(
      LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)));
  Schema attachedLocalMicros = AvroSchemaEvolutionUtils.reconcileSchema(freshLocalMicrosRecord, plainLongRecord, false, true);
  Assertions.assertEquals("local-timestamp-micros", tsLogicalTypeNameOf(attachedLocalMicros));
}

/**
 * Builds a non-error record named "trip" with a string "id" field followed by a "ts" field of the given type.
 *
 * @param tsType schema to use for the "ts" field.
 * @return a newly created record schema.
 */
private static Schema tripSchemaWithTsType(Schema tsType) {
  return Schema.createRecord("trip", null, null, false, Arrays.asList(
      new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
      new Schema.Field("ts", tsType, null, null)));
}

/**
 * Reads the logical type name attached to the "ts" field of the given record schema.
 *
 * @param record record schema containing a "ts" field that carries a logical type.
 * @return the logical type name of the "ts" field.
 */
private static String tsLogicalTypeNameOf(Schema record) {
  return record.getField("ts").schema().getLogicalType().getName();
}
}
Loading
Loading