diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 14789a4c667df..7bc2aaebfa10b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -369,7 +369,9 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr)); } - InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); + InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, + config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS), + config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION)); if (evolvedSchema.equals(internalSchema)) { metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema)); //TODO save history schema by metaTable diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java index 2893913a0d13a..b7ee7dacd9313 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java @@ -258,7 +258,8 @@ public void doMerge() { boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS); Option internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema()) .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields.toAvroSchema(), internalSchema, - config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS))); + config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS), + config.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION))); long maxMemoryPerCompaction = getMaxMemoryForMerge(); props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction)); Option> logFilesStreamOpt = compactionOperation.map(op -> op.getDeltaFileNames().stream().map(logFileName -> diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index f887e4d5ac746..de11446ceed4c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -171,7 +171,8 @@ private Option> composeSchemaEvolutionTrans if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { // check implicitly add columns, and position reorder(spark sql may change cols order) InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(), - querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); + querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS), + writeConfig.getBooleanOrDefault(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION)); long commitInstantTime = Long.parseLong(baseFile.getCommitTime()); InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient); if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index e488324af3412..91785dc0a2c49 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -83,6 +83,15 @@ public class HoodieCommonConfig extends HoodieConfig { + " operation will fail schema compatibility check. Set this option to true will make the missing " + " column be filled with null values to successfully complete the write operation."); + public static final ConfigProperty ALLOW_TIMESTAMP_PRECISION_EVOLUTION = ConfigProperty + .key("hoodie.write.schema.allow.timestamp.precision.evolution") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("Controls whether schema evolution may change a column between timestamp-millis and " + + "timestamp-micros (and between the local-timestamp variants). Default false rejects such changes. " + + "Set to true to permit precision-only evolution between these logical types."); + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.common.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java index 05d9e0bfa3f34..b603be489a660 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java @@ -47,13 +47,19 @@ public static class ColumnUpdateChange extends TableChange.BaseColumnChange { @Getter private final Map updates = new HashMap<>(); + private final boolean allowTimestampPrecisionEvolution; private ColumnUpdateChange(InternalSchema schema) { - super(schema, false); + this(schema, false, false); } private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive) { + this(schema, caseSensitive, false); + } + + private ColumnUpdateChange(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) { super(schema, caseSensitive); + this.allowTimestampPrecisionEvolution = allowTimestampPrecisionEvolution; } @Override @@ -96,7 +102,7 @@ public ColumnUpdateChange updateColumnType(String name, Type newType) { throw new SchemaCompatibilityException(String.format("Cannot update type for column '%s' because it does not exist in the schema", name)); } - if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType)) { + if (!SchemaChangeUtils.isTypeUpdateAllow(field.type(), newType, allowTimestampPrecisionEvolution)) { throw new SchemaCompatibilityException(String.format( "Cannot update column '%s' from type '%s' to incompatible type '%s'.", name, field.type(), newType)); } @@ -238,6 +244,10 @@ public static ColumnUpdateChange get(InternalSchema schema) { public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive) { return new ColumnUpdateChange(schema, caseSensitive); } + + public static ColumnUpdateChange get(InternalSchema schema, boolean caseSensitive, boolean allowTimestampPrecisionEvolution) { + return new ColumnUpdateChange(schema, caseSensitive, allowTimestampPrecisionEvolution); + } } /** Deal with delete columns changes for table. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java index 5ae6f203f0f3b..fdc18f603e36c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java @@ -65,6 +65,15 @@ public class AvroSchemaEvolutionUtils { * @return reconcile Schema */ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema, boolean makeMissingFieldsNullable) { + return reconcileSchema(incomingSchema, oldTableSchema, makeMissingFieldsNullable, false); + } + + /** + * Variant that allows opting in to timestamp-millis <-> timestamp-micros (and the local-timestamp + * variants) precision evolution, which is rejected by default. + */ + public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema, + boolean makeMissingFieldsNullable, boolean allowTimestampPrecisionEvolution) { /* If incoming schema is null, we fall back on table schema. */ if (incomingSchema.getType() == Schema.Type.NULL) { return oldTableSchema; @@ -119,7 +128,8 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche // do type evolution. InternalSchema internalSchemaAfterAddColumns = SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange); - TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(internalSchemaAfterAddColumns); + TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get( + internalSchemaAfterAddColumns, false, allowTimestampPrecisionEvolution); typeChangeColumns.stream().filter(f -> !inComingInternalSchema.findType(f).isNestedType()).forEach(col -> { typeChange.updateColumnType(col, inComingInternalSchema.findType(col)); }); @@ -150,7 +160,13 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche } public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable) { - return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable), oldTableSchema.getFullName()).toAvroSchema(); + return reconcileSchema(incomingSchema, oldTableSchema, makeMissingFieldsNullable, false); + } + + public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable, + boolean allowTimestampPrecisionEvolution) { + return convert(reconcileSchema(incomingSchema, convert(HoodieSchema.fromAvroSchema(oldTableSchema)), makeMissingFieldsNullable, allowTimestampPrecisionEvolution), + oldTableSchema.getFullName()).toAvroSchema(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangeUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangeUtils.java index f02407b986ed7..e6f7b85ff0506 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangeUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangeUtils.java @@ -52,29 +52,39 @@ public class SchemaChangeUtils { * @param dst new column type. * @return whether to allow the column type to be updated. */ - public static boolean isTypeUpdateAllow(Type src, Type dst) { + /** + * Variant that allows opting in to timestamp-millis <-> timestamp-micros (and the local-timestamp + * variants) precision evolution, which is rejected by default. + */ + public static boolean isTypeUpdateAllow(Type src, Type dst, boolean allowTimestampPrecisionEvolution) { if (src.isNestedType() || dst.isNestedType()) { throw new IllegalArgumentException("only support update primitive type"); } if (src.equals(dst)) { return true; } - return isTypeUpdateAllowInternal(src, dst); + return isTypeUpdateAllowInternal(src, dst, allowTimestampPrecisionEvolution); } public static boolean shouldPromoteType(Type src, Type dst) { if (src.equals(dst) || src.isNestedType() || dst.isNestedType()) { return false; } - return isTypeUpdateAllowInternal(src, dst); + return isTypeUpdateAllowInternal(src, dst, false); } - private static boolean isTypeUpdateAllowInternal(Type src, Type dst) { + private static boolean isTypeUpdateAllowInternal(Type src, Type dst, boolean allowTimestampPrecisionEvolution) { switch (src.typeId()) { case INT: return dst == Types.LongType.get() || dst == Types.FloatType.get() || dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED; case LONG: + if (allowTimestampPrecisionEvolution + && (dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS)) { + // Forward-fix path: 0.x stored local-timestamp columns as bare long because its converter + // did not recognize the logical type. Allow attaching the logical type when the gate is open. + return true; + } return dst == Types.FloatType.get() || dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED; case FLOAT: return dst == Types.DoubleType.get() || dst == Types.StringType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED; @@ -90,6 +100,18 @@ private static boolean isTypeUpdateAllowInternal(Type src, Type dst) { return isDecimalFixedUpdateAllowInternal(src, dst); case STRING: return dst == Types.DateType.get() || dst.typeId() == Type.TypeID.DECIMAL || dst.typeId() == Type.TypeID.DECIMAL_FIXED || dst == Types.BinaryType.get(); + case TIMESTAMP: + case TIMESTAMP_MILLIS: + if (!allowTimestampPrecisionEvolution) { + return false; + } + return dst.typeId() == Type.TypeID.TIMESTAMP || dst.typeId() == Type.TypeID.TIMESTAMP_MILLIS; + case LOCAL_TIMESTAMP_MILLIS: + case LOCAL_TIMESTAMP_MICROS: + if (!allowTimestampPrecisionEvolution) { + return false; + } + return dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MILLIS || dst.typeId() == Type.TypeID.LOCAL_TIMESTAMP_MICROS; default: return false; } diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java index 662436de9d757..bcd0bd79601fb 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java @@ -567,4 +567,64 @@ public void testNotEvolveSchemaIfReconciledSchemaUnchanged() { // the evolved schema should be the old table schema, since there is no type change at all. Assertions.assertEquals(oldInternalSchema, evolvedSchema); } + + @Test + public void testReconcileSchemaTimestampPrecisionEvolution() { + // Strict by default: changing a column's logical type between timestamp-millis and timestamp-micros throws. + // Opt-in via the allowTimestampPrecisionEvolution=true overload permits the precision change. + Schema tableSchemaMicros = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null))); + Schema incomingSchemaMillis = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)), null, null))); + + // Default strict behavior rejects the precision change in either direction. + Throwable rejectedMicrosToMillis = assertThrows(Exception.class, + () -> AvroSchemaEvolutionUtils.reconcileSchema(incomingSchemaMillis, tableSchemaMicros, false, false)); + assertTrue(rejectedMicrosToMillis.getMessage().contains("incompatible type")); + Throwable rejectedMillisToMicros = assertThrows(Exception.class, + () -> AvroSchemaEvolutionUtils.reconcileSchema(tableSchemaMicros, incomingSchemaMillis, false, false)); + assertTrue(rejectedMillisToMicros.getMessage().contains("incompatible type")); + + // With the gate opened, both directions succeed and produce a schema labeled with the incoming precision. + Schema reconciledToMillis = AvroSchemaEvolutionUtils.reconcileSchema( + incomingSchemaMillis, tableSchemaMicros, false, true); + Assertions.assertEquals("timestamp-millis", reconciledToMillis.getField("ts").schema().getLogicalType().getName()); + + Schema reconciledToMicros = AvroSchemaEvolutionUtils.reconcileSchema( + tableSchemaMicros, incomingSchemaMillis, false, true); + Assertions.assertEquals("timestamp-micros", reconciledToMicros.getField("ts").schema().getLogicalType().getName()); + + // The same gate applies to the local-timestamp variants. + Schema tableLocalMicros = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null))); + Schema incomingLocalMillis = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)), null, null))); + assertThrows(Exception.class, + () -> AvroSchemaEvolutionUtils.reconcileSchema(incomingLocalMillis, tableLocalMicros, false, false)); + Schema reconciledLocal = AvroSchemaEvolutionUtils.reconcileSchema( + incomingLocalMillis, tableLocalMicros, false, true); + Assertions.assertEquals("local-timestamp-millis", reconciledLocal.getField("ts").schema().getLogicalType().getName()); + + // 0.x did not recognize the local-timestamp logical types, so affected tables persisted those + // columns as bare long. The gate must also allow attaching the logical type on forward-fix. + Schema tableBareLong = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", Schema.create(Schema.Type.LONG), null, null))); + assertThrows(Exception.class, + () -> AvroSchemaEvolutionUtils.reconcileSchema(incomingLocalMillis, tableBareLong, false, false)); + Schema repairedToLocalMillis = AvroSchemaEvolutionUtils.reconcileSchema( + incomingLocalMillis, tableBareLong, false, true); + Assertions.assertEquals("local-timestamp-millis", repairedToLocalMillis.getField("ts").schema().getLogicalType().getName()); + + Schema incomingLocalMicros = Schema.createRecord("trip", null, null, false, Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), null, null), + new Schema.Field("ts", LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)), null, null))); + Schema repairedToLocalMicros = AvroSchemaEvolutionUtils.reconcileSchema( + incomingLocalMicros, tableBareLong, false, true); + Assertions.assertEquals("local-timestamp-micros", repairedToLocalMicros.getField("ts").schema().getLogicalType().getName()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala index 5a40f45c12412..085653184999f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala @@ -169,11 +169,15 @@ object HoodieSchemaUtils { HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean val setNullForMissingColumns = opts.getOrElse(DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.key(), DataSourceWriteOptions.SET_NULL_FOR_MISSING_COLUMNS.defaultValue).toBoolean + val allowTimestampPrecisionEvolution = opts.getOrElse(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key, + HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.defaultValue.toString).toBoolean if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) { // Default behaviour val reconciledSchema = if (setNullForMissingColumns) { - HoodieSchema.fromAvroSchema(AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), latestTableSchema.toAvroSchema(), setNullForMissingColumns)) + HoodieSchema.fromAvroSchema(AvroSchemaEvolutionUtils.reconcileSchema( + canonicalizedSourceSchema.toAvroSchema(), latestTableSchema.toAvroSchema(), + setNullForMissingColumns, allowTimestampPrecisionEvolution)) } else { canonicalizedSourceSchema } @@ -205,7 +209,10 @@ object HoodieSchemaUtils { // Apply schema evolution, by auto-merging write schema and read schema val setNullForMissingColumns = opts.getOrElse(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue()).toBoolean - val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), internalSchema, setNullForMissingColumns) + val allowTimestampPrecisionEvolution = opts.getOrElse(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key, + HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.defaultValue.toString).toBoolean + val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), internalSchema, + setNullForMissingColumns, allowTimestampPrecisionEvolution) val evolvedSchema = InternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName) val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields.asScala.filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty if (shouldRemoveMetaDataFromInternalSchema) HoodieCommonSchemaUtils.removeMetadataFields(evolvedSchema) else evolvedSchema diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 684352d033096..f6812da93117b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -27,6 +27,7 @@ import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.DFSPropertiesConfiguration; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.LockConfiguration; @@ -79,6 +80,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.hadoop.fs.HadoopFSUtils; @@ -1109,8 +1111,19 @@ private void assertBoundaryCounts(Dataset df, String exprZero, String exprT } @ParameterizedTest - @CsvSource(value = {"SIX,AVRO,CLUSTER", "EIGHT,AVRO,CLUSTER", "CURRENT,AVRO,NONE", "CURRENT,AVRO,CLUSTER", "CURRENT,SPARK,NONE", "CURRENT,SPARK,CLUSTER"}) - void testCOWLogicalRepair(String tableVersion, String recordType, String operation) throws Exception { + @CsvSource(value = { + "SIX,AVRO,CLUSTER,false,false", "EIGHT,AVRO,CLUSTER,false,false", + "CURRENT,AVRO,NONE,false,false", "CURRENT,AVRO,CLUSTER,false,false", + "CURRENT,SPARK,NONE,false,false", "CURRENT,SPARK,CLUSTER,false,false", + // Variants that exercise the schema-reconcile path (enabled by setNullForMissingColumns=true) + // together with the opt-in to permit timestamp-millis <-> timestamp-micros evolution. + "SIX,AVRO,CLUSTER,true,true", "EIGHT,AVRO,CLUSTER,true,true", "CURRENT,AVRO,CLUSTER,true,true", + // Reconcile path on, precision-evolution gate closed: the first sync must throw because + // the corrupt fixture's timestamp/local-timestamp columns trigger a disallowed type change. + "SIX,AVRO,CLUSTER,true,false"}) + void testCOWLogicalRepair(String tableVersion, String recordType, String operation, + boolean setNullForMissingColumns, + boolean allowTimestampPrecisionEvolution) throws Exception { TestMercifulJsonToRowConverterBase.timestampNTZCompatibility(() -> { String dirName = "trips_logical_types_json_cow_write"; String dataPath = basePath + "/" + dirName; @@ -1139,9 +1152,24 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati properties.setProperty("hoodie.parquet.small.file.limit", "-1"); properties.setProperty("hoodie.cleaner.commits.retained", "10"); properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), tableVersionString); + properties.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), + Boolean.toString(setNullForMissingColumns)); + properties.setProperty(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key(), + Boolean.toString(allowTimestampPrecisionEvolution)); Option propt = Option.of(properties); + if (setNullForMissingColumns && !allowTimestampPrecisionEvolution) { + // Reconcile path is active but the precision-evolution gate is closed. + // The mislabeled timestamp/local-timestamp columns must surface as a + // SchemaCompatibilityException on the first sync. + SchemaCompatibilityException thrown = assertThrows(SchemaCompatibilityException.class, + () -> syncOnce(prepCfgForCowLogicalRepair(tableBasePath, "456"), propt)); + assertTrue(thrown.getMessage().contains("incompatible type"), + "expected disallowed-type-change message, got: " + thrown.getMessage()); + return; + } + syncOnce(prepCfgForCowLogicalRepair(tableBasePath, "456"), propt); inputDataPath = getClass().getClassLoader().getResource("logical-repair/cow_write_updates/3").toURI().toString(); @@ -1189,11 +1217,17 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati } @ParameterizedTest - @CsvSource(value = {"SIX,AVRO,CLUSTER,AVRO", "EIGHT,AVRO,CLUSTER,AVRO", - "CURRENT,AVRO,NONE,AVRO", "CURRENT,AVRO,CLUSTER,AVRO", "CURRENT,AVRO,COMPACT,AVRO", - "CURRENT,AVRO,NONE,PARQUET", "CURRENT,AVRO,CLUSTER,PARQUET", "CURRENT,AVRO,COMPACT,PARQUET", - "CURRENT,SPARK,NONE,PARQUET", "CURRENT,SPARK,CLUSTER,PARQUET", "CURRENT,SPARK,COMPACT,PARQUET"}) - void testMORLogicalRepair(String tableVersion, String recordType, String operation, String logBlockType) throws Exception { + @CsvSource(value = {"SIX,AVRO,CLUSTER,AVRO,false,false", "EIGHT,AVRO,CLUSTER,AVRO,false,false", + "CURRENT,AVRO,NONE,AVRO,false,false", "CURRENT,AVRO,CLUSTER,AVRO,false,false", "CURRENT,AVRO,COMPACT,AVRO,false,false", + "CURRENT,AVRO,NONE,PARQUET,false,false", "CURRENT,AVRO,CLUSTER,PARQUET,false,false", "CURRENT,AVRO,COMPACT,PARQUET,false,false", + "CURRENT,SPARK,NONE,PARQUET,false,false", "CURRENT,SPARK,CLUSTER,PARQUET,false,false", "CURRENT,SPARK,COMPACT,PARQUET,false,false", + // Variants that exercise the schema-reconcile path with the timestamp-precision opt-in. + "SIX,AVRO,CLUSTER,AVRO,true,true", "EIGHT,AVRO,CLUSTER,AVRO,true,true", "CURRENT,AVRO,CLUSTER,AVRO,true,true", + // Reconcile path on, precision-evolution gate closed: the first sync must throw. + "SIX,AVRO,CLUSTER,AVRO,true,false"}) + void testMORLogicalRepair(String tableVersion, String recordType, String operation, String logBlockType, + boolean setNullForMissingColumns, + boolean allowTimestampPrecisionEvolution) throws Exception { TestMercifulJsonToRowConverterBase.timestampNTZCompatibility(() -> { String tableSuffix; String logFormatValue; @@ -1241,6 +1275,10 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati properties.setProperty("hoodie.cleaner.commits.retained", "10"); properties.setProperty(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), tableVersionString); properties.setProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logFormatValue); + properties.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), + Boolean.toString(setNullForMissingColumns)); + properties.setProperty(HoodieCommonConfig.ALLOW_TIMESTAMP_PRECISION_EVOLUTION.key(), + Boolean.toString(allowTimestampPrecisionEvolution)); boolean disableCompaction; if ("COMPACT".equals(operation)) { @@ -1262,6 +1300,17 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati Option propt = Option.of(properties); + if (setNullForMissingColumns && !allowTimestampPrecisionEvolution) { + // Reconcile path is active but the precision-evolution gate is closed. + // The mislabeled timestamp/local-timestamp columns must surface as a + // SchemaCompatibilityException on the first sync. + SchemaCompatibilityException thrown = assertThrows(SchemaCompatibilityException.class, + () -> syncOnce(prepCfgForMorLogicalRepair(tableBasePath, dirName, "123", disableCompaction), propt)); + assertTrue(thrown.getMessage().contains("incompatible type"), + "expected disallowed-type-change message, got: " + thrown.getMessage()); + return; + } + syncOnce(prepCfgForMorLogicalRepair(tableBasePath, dirName, "123", disableCompaction), propt); String prevTimezone = sparkSession.conf().get("spark.sql.session.timeZone");