From 7a78d8ea4c36d0e0652416253aa0de26ed6fae22 Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Tue, 28 Apr 2026 09:52:01 -0700 Subject: [PATCH 1/3] fix: automatically migrate V0 SqlCatalog catalog tables --- crates/catalog/sql/src/catalog.rs | 78 +++++++++++++++++++++++++++++++ crates/catalog/sql/src/lib.rs | 2 +- 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..9d3dc43665 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -310,6 +310,24 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; + // Check if the catalog table has the iceberg_type column, if not, it's a V0 schema. + let needs_migration = sqlx::query(&format!( + "SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0" + )) + .fetch_all(&pool) + .await + .is_err(); + + // Add the iceberg_type column if it's missing, thereby updating the schema to V1. + if needs_migration { + sqlx::query(&format!( + "ALTER TABLE {CATALOG_TABLE_NAME} ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" + )) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + } + Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, @@ -1037,6 +1055,7 @@ mod tests { use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; use regex::Regex; + use sqlx::any::install_default_drivers; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; @@ -2046,4 +2065,63 @@ mod tests { format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"), ); } + + #[tokio::test] + async fn test_v0_schema_migration() { + install_default_drivers(); + + // Simulate a V0 database with no "iceberg_type" column. 
+ let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("catalog.db"); + let uri = format!("sqlite:{}", db_path.to_str().unwrap()); + sqlx::Sqlite::create_database(&uri).await.unwrap(); + + let pool = sqlx::AnyPool::connect(&uri).await.unwrap(); + sqlx::query( + "CREATE TABLE iceberg_tables ( + catalog_name VARCHAR(255) NOT NULL, + table_namespace VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + metadata_location VARCHAR(1000), + previous_metadata_location VARCHAR(1000), + PRIMARY KEY (catalog_name, table_namespace, table_name) + )", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO iceberg_tables + (catalog_name, table_namespace, table_name, metadata_location) + VALUES ('iceberg', 'ns', 'tbl', '/tmp/fake-location')", + ) + .execute(&pool) + .await + .unwrap(); + pool.close().await; + + // Opening the catalog should migrate the V0 schema to V1 without error. + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("should open V0 catalog and migrate schema"); + + // The V0 row (no "iceberg_type" column) should be treated as a TABLE. + let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + let tables = catalog.list_tables(&ns).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].name(), "tbl"); + } } diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index b76006ed3b..3d6e9cb2e8 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -29,7 +29,7 @@ //! SqlBindStyle, SqlCatalogBuilder, //! }; //! -//! #[tokio::main] +//! #[tokio::main(flavor = "current_thread")] //! async fn main() { //! 
let catalog = SqlCatalogBuilder::default() //! .load( From d33c0f89ba0eb3e9c3492dc6f4113685813d99a1 Mon Sep 17 00:00:00 2001 From: "R. Conner Howell" Date: Fri, 1 May 2026 08:58:44 -0700 Subject: [PATCH 2/3] Migration based upon sql.schema-version prop --- Cargo.lock | 1 + crates/catalog/sql/Cargo.toml | 1 + crates/catalog/sql/src/catalog.rs | 188 +++++++++++++++++++++--------- 3 files changed, 138 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71c4edadf2..a9143cbaf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3865,6 +3865,7 @@ dependencies = [ "strum", "tempfile", "tokio", + "tracing", ] [[package]] diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index c63e3061bd..9782641fc5 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } strum = { workspace = true } +tracing = { workspace = true } [dev-dependencies] itertools = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 9d3dc43665..9bb604fb58 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -41,6 +41,8 @@ pub const SQL_CATALOG_PROP_URI: &str = "uri"; pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; /// catalog sql bind style pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; +/// catalog schema version, setting to "V1" will migrate from V0 to V1 schema +pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version"; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -233,6 +235,17 @@ pub struct SqlCatalog { fileio: FileIO, sql_bind_style: SqlBindStyle, runtime: Runtime, + schema_version: SchemaVersion, +} + +#[derive(Debug, Clone, Copy, PartialEq, strum::EnumString, strum::Display)] 
+#[strum(ascii_case_insensitive)] +/// Schema version of the `iceberg_tables` catalog table. +pub enum SchemaVersion { + /// Original schema without the `iceberg_type` column. + V0, + /// Extended schema with the `iceberg_type` column for view support. + V1, } #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] @@ -310,23 +323,42 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; - // Check if the catalog table has the iceberg_type column, if not, it's a V0 schema. - let needs_migration = sqlx::query(&format!( + // Check if the catalog table supports views, indicating that the schema is V1 + let is_v1 = sqlx::query(&format!( "SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0" )) - .fetch_all(&pool) + .execute(&pool) .await - .is_err(); + .is_ok(); - // Add the iceberg_type column if it's missing, thereby updating the schema to V1. - if needs_migration { - sqlx::query(&format!( - "ALTER TABLE {CATALOG_TABLE_NAME} ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" - )) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; - } + // Migrate the schema to V1 if the catalog table does not support views and the caller opted in. + let schema_version = if is_v1 { + tracing::debug!("{CATALOG_TABLE_NAME} already supports views"); + SchemaVersion::V1 + } else { + let requested: SchemaVersion = config + .props + .get(SQL_CATALOG_PROP_SCHEMA_VERSION) + .and_then(|v| v.parse().ok()) + .unwrap_or(SchemaVersion::V0); + if requested == SchemaVersion::V1 { + tracing::debug!("{CATALOG_TABLE_NAME} is being updated to support views"); + sqlx::query(&format!( + "ALTER TABLE {CATALOG_TABLE_NAME} \ + ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" + )) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + SchemaVersion::V1 + } else { + tracing::warn!( + "SqlCatalog is initialized without view support. 
To auto-migrate the database's schema and enable view support, set {}=V1", + SQL_CATALOG_PROP_SCHEMA_VERSION + ); + SchemaVersion::V0 + } + }; Ok(SqlCatalog { name: config.name.to_owned(), @@ -335,9 +367,17 @@ impl SqlCatalog { fileio, sql_bind_style: config.sql_bind_style, runtime, + schema_version, }) } + fn record_type_filter(&self) -> &'static str { + match self.schema_version { + SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)", + SchemaVersion::V0 => "", + } + } + /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. fn replace_placeholders(&self, query: &str) -> String { match self.sql_bind_style { @@ -706,10 +746,8 @@ impl Catalog for SqlCatalog { FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )", + {}", + self.record_type_filter() ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -745,10 +783,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.record_type_filter() ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) @@ -772,10 +808,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.record_type_filter() ), vec![ Some(&self.name), @@ -813,10 +847,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? 
- AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.record_type_filter() ), vec![ Some(&self.name), @@ -941,10 +973,8 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - )" + {}", + self.record_type_filter() ), vec![ Some(dest.name()), @@ -1013,11 +1043,9 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND ( - {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' - OR {CATALOG_FIELD_RECORD_TYPE} IS NULL - ) - AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?" + {} + AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?", + self.record_type_filter() ), vec![ Some(&staged_metadata_location_str), @@ -1060,10 +1088,10 @@ mod tests { use tempfile::TempDir; use crate::catalog::{ - NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, SQL_CATALOG_PROP_URI, - SQL_CATALOG_PROP_WAREHOUSE, + NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, + SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE, }; - use crate::{SqlBindStyle, SqlCatalogBuilder}; + use crate::{SchemaVersion, SqlBindStyle, SqlCatalogBuilder}; const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; @@ -2066,16 +2094,15 @@ mod tests { ); } - #[tokio::test] - async fn test_v0_schema_migration() { - install_default_drivers(); - - // Simulate a V0 database with no "iceberg_type" column. + /// Creates a V0 SQLite database (no `iceberg_type` column) with one pre-inserted table row. + /// Returns the SQLite URI and the temp dir that owns the database file. 
+ async fn create_v0_sqlite_db() -> (String, TempDir) { let temp_dir = TempDir::new().unwrap(); - let db_path = temp_dir.path().join("catalog.db"); - let uri = format!("sqlite:{}", db_path.to_str().unwrap()); + let uri = format!( + "sqlite:{}", + temp_dir.path().join("catalog.db").to_str().unwrap() + ); sqlx::Sqlite::create_database(&uri).await.unwrap(); - let pool = sqlx::AnyPool::connect(&uri).await.unwrap(); sqlx::query( "CREATE TABLE iceberg_tables ( @@ -2099,8 +2126,16 @@ mod tests { .await .unwrap(); pool.close().await; + (uri, temp_dir) + } - // Opening the catalog should migrate the V0 schema to V1 without error. + #[tokio::test] + async fn test_v0_schema_migration() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // Opening the catalog with sql.schema-version=V1 should migrate the V0 schema. let props = HashMap::from_iter([ (SQL_CATALOG_PROP_URI.to_string(), uri), ( @@ -2111,17 +2146,66 @@ mod tests { SQL_CATALOG_PROP_BIND_STYLE.to_string(), SqlBindStyle::QMark.to_string(), ), + ( + SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(), + SchemaVersion::V1.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("should open V0 catalog and migrate schema when sql.schema-version=V1"); + + // The V0 row (no "iceberg_type" column) should be treated as a TABLE after migration. + let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); + let tables = catalog.list_tables(&ns).await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!(tables[0].name(), "tbl"); + } + + #[tokio::test] + async fn test_v0_schema_no_migration_without_property() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // Opening without sql.schema-version=V1 should NOT migrate — but should still work. 
+ let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri.clone()), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), ]); let catalog = SqlCatalogBuilder::default() .with_storage_factory(Arc::new(LocalFsStorageFactory)) .load("iceberg", props) .await - .expect("should open V0 catalog and migrate schema"); + .expect("should open V0 catalog without migrating"); - // The V0 row (no "iceberg_type" column) should be treated as a TABLE. + assert_eq!(catalog.schema_version, SchemaVersion::V0); + + // The table should still be visible via V0 queries (no iceberg_type filter). let ns = NamespaceIdent::from_strs(["ns"]).unwrap(); let tables = catalog.list_tables(&ns).await.unwrap(); assert_eq!(tables.len(), 1); assert_eq!(tables[0].name(), "tbl"); + + // Confirm the column was NOT added to the database. + let probe_pool = sqlx::AnyPool::connect(&uri).await.unwrap(); + let column_exists = sqlx::query("SELECT iceberg_type FROM iceberg_tables LIMIT 0") + .execute(&probe_pool) + .await + .is_ok(); + probe_pool.close().await; + assert!( + !column_exists, + "iceberg_type column should not exist when sql.schema-version=V1 was not set" + ); } } From f8e3d49f5af78cb0be3d11aa4166f2cdc2d4b22a Mon Sep 17 00:00:00 2001 From: "R. 
Conner Howell" Date: Mon, 1 Jun 2026 15:15:18 -0700 Subject: [PATCH 3/3] address PR feedback --- crates/catalog/sql/src/catalog.rs | 205 +++++++++++++++++++++++------- crates/catalog/sql/src/lib.rs | 2 +- 2 files changed, 162 insertions(+), 45 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 9bb604fb58..0f12043a67 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -40,8 +40,12 @@ pub const SQL_CATALOG_PROP_URI: &str = "uri"; /// catalog warehouse location pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; /// catalog sql bind style -pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; -/// catalog schema version, setting to "V1" will migrate from V0 to V1 schema +pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql.bind-style"; +/// Legacy (pre-`sql.bind-style`) key for [`SQL_CATALOG_PROP_BIND_STYLE`], still accepted for +/// backward compatibility. +const SQL_CATALOG_PROP_BIND_STYLE_LEGACY: &str = "sql_bind_style"; +/// Expected catalog schema version. +/// If set to `V1` while the catalog table is actually `V0`, it will be migrated from `V0` to `V1`. pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version"; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; @@ -80,6 +84,7 @@ impl Default for SqlCatalogBuilder { name: "".to_string(), warehouse_location: "".to_string(), sql_bind_style: SqlBindStyle::DollarNumeric, + schema_version: None, props: HashMap::new(), }, storage_factory: None, @@ -171,7 +176,16 @@ impl CatalogBuilder for SqlCatalogBuilder { let name = name.into(); let mut valid_sql_bind_style = true; - if let Some(sql_bind_style) = self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE) { + + // Accept the preferred `sql.bind-style` key, falling back to the legacy `sql_bind_style`. 
+ let sql_bind_style = self + .config + .props + .remove(SQL_CATALOG_PROP_BIND_STYLE) + .or_else(|| self.config.props.remove(SQL_CATALOG_PROP_BIND_STYLE_LEGACY)); + + // Validate the SQL bind style + if let Some(sql_bind_style) = sql_bind_style { if let Ok(sql_bind_style) = SqlBindStyle::from_str(&sql_bind_style) { self.config.sql_bind_style = sql_bind_style; } else { @@ -179,6 +193,16 @@ impl CatalogBuilder for SqlCatalogBuilder { } } + // Parse the requested schema version up front so invalid values fail fast rather than + // silently falling back to V0. + let mut valid_schema_version = true; + if let Some(schema_version) = self.config.props.remove(SQL_CATALOG_PROP_SCHEMA_VERSION) { + match SchemaVersion::from_str(&schema_version) { + Ok(schema_version) => self.config.schema_version = Some(schema_version), + Err(_) => valid_schema_version = false, + } + } + let valid_name = !name.trim().is_empty(); async move { @@ -197,6 +221,16 @@ impl CatalogBuilder for SqlCatalogBuilder { SqlBindStyle::QMark ), )) + } else if !valid_schema_version { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "`{}` values are valid only if they're `{}` or `{}`", + SQL_CATALOG_PROP_SCHEMA_VERSION, + SchemaVersion::V0, + SchemaVersion::V1 + ), + )) } else { self.config.name = name; let runtime = match self.runtime { @@ -223,6 +257,7 @@ struct SqlCatalogConfig { name: String, warehouse_location: String, sql_bind_style: SqlBindStyle, + schema_version: Option<SchemaVersion>, props: HashMap<String, String>, } @@ -248,6 +283,32 @@ pub enum SchemaVersion { V1, } +impl SchemaVersion { + /// The trailing SQL `AND` clause used to exclude view rows when querying for tables. + /// + /// `V1` schemas carry an `iceberg_type` column, so table rows are those tagged `TABLE` + /// (or `NULL`, for rows written before the column existed). `V0` schemas have no such + /// column, so no filter is applied. 
+ fn record_type_filter(self) -> &'static str { + match self { + SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)", + SchemaVersion::V0 => "", + } + } + + /// The SQL needed to migrate a `V0` catalog table up to this schema version. + /// + /// Returns `None` when the target version requires no migration (i.e. `V0`). + fn migration_sql(self) -> Option<String> { + match self { + SchemaVersion::V1 => Some(format!( + "ALTER TABLE {CATALOG_TABLE_NAME} ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" + )), + SchemaVersion::V0 => None, + } + } +} + #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] /// Set the SQL parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB) pub enum SqlBindStyle { @@ -323,41 +384,44 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; - // Check if the catalog table supports views, indicating that the schema is V1 - let is_v1 = sqlx::query(&format!( + // Probe for the `iceberg_type` column to detect whether the catalog table is already a schema version v1 (which supports views). + let is_v1 = match sqlx::query(&format!( "SELECT {CATALOG_FIELD_RECORD_TYPE} FROM {CATALOG_TABLE_NAME} LIMIT 0" )) .execute(&pool) .await - .is_ok(); + { + Ok(_) => true, + // The database rejected the query: the `iceberg_type` column (or table) is absent, + // so this is a genuine V0 schema. + Err(sqlx::Error::Database(_)) => false, + // Any other error (connection dropped, pool timeout, misconfiguration, ...) is not a + // signal about the schema version. Surface it rather than misclassifying as V0. + Err(e) => return Err(from_sqlx_error(e)), + }; - // Migrate the schema to V1 if the catalog table does not support views and the caller opted in. + // Migrate the schema to V1 if the catalog table does not support views and the caller + // opted in via `sql.schema-version=V1`. 
let schema_version = if is_v1 { - tracing::debug!("{CATALOG_TABLE_NAME} already supports views"); + tracing::debug!("detected {CATALOG_TABLE_NAME} schema v1 which already supports views"); SchemaVersion::V1 - } else { - let requested: SchemaVersion = config - .props - .get(SQL_CATALOG_PROP_SCHEMA_VERSION) - .and_then(|v| v.parse().ok()) - .unwrap_or(SchemaVersion::V0); - if requested == SchemaVersion::V1 { - tracing::debug!("{CATALOG_TABLE_NAME} is being updated to support views"); - sqlx::query(&format!( - "ALTER TABLE {CATALOG_TABLE_NAME} \ - ADD COLUMN {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5)" - )) - .execute(&pool) - .await - .map_err(from_sqlx_error)?; - SchemaVersion::V1 - } else { - tracing::warn!( - "SqlCatalog is initialized without view support. To auto-migrate the database's schema and enable view support, set {}=V1", - SQL_CATALOG_PROP_SCHEMA_VERSION - ); - SchemaVersion::V0 + } else if config.schema_version == Some(SchemaVersion::V1) { + tracing::warn!( + "detected {CATALOG_TABLE_NAME} schema v0; migrating to v1 to enable view support" + ); + if let Some(migration_sql) = SchemaVersion::V1.migration_sql() { + sqlx::query(&migration_sql) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; } + SchemaVersion::V1 + } else { + tracing::warn!( + "detected v0 {CATALOG_TABLE_NAME} schema; SqlCatalog is initialized without view support. To auto-migrate the database's schema and enable view support, set {}=V1", + SQL_CATALOG_PROP_SCHEMA_VERSION + ); + SchemaVersion::V0 }; Ok(SqlCatalog { @@ -371,13 +435,6 @@ impl SqlCatalog { }) } - fn record_type_filter(&self) -> &'static str { - match self.schema_version { - SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)", - SchemaVersion::V0 => "", - } - } - /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. 
fn replace_placeholders(&self, query: &str) -> String { match self.sql_bind_style { @@ -747,7 +804,7 @@ impl Catalog for SqlCatalog { WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? {}", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -784,7 +841,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? {}", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) @@ -809,7 +866,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? {}", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![ Some(&self.name), @@ -848,7 +905,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? {}", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![ Some(&self.name), @@ -974,7 +1031,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_TABLE_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? {}", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![ Some(dest.name()), @@ -1045,7 +1102,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? 
{} AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?", - self.record_type_filter() + self.schema_version.record_type_filter() ), vec![ Some(&staged_metadata_location_str), @@ -1089,7 +1146,8 @@ mod tests { use crate::catalog::{ NAMESPACE_LOCATION_PROPERTY_KEY, SQL_CATALOG_PROP_BIND_STYLE, - SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, SQL_CATALOG_PROP_WAREHOUSE, + SQL_CATALOG_PROP_BIND_STYLE_LEGACY, SQL_CATALOG_PROP_SCHEMA_VERSION, SQL_CATALOG_PROP_URI, + SQL_CATALOG_PROP_WAREHOUSE, }; use crate::{SchemaVersion, SqlBindStyle, SqlCatalogBuilder}; @@ -2208,4 +2266,63 @@ mod tests { "iceberg_type column should not exist when sql.schema-version=V1 was not set" ); } + + #[tokio::test] + async fn test_invalid_schema_version_is_rejected() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // An unrecognized sql.schema-version value must fail fast rather than silently + // falling back to V0. + let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ( + SQL_CATALOG_PROP_SCHEMA_VERSION.to_string(), + "v2".to_string(), + ), + ]); + let result = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await; + + let err = result.expect_err("an invalid sql.schema-version should be rejected"); + assert_eq!(err.kind(), iceberg::ErrorKind::DataInvalid); + } + + #[tokio::test] + async fn test_legacy_bind_style_key_is_accepted() { + install_default_drivers(); + + let (uri, temp_dir) = create_v0_sqlite_db().await; + + // The legacy `sql_bind_style` key must keep working alongside the new `sql.bind-style`. 
+ let props = HashMap::from_iter([ + (SQL_CATALOG_PROP_URI.to_string(), uri), + ( + SQL_CATALOG_PROP_WAREHOUSE.to_string(), + temp_dir.path().to_str().unwrap().to_string(), + ), + ( + SQL_CATALOG_PROP_BIND_STYLE_LEGACY.to_string(), + SqlBindStyle::QMark.to_string(), + ), + ]); + let catalog = SqlCatalogBuilder::default() + .with_storage_factory(Arc::new(LocalFsStorageFactory)) + .load("iceberg", props) + .await + .expect("legacy sql_bind_style key should still be accepted"); + + assert_eq!(catalog.sql_bind_style, SqlBindStyle::QMark); + } } diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs index 3d6e9cb2e8..b76006ed3b 100644 --- a/crates/catalog/sql/src/lib.rs +++ b/crates/catalog/sql/src/lib.rs @@ -29,7 +29,7 @@ //! SqlBindStyle, SqlCatalogBuilder, //! }; //! -//! #[tokio::main(flavor = "current_thread")] +//! #[tokio::main] //! async fn main() { //! let catalog = SqlCatalogBuilder::default() //! .load(