Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub use cdf::scan::DeltaCdfTableProvider;
pub(crate) use data_validation::{
DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
};
pub use engine::DataFusionEngine;
pub(crate) use find_files::*;
pub use table_provider::{
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
Expand Down
9 changes: 7 additions & 2 deletions crates/core/src/delta_datafusion/table_provider/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use datafusion::{
physical_plan::ExecutionPlan,
};
use datafusion::catalog::{ScanArgs, ScanResult};
use delta_kernel::Engine;
use delta_kernel::table_configuration::TableConfiguration;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -175,7 +176,9 @@ impl TableProvider for DeltaScan {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let engine = DataFusionEngine::new_from_session(session);
let engine: Arc<dyn Engine> = self.snapshot.snapshot().config.engine.as_ref()
.map(|e| e.0.clone())
.unwrap_or_else(|| DataFusionEngine::new_from_session(session));

// Filter out file_id column from projection if present
let file_id_idx = self
Expand Down Expand Up @@ -218,7 +221,9 @@ impl TableProvider for DeltaScan {
}

async fn scan_with_args<'a>(&self, state: &dyn Session, args: ScanArgs<'a>) -> Result<ScanResult> {
let engine = DataFusionEngine::new_from_session(state);
let engine: Arc<dyn Engine> = self.snapshot.snapshot().config.engine.as_ref()
.map(|e| e.0.clone())
.unwrap_or_else(|| DataFusionEngine::new_from_session(state));

// Filter out file_id column from projection if present
let file_id_idx = self
Expand Down
30 changes: 19 additions & 11 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use arrow::compute::{filter_record_batch, is_not_null};
use arrow::datatypes::SchemaRef;
use arrow_arith::aggregate::sum_array_checked;
use arrow_array::{Int64Array, StructArray};

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (LakeFS v1.48)

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / aws-native-tls

unused import: `StructArray`

Check warning on line 24 in crates/core/src/kernel/snapshot/mod.rs

View workflow job for this annotation

GitHub Actions / Integration Tests (HDFS)

unused import: `StructArray`
use delta_kernel::actions::{Remove, Sidecar};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::engine::arrow_data::ArrowEngineData;
Expand Down Expand Up @@ -128,14 +128,19 @@
})
}

/// Picks the kernel engine to use for snapshot operations.
///
/// A custom engine stored in `config.engine` takes precedence; when none is
/// configured, the engine is obtained from `log_store.engine(None)`.
fn resolve_engine(config: &DeltaTableConfig, log_store: &dyn LogStore) -> Arc<dyn Engine> {
    match &config.engine {
        // Only the `Arc` handle is cloned; the engine itself is shared.
        Some(custom) => Arc::clone(&custom.0),
        None => log_store.engine(None),
    }
}

/// Create a new [`Snapshot`] instance
pub async fn try_new(
log_store: &dyn LogStore,
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
// TODO: bundle operation_id with logstore ...
let engine = log_store.engine(None);
let engine = Self::resolve_engine(&config, log_store);

// NB: kernel engine uses Url::join to construct paths,
// if the path does not end with a slash, the join would override the entire path.
Expand All @@ -148,6 +153,11 @@
Self::try_new_with_engine(log_store, engine, table_root, config, version.map(|v| v as u64)).await
}

/// Resolves the kernel engine for this snapshot.
///
/// Uses the engine set on this snapshot's config when present; otherwise the
/// engine provided by `log_store` is returned.
pub(crate) fn engine(&self, log_store: &dyn LogStore) -> Arc<dyn Engine> {
    let config = &self.config;
    Self::resolve_engine(config, log_store)
}

/// Creates a [`ScanBuilder`] from a clone of this snapshot's `inner` value.
pub fn scan_builder(&self) -> ScanBuilder {
    let inner = self.inner.clone();
    ScanBuilder::new(inner)
}
Expand Down Expand Up @@ -276,8 +286,7 @@
Err(err) => return Box::pin(once(ready(Err(err)))),
};

// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let engine = self.engine(log_store);
let stream = scan
.scan_metadata(engine)
.map(|d| Ok(rb_from_scan_meta(d?)?));
Expand All @@ -301,7 +310,7 @@
Err(err) => return Box::pin(once(ready(Err(err)))),
};

let engine = log_store.engine(None);
let engine = self.engine(log_store);
let stream = scan
.scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
.map(|d| Ok(rb_from_scan_meta(d?)?));
Expand Down Expand Up @@ -439,8 +448,7 @@
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
let tx = builder.tx();

// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let engine = self.engine(log_store);

let remove_data = match self.inner.log_segment().read_actions(
engine.as_ref(),
Expand Down Expand Up @@ -486,8 +494,7 @@
log_store: &dyn LogStore,
app_id: String,
) -> DeltaResult<Option<i64>> {
// TODO: bundle operation id with log store ...
let engine = log_store.engine(None);
let engine = self.engine(log_store);
let inner = self.inner.clone();
let version =
spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref()))
Expand All @@ -506,7 +513,7 @@
log_store: &dyn LogStore,
domain: impl ToString,
) -> DeltaResult<Option<String>> {
let engine = log_store.engine(None);
let engine = self.engine(log_store);
let inner = self.inner.clone();
let domain = domain.to_string();
let metadata =
Expand Down Expand Up @@ -617,10 +624,11 @@
return Ok(());
}

let engine = self.snapshot.engine(log_store);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the reason for having public?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

self.snapshot = self
.snapshot
.clone()
.update(log_store.engine(None), target_version)
.update(engine, target_version)
.await?;

self.files = self
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub use self::schema::partitions::*;
pub use self::schema::*;
pub use self::table::DeltaTable;
pub use self::table::builder::{
DeltaTableBuilder, DeltaTableConfig, DeltaVersion, ensure_table_uri,
DeltaTableBuilder, DeltaTableConfig, DeltaVersion, EngineRef, ensure_table_uri,
};
pub use self::table::config::TableProperty;
pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path};
Expand Down
25 changes: 25 additions & 0 deletions crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Url;

use delta_kernel::Engine;

use super::normalize_table_url;
use crate::logstore::storage::IORuntime;
use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories};
Expand All @@ -29,6 +31,16 @@ pub enum DeltaVersion {
Timestamp(DateTime<Utc>),
}

/// Newtype over `Arc<dyn Engine>` that provides `Debug` and `Clone`, so an
/// engine handle can be stored in types that derive those traits.
///
/// Cloning copies the `Arc` handle only. The `Debug` output is a fixed
/// placeholder and does not inspect the wrapped engine.
#[derive(Clone)]
pub struct EngineRef(pub Arc<dyn Engine>);

impl std::fmt::Debug for EngineRef {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The engine is treated as opaque here, so only a static marker is emitted.
        f.write_str("EngineRef(<engine>)")
    }
}

/// Configuration options for delta table
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -61,6 +73,12 @@ pub struct DeltaTableConfig {
#[delta(skip)]
pub log_size_limiter: Option<LogSizeLimiter>,

/// Custom kernel engine to use for snapshot loading and scan operations.
/// When set, this engine is used instead of the default engine derived from the log store.
#[serde(skip_serializing, skip_deserializing)]
#[delta(skip)]
pub engine: Option<EngineRef>,

/// HSTACK: skip stats parsing during file listing. Runtime-only (not persisted).
/// Default `true` for performance; set to `false` when stats-based pruning helps the query.
#[serde(skip_serializing, skip_deserializing)]
Expand All @@ -76,6 +94,7 @@ impl Default for DeltaTableConfig {
log_batch_size: 1024,
io_runtime: None,
log_size_limiter: None,
engine: None,
skip_stats_in_file_listing: true,
}
}
Expand Down Expand Up @@ -156,6 +175,12 @@ impl DeltaTableBuilder {
self
}

/// Installs a custom kernel `Engine` for snapshot loading and scan operations.
///
/// The engine is wrapped in an [`EngineRef`] and stored in
/// `table_config.engine`; the builder is returned so calls can be chained.
pub fn with_engine(mut self, engine: Arc<dyn Engine>) -> Self {
    let custom = EngineRef(engine);
    self.table_config.engine = Some(custom);
    self
}

/// Sets `version` to the builder
pub fn with_version(mut self, version: i64) -> Self {
self.version = DeltaVersion::Version(version);
Expand Down
Loading