Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema,
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION));
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public void doMerge() {
boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
.map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields.toAvroSchema(), internalSchema,
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION)));
long maxMemoryPerCompaction = getMaxMemoryForMerge();
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
Option<Stream<HoodieLogFile>> logFilesStreamOpt = compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS),
writeConfig.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION));
long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " operation will fail schema compatibility check. Set this option to true will make the missing "
+ " column be filled with null values to successfully complete the write operation.");

public static final ConfigProperty<Boolean> ALLOW_TIMESTAMP_PRECISION_EVOLUTION = ConfigProperty
.key("hoodie.write.schema.allow.timestamp.precision.evolution")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("Controls whether schema evolution may change a column between timestamp-millis and "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Should the docs make it explicit that this only relabels the logical type — it does not rewrite stored values? Since old base files keep their original encoding, enabling the gate when actual values don't match the new precision can lead to misinterpretation on read. A one-liner like "Stored values are not rewritten; only enable when the actual values already match the target precision" would steer users away from misuse.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

+ "timestamp-micros (and between the local-timestamp variants). Default false rejects such changes. "
+ "Set to true to permit precision-only evolution between these logical types.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ public static class ColumnUpdateChange extends TableChange.BaseColumnChange {

@Getter
private final Map<Integer, Types.Field> updates = new HashMap<>();
private final boolean allowTimestampPrecisionEvolution;

private ColumnUpdateChange(InternalSchema schema) {
super(schema, false);
this(schema, false, false);
}

/**
* Creates a column-update change set with timestamp precision evolution disabled.
*
* @param schema the schema the changes apply to
* @param caseSensitive forwarded unchanged to the three-argument constructor
*/
private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive) {
// Delegate with the precision-evolution gate closed.
this(schema, caseSensitive, false);
}

/**
* Creates a column-update change set and records whether timestamp precision evolution is allowed.
*
* @param schema the schema the changes apply to; passed to the superclass constructor
* @param caseSensitive passed to the superclass constructor
* @param allowTimestampPrecisionEvolution stored on this instance and later handed to
*        {@code SchemaChangeUtils.isTypeUpdateAllow} when a column type is updated
*/
private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) {
super(schema, caseSensitive);
this.allowTimestampPrecisionEvolution = allowTimestampPrecisionEvolution;
}

@Override
Expand Down Expand Up @@ -96,7 +102,7 @@ public ColumnUpdateChange updateColumnType(String name, Type newType) {
throw new SchemaCompatibilityException(String.format("Cannot update type for column '%s' because it does not exist in the schema", name));
}

if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) {
if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType, allowTimestampPrecisionEvolution)) {
throw new SchemaCompatibilityException(String.format(
"Cannot update column '%s' from type '%s' to incompatible type '%s'.", name, field.type(), newType));
}
Expand Down Expand Up @@ -238,6 +244,10 @@ public static ColumnUpdateChange get(InternalSchema schema) {
public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive) {
return new ColumnUpdateChange(schema, caseSensitive);
}

/**
* Static factory for a {@code ColumnUpdateChange} with an explicit timestamp precision evolution flag.
*
* @param schema the schema the changes apply to
* @param caseSensitive forwarded to the constructor
* @param allowTimestampPrecisionEvolution forwarded to the constructor
* @return a new {@code ColumnUpdateChange} built from the three arguments
*/
public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) {
return new ColumnUpdateChange(schema, caseSensitive, allowTimestampPrecisionEvolution);
}
}

/** Deal with delete columns changes for table. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public class AvroSchemaEvolutionUtils {
* @return reconcile Schema
*/
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema, boolean makeMissingFieldsNullable) {
return reconcileSchema(incomingSchema, oldTableSchema, makeMissingFieldsNullable, false);
}

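/**
* Variant of {@code reconcileSchema} that lets the caller opt in to precision-only evolution between
* {@code timestamp-millis} and {@code timestamp-micros} (and between the local-timestamp variants),
* which is rejected by default.
*
* @param allowTimestampPrecisionEvolution when {@code true}, permit these precision-only type changes
*/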
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema,
boolean makeMissingFieldsNullable, boolean allowTimestampPrecisionEvolution) {
/* If incoming schema is null, we fall back on table schema. */
if (incomingSchema.getType() == Schema.Type.NULL) {
return oldTableSchema;
Expand Down Expand Up @@ -119,7 +128,8 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche

// do type evolution.
InternalSchema internalSchemaAfterAddColumns = SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(internalSchemaAfterAddColumns);
TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(
internalSchemaAfterAddColumns, false, allowTimestampPrecisionEvolution);
typeChangeColumns.stream().filter(f -> !inComingInternalSchema.findType(f).isNestedType()).forEach(col -> {
typeChange.updateColumnType(col, inComingInternalSchema.findType(col));
});
Expand Down Expand Up @@ -150,7 +160,13 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
}

public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable) {
return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable), oldTableSchema.getFullName()).toAvroSchema();
return reconcileSchema(incomingSchema, oldTableSchema, makeMissingFieldsNullable, false);
}

/**
* Avro-to-Avro variant of {@code reconcileSchema} with an explicit timestamp precision evolution flag.
*
* @param incomingSchema the incoming Avro schema
* @param oldTableSchema the existing table schema as Avro; its full name is reused for the result
* @param makeMissingFieldsNullable forwarded to the {@code InternalSchema}-based overload
* @param allowTimestampPrecisionEvolution forwarded to the {@code InternalSchema}-based overload
* @return the reconciled schema converted back to Avro
*/
public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable,
boolean allowTimestampPrecisionEvolution) {
// Convert the table schema to an InternalSchema, reconcile, then convert the result back to Avro.
return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable, allowTimestampPrecisionEvolution),
oldTableSchema.getFullName()).toAvroSchema();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,39 @@ public class SchemaChangeUtils {
* @param dst new column type.
* @return whether to allow the column type to be updated.
*/
public static boolean isTypeUpdateAllow(Type src, Type dst) {
/**

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The PR description notes "Pre-existing overloads kept as delegates," which is followed for reconcileSchema and ColumnUpdateChange.get, but here the 2-arg isTypeUpdateAllow(Type, Type) is replaced rather than kept as a delegate. Could you preserve the original 2-arg signature as a thin delegate (return isTypeUpdateAllow(src, dst, false);)? This class is a public utility in internal.schema.utils and could plausibly be called by downstream tooling.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* Variant that lets the caller opt in to precision-only evolution between {@code timestamp-millis} and
* {@code timestamp-micros} (and between the local-timestamp variants), which is rejected by default.
*/
public static boolean isTypeUpdateAllow(Type src, Type dst, boolean allowTimestampPrecisionEvolution) {
if (src.isNestedType() || dst.isNestedType()) {
throw new IllegalArgumentException("only support update primitive type");
}
if (src.equals(dst)) {
return true;
}
return isTypeUpdateAllowInternal(src, dst);
return isTypeUpdateAllowInternal(src, dst, allowTimestampPrecisionEvolution);
}

public static boolean shouldPromoteType(Type src, Type dst) {
if (src.equals(dst) || src.isNestedType() || dst.isNestedType()) {
return false;
}
return isTypeUpdateAllowInternal(src, dst);
return isTypeUpdateAllowInternal(src, dst, false);
}

private static boolean isTypeUpdateAllowInternal(Type src, Type dst) {
private static boolean isTypeUpdateAllowInternal(Type src, Type dst, boolean allowTimestampPrecisionEvolution) {
switch (src.typeId()) {
case INT:
return dst == Types.LongType.get() || dst == Types.FloatType.get()
|| dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
case LONG:
if (allowTimestampPrecisionEvolution
&& (dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS)) {
// Forward-fix path: 0.x stored local-timestamp columns as bare long because its converter
// did not recognize the logical type. Allow attaching the logical type when the gate is open.
return true;
}
return dst == Types.FloatType.get() || dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
case FLOAT:
return dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED;
Expand All @@ -90,6 +100,18 @@ private static boolean isTypeUpdateAllowInternal(Type src, Type dst) {
return isDecimalFixedUpdateAllowInternal(src, dst);
case STRING:
return dst == Types.DateType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED || dst == Types.BinaryType.get();
case TIMESTAMP:
case TIMESTAMP_MILLIS:
if (!allowTimestampPrecisionEvolution) {
return false;
}
return dst.typeId() == Type.TypeID.TIMESTAMP || dst.typeId() == Type.TypeID.TIMESTAMP_MILLIS;
case LOCAL_TIMESTAMP_MILLIS:
case LOCAL_TIMESTAMP_MICROS:
if (!allowTimestampPrecisionEvolution) {
return false;
}
return dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS;
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,64 @@ public void testNotEvolveSchemaIfReconciledSchemaUnchanged() {
// the evolved schema should be the old table schema, since there is no type change at all.
Assertions.assertEquals(oldInternalSchema, evolvedSchema);
}

/**
* Exercises the four-argument {@code AvroSchemaEvolutionUtils.reconcileSchema} overload with the
* timestamp precision evolution flag both off and on.
*
* <p>Covers, in order: timestamp-millis vs. timestamp-micros in both directions, the local-timestamp
* variants, and attaching a local-timestamp logical type to a column that the table stores as a bare long.
*/
@Test
public void testReconcileSchemaTimestampPrecisionEvolution() {
// Strict by default: changing a column's logical type between timestamp-millis and timestamp-micros throws.
// Opt-in via the allowTimestampPrecisionEvolution=true overload permits the precision change.
Schema tableSchemaMicros = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null)));
Schema incomingSchemaMillis = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)), null, null)));

// Default strict behavior rejects the precision change in either direction.
// NOTE(review): Exception.class is broad; the "incompatible type" message check below is what ties
// the failure to the type-update rejection. Consider asserting the specific exception type - confirm
// which type actually propagates out of reconcileSchema.
Throwable rejectedMicrosToMillis = assertThrows(Exception.class,
() -> AvroSchemaEvolutionUtils.reconcileSchema(incomingSchemaMillis, tableSchemaMicros, false, false));
assertTrue(rejectedMicrosToMillis.getMessage().contains("incompatible type"));
// Same pair with the roles swapped: the micros schema is now the incoming one.
Throwable rejectedMillisToMicros = assertThrows(Exception.class,
() -> AvroSchemaEvolutionUtils.reconcileSchema(tableSchemaMicros, incomingSchemaMillis, false, false));
assertTrue(rejectedMillisToMicros.getMessage().contains("incompatible type"));

// With the gate opened, both directions succeed and produce a schema labeled with the incoming precision.
Schema reconciledToMillis = AvroSchemaEvolutionUtils.reconcileSchema(
incomingSchemaMillis, tableSchemaMicros, false, true);
Assertions.assertEquals("timestamp-millis", reconciledToMillis.getField("ts").schema().getLogicalType().getName());

Schema reconciledToMicros = AvroSchemaEvolutionUtils.reconcileSchema(
tableSchemaMicros, incomingSchemaMillis, false, true);
Assertions.assertEquals("timestamp-micros", reconciledToMicros.getField("ts").schema().getLogicalType().getName());

// The same gate applies to the local-timestamp variants.
Schema tableLocalMicros = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null)));
Schema incomingLocalMillis = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)), null, null)));
assertThrows(Exception.class,
() -> AvroSchemaEvolutionUtils.reconcileSchema(incomingLocalMillis, tableLocalMicros, false, false));
Schema reconciledLocal = AvroSchemaEvolutionUtils.reconcileSchema(
incomingLocalMillis, tableLocalMicros, false, true);
Assertions.assertEquals("local-timestamp-millis", reconciledLocal.getField("ts").schema().getLogicalType().getName());

// 0.x did not recognize the local-timestamp logical types, so affected tables persisted those
// columns as bare long. The gate must also allow attaching the logical type on forward-fix.
Schema tableBareLong = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", Schema.create(Schema.Type.LONG), null, null)));
assertThrows(Exception.class,
() -> AvroSchemaEvolutionUtils.reconcileSchema(incomingLocalMillis, tableBareLong, false, false));
Schema repairedToLocalMillis = AvroSchemaEvolutionUtils.reconcileSchema(
incomingLocalMillis, tableBareLong, false, true);
Assertions.assertEquals("local-timestamp-millis", repairedToLocalMillis.getField("ts").schema().getLogicalType().getName());

// Repeat the bare-long repair with a local-timestamp-micros incoming schema.
Schema incomingLocalMicros = Schema.createRecord("trip", null, null, false, Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null),
new Schema.Field("ts", LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null)));
Schema repairedToLocalMicros = AvroSchemaEvolutionUtils.reconcileSchema(
incomingLocalMicros, tableBareLong, false, true);
Assertions.assertEquals("local-timestamp-micros", repairedToLocalMicros.getField("ts").schema().getLogicalType().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,15 @@ object HoodieSchemaUtils {
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
val setNullForMissingColumns = opts.getOrElse(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(),
DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean
val allowTimestampPrecisionEvolution = opts.getOrElse(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key,
HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.defaultValue.toString).toBoolean

if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
HoodieSchema.fromAvroSchema(AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), latestTableSchema.toAvroSchema(), setNullForMissingColumns))
HoodieSchema.fromAvroSchema(AvroSchemaEvolutionUtils.reconcileSchema(
canonicalizedSourceSchema.toAvroSchema(), latestTableSchema.toAvroSchema(),
setNullForMissingColumns, allowTimestampPrecisionEvolution))
} else {
canonicalizedSourceSchema
}
Expand Down Expand Up @@ -205,7 +209,10 @@ object HoodieSchemaUtils {
// Apply schema evolution, by auto-merging write schema and read schema
val setNullForMissingColumns = opts.getOrElse(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue()).toBoolean
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), internalSchema, setNullForMissingColumns)
val allowTimestampPrecisionEvolution = opts.getOrElse(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: allowTimestampPrecisionEvolution is already declared from opts at the outer scope a few dozen lines above — could you just reference that val here instead of re-reading the same key with the same default?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.defaultValue.toString).toBoolean
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), internalSchema,
setNullForMissingColumns, allowTimestampPrecisionEvolution)
val evolvedSchema = InternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields.asScala.filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieCommonSchemaUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
Expand Down
Loading
Loading