From b8c11e70fe1d00616110d07beb501ed5bbbc1032 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 20:05:48 -0700 Subject: [PATCH 1/4] docs(datasource): START_COMMIT is exclusive, not inclusive Spark's incremental relation filters with completion_time > START_COMMIT (start-exclusive). The config doc said >=, which contradicts both the V1 relation's findInstantsInRange (start, end] and the V2 relation's RangeType.OPEN_CLOSED default. --- .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 445f052ee4f6a..75b8b97e70510 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -114,9 +114,9 @@ object DataSourceReadOptions { .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Required when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "`. " - + "Represents the completion time to start incrementally pulling data from. The completion time here need not necessarily " - + "correspond to an instant on the timeline. New data written with completion_time >= START_COMMIT are fetched out. " - + "For e.g: ‘20170901080000’ will get all new data written on or after Sep 1, 2017 08:00AM.") + + "Represents the completion time to start incrementally pulling data from (exclusive). The completion time here need not " + + "necessarily correspond to an instant on the timeline. New data written with completion_time > START_COMMIT are fetched out. " + + "For e.g: ‘20170901080000’ will get all new data written strictly after Sep 1, 2017 08:00AM.") val END_COMMIT: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.end.instanttime") From 13c860bd6ae646c838f293526bed06a112121135 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 21:03:19 -0700 Subject: [PATCH 2/4] Clarify START_COMMIT semantics by table version V1 relation (used for table version 6 source tables) interprets START_COMMIT as requested/instant time; V2 relation (table version 8+) interprets it as completion time. Both are start-exclusive. --- .../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 75b8b97e70510..d7df43254f92f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -114,8 +114,9 @@ object DataSourceReadOptions { .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Required when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "`. " - + "Represents the completion time to start incrementally pulling data from (exclusive). The completion time here need not " - + "necessarily correspond to an instant on the timeline. New data written with completion_time > START_COMMIT are fetched out. " + + "Represents the start point (exclusive) to begin incrementally pulling data from. Interpreted as completion time for " + + "table version 8 and later, or as requested time (instant time) for earlier table versions. The value need not " + + "necessarily correspond to an instant on the timeline. New data written strictly after START_COMMIT are fetched out. " + "For e.g: ‘20170901080000’ will get all new data written strictly after Sep 1, 2017 08:00AM.") val END_COMMIT: ConfigProperty[String] = ConfigProperty From a1c37b23f3c01cfb99f0b1df7df1d9ebfaa2fff5 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 21:04:42 -0700 Subject: [PATCH 3/4] Clarify START_COMMIT and END_COMMIT semantics by source table version --- .../org/apache/hudi/DataSourceOptions.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index d7df43254f92f..963d58a421fcf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -114,20 +114,20 @@ object DataSourceReadOptions { .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Required when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "`. " - + "Represents the start point (exclusive) to begin incrementally pulling data from. Interpreted as completion time for " - + "table version 8 and later, or as requested time (instant time) for earlier table versions. The value need not " - + "necessarily correspond to an instant on the timeline. New data written strictly after START_COMMIT are fetched out. " - + "For e.g: ‘20170901080000’ will get all new data written strictly after Sep 1, 2017 08:00AM.") + + "The start point (exclusive) to begin incrementally pulling data from. For source tables at version 8 or later this is " + + "a completion time; for earlier source table versions (e.g., version 6) it is a requested time (instant time). The " + + "value need not necessarily correspond to an instant on the timeline. New data written strictly after START_COMMIT are " + + "fetched out. For e.g. ‘20170901080000’ will get all new data written strictly after Sep 1, 2017 08:00AM.") val END_COMMIT: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.end.instanttime") .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Used when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL - + "`. Represents the completion time to limit incrementally fetched data to. When not specified latest commit " - + "completion time from timeline is assumed by default. When specified, new data written with " - + "completion_time <= END_COMMIT are fetched out. " - + "Point in time type queries make more sense with begin and end completion times specified.") + + "`. The end point (inclusive) to limit incrementally fetched data to. For source tables at version 8 or later this is " + + "a completion time; for earlier source table versions (e.g., version 6) it is a requested time (instant time). When " + + "not specified, the latest committed instant from the timeline is used. Point in time type queries make more sense " + + "with both begin and end specified.") val STREAMING_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.streaming.table.version") From 0b00aad11438d1e4f8361c048cd8919464240bf4 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Tue, 9 Jun 2026 21:06:41 -0700 Subject: [PATCH 4/4] Note override configs (incr/streaming read table version) in START_COMMIT/END_COMMIT docs --- .../org/apache/hudi/DataSourceOptions.scala | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 963d58a421fcf..56d2e1fab6fc7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -114,32 +114,40 @@ object DataSourceReadOptions { .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Required when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "`. " - + "The start point (exclusive) to begin incrementally pulling data from. For source tables at version 8 or later this is " - + "a completion time; for earlier source table versions (e.g., version 6) it is a requested time (instant time). The " - + "value need not necessarily correspond to an instant on the timeline. New data written strictly after START_COMMIT are " - + "fetched out. For e.g. ‘20170901080000’ will get all new data written strictly after Sep 1, 2017 08:00AM.") + + "The start point (exclusive) to begin incrementally pulling data from. The semantics depend on the effective table " + + "version (overridable via `hoodie.datasource.read.incr.table.version` for incremental reads or " + + "`hoodie.datasource.read.streaming.table.version` for streaming reads; otherwise the source table's actual version): " + + "version 8 or later treats this as a completion time, earlier versions (e.g., version 6) treat it as a requested time " + + "(instant time). The value need not necessarily correspond to an instant on the timeline. New data written strictly " + + "after START_COMMIT are fetched out. For e.g. ‘20170901080000’ will get all new data written strictly after Sep 1, " + + "2017 08:00AM.") val END_COMMIT: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.end.instanttime") .noDefaultValue() .sinceVersion("0.9.0") .withDocumentation("Used when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL - + "`. The end point (inclusive) to limit incrementally fetched data to. For source tables at version 8 or later this is " - + "a completion time; for earlier source table versions (e.g., version 6) it is a requested time (instant time). When " - + "not specified, the latest committed instant from the timeline is used. Point in time type queries make more sense " - + "with both begin and end specified.") + + "`. The end point (inclusive) to limit incrementally fetched data to. Same time-semantics rules as START_COMMIT: " + + "version 8 or later treats this as a completion time, earlier versions (e.g., version 6) treat it as a requested time " + + "(overridable via `hoodie.datasource.read.incr.table.version` or `hoodie.datasource.read.streaming.table.version`). " + + "When not specified, the latest committed instant from the timeline is used. Point in time type queries make more " + + "sense with both begin and end specified.") val STREAMING_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.streaming.table.version") .noDefaultValue() .sinceVersion("1.0.0") - .withDocumentation("The table version assumed for streaming read") + .withDocumentation("Overrides the table version assumed for streaming reads. Version 8+ selects HoodieStreamSourceV2 " + + "(completion-time based START_COMMIT/END_COMMIT); earlier versions select HoodieStreamSourceV1 (requested-time based). " + + "If unset, the source table's actual version is used.") val INCREMENTAL_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.incr.table.version") .noDefaultValue() .sinceVersion("1.0.0") - .withDocumentation("The table version assumed for incremental read") + .withDocumentation("Overrides the table version assumed for incremental reads. Version 8+ selects the V2 incremental " + + "relation (completion-time based START_COMMIT/END_COMMIT); earlier versions select the V1 relation (requested-time " + + "based). If unset, the source table's actual version is used.") val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.schema.use.end.instanttime")