From a82ed101538dcc28ff4976be75f53f330dd15397 Mon Sep 17 00:00:00 2001 From: dvlascenco Date: Tue, 5 May 2026 10:31:24 +0300 Subject: [PATCH] feat: delta engine injection support --- crates/core/src/delta_datafusion/mod.rs | 1 + .../table_provider/next/mod.rs | 9 ++++-- crates/core/src/kernel/snapshot/mod.rs | 30 ++++++++++++------- crates/core/src/lib.rs | 2 +- crates/core/src/table/builder.rs | 25 ++++++++++++++++ 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index f03bc405a..51b9b5b82 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -72,6 +72,7 @@ pub use cdf::scan::DeltaCdfTableProvider; pub(crate) use data_validation::{ DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates, }; +pub use engine::DataFusionEngine; pub(crate) use find_files::*; pub use table_provider::{ DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder, diff --git a/crates/core/src/delta_datafusion/table_provider/next/mod.rs b/crates/core/src/delta_datafusion/table_provider/next/mod.rs index 7cad05f4b..fa9d34cfc 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/mod.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/mod.rs @@ -40,6 +40,7 @@ use datafusion::{ physical_plan::ExecutionPlan, }; use datafusion::catalog::{ScanArgs, ScanResult}; +use delta_kernel::Engine; use delta_kernel::table_configuration::TableConfiguration; use serde::{Deserialize, Serialize}; @@ -175,7 +176,9 @@ impl TableProvider for DeltaScan { filters: &[Expr], limit: Option, ) -> Result> { - let engine = DataFusionEngine::new_from_session(session); + let engine: Arc = self.snapshot.snapshot().config.engine.as_ref() + .map(|e| e.0.clone()) + .unwrap_or_else(|| DataFusionEngine::new_from_session(session)); // Filter out file_id column from projection if present let file_id_idx = self @@ -218,7 +221,9 @@ impl TableProvider for DeltaScan { } async fn scan_with_args<'a>(&self, state: &dyn Session, args: ScanArgs<'a>) -> Result { - let engine = DataFusionEngine::new_from_session(state); + let engine: Arc = self.snapshot.snapshot().config.engine.as_ref() + .map(|e| e.0.clone()) + .unwrap_or_else(|| DataFusionEngine::new_from_session(state)); // Filter out file_id column from projection if present let file_id_idx = self diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index f1523ebf3..1703f9b52 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -128,14 +128,19 @@ impl Snapshot { }) } + /// Returns the engine from the config, or falls back to the log store's default engine. + fn resolve_engine(config: &DeltaTableConfig, log_store: &dyn LogStore) -> Arc { + config.engine.as_ref().map(|e| e.0.clone()) + .unwrap_or_else(|| log_store.engine(None)) + } + /// Create a new [`Snapshot`] instance pub async fn try_new( log_store: &dyn LogStore, config: DeltaTableConfig, version: Option, ) -> DeltaResult { - // TODO: bundle operation_id with logstore ... - let engine = log_store.engine(None); + let engine = Self::resolve_engine(&config, log_store); // NB: kernel engine uses Url::join to construct paths, // if the path does not end with a slash, the would override the entire path. @@ -148,6 +153,11 @@ impl Snapshot { Self::try_new_with_engine(log_store, engine, table_root, config, version.map(|v| v as u64)).await } + /// Returns the configured engine, falling back to the log store's default engine. + pub(crate) fn engine(&self, log_store: &dyn LogStore) -> Arc { + Self::resolve_engine(&self.config, log_store) + } + pub fn scan_builder(&self) -> ScanBuilder { ScanBuilder::new(self.inner.clone()) } @@ -276,8 +286,7 @@ impl Snapshot { Err(err) => return Box::pin(once(ready(Err(err)))), }; - // TODO: bundle operation id with log store ... - let engine = log_store.engine(None); + let engine = self.engine(log_store); let stream = scan .scan_metadata(engine) .map(|d| Ok(rb_from_scan_meta(d?)?)); @@ -301,7 +310,7 @@ impl Snapshot { Err(err) => return Box::pin(once(ready(Err(err)))), }; - let engine = log_store.engine(None); + let engine = self.engine(log_store); let stream = scan .scan_metadata_from(engine, existing_version, existing_data, existing_predicate) .map(|d| Ok(rb_from_scan_meta(d?)?)); @@ -439,8 +448,7 @@ impl Snapshot { let mut builder = RecordBatchReceiverStreamBuilder::new(100); let tx = builder.tx(); - // TODO: bundle operation id with log store ... - let engine = log_store.engine(None); + let engine = self.engine(log_store); let remove_data = match self.inner.log_segment().read_actions( engine.as_ref(), @@ -486,8 +494,7 @@ impl Snapshot { log_store: &dyn LogStore, app_id: String, ) -> DeltaResult> { - // TODO: bundle operation id with log store ... - let engine = log_store.engine(None); + let engine = self.engine(log_store); let inner = self.inner.clone(); let version = spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref())) @@ -506,7 +513,7 @@ impl Snapshot { log_store: &dyn LogStore, domain: impl ToString, ) -> DeltaResult> { - let engine = log_store.engine(None); + let engine = self.engine(log_store); let inner = self.inner.clone(); let domain = domain.to_string(); let metadata = @@ -617,10 +624,11 @@ impl EagerSnapshot { return Ok(()); } + let engine = self.snapshot.engine(log_store); self.snapshot = self .snapshot .clone() - .update(log_store.engine(None), target_version) + .update(engine, target_version) .await?; self.files = self diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 7246607a6..7c490bf7c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -104,7 +104,7 @@ pub use self::schema::partitions::*; pub use self::schema::*; pub use self::table::DeltaTable; pub use self::table::builder::{ - DeltaTableBuilder, DeltaTableConfig, DeltaVersion, ensure_table_uri, + DeltaTableBuilder, DeltaTableConfig, DeltaVersion, EngineRef, ensure_table_uri, }; pub use self::table::config::TableProperty; pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path}; diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 1107d07c5..6588472f8 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize}; use tracing::debug; use url::Url; +use delta_kernel::Engine; + use super::normalize_table_url; use crate::logstore::storage::IORuntime; use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories}; @@ -29,6 +31,16 @@ pub enum DeltaVersion { Timestamp(DateTime), } +/// Wrapper around `Arc` that implements `Debug` and `Clone`. +#[derive(Clone)] +pub struct EngineRef(pub Arc); + +impl std::fmt::Debug for EngineRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "EngineRef()") + } +} + /// Configuration options for delta table #[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)] #[serde(rename_all = "camelCase")] @@ -61,6 +73,12 @@ pub struct DeltaTableConfig { #[delta(skip)] pub log_size_limiter: Option, + /// Custom kernel engine to use for snapshot loading and scan operations. + /// When set, this engine is used instead of the default engine derived from the log store. + #[serde(skip_serializing, skip_deserializing)] + #[delta(skip)] + pub engine: Option, + /// HSTACK: skip stats parsing during file listing. Runtime-only (not persisted). /// Default `true` for performance; set to `false` when stats-based pruning helps the query. #[serde(skip_serializing, skip_deserializing)] @@ -76,6 +94,7 @@ impl Default for DeltaTableConfig { log_batch_size: 1024, io_runtime: None, log_size_limiter: None, + engine: None, skip_stats_in_file_listing: true, } } @@ -156,6 +175,12 @@ impl DeltaTableBuilder { self } + /// Sets a custom kernel `Engine` to use for snapshot loading and scan operations. + pub fn with_engine(mut self, engine: Arc) -> Self { + self.table_config.engine = Some(EngineRef(engine)); + self + } + /// Sets `version` to the builder pub fn with_version(mut self, version: i64) -> Self { self.version = DeltaVersion::Version(version);