From f778ad0fe00c063c9711f1d6d1da24ea2e84d581 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 20 May 2026 22:51:04 +0000 Subject: [PATCH 01/10] refactor(select): replace 3-variant IndexExprResult with {lower, upper} interval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the `enum IndexExprResult { Exact(M), AtMost(M), AtLeast(M) }` representation with a single struct carrying two row-address masks: pub struct IndexExprResult { pub lower: RowAddrMask, // rows definitely in the answer pub upper: RowAddrMask, // rows that may be in the answer } // (rows outside upper are definitely out) Same change applied to `NullableIndexExprResult`. The three pre-existing shapes map to degenerate intervals: Exact(m) ≡ {lower: m, upper: m} AtMost(m) ≡ {lower: allow_nothing(), upper: m} AtLeast(m)≡ {lower: m, upper: all_rows()} …and are now constructed via `IndexExprResult::{exact, at_most, at_least}` associated fns / inspected via `is_exact()` / `is_at_most()` / `is_at_least()` predicates. Intervals that are none of those — a non-empty lower strictly inside a non-universe upper, the "Refined" case — are now representable; previously the algebra had to either pick a side (e.g. `AtMost(m) & AtLeast(_) → AtMost(m)`, dropping the lower bound) or fail to produce one. The boolean algebra collapses to elementwise lattice ops: !{l, u} = {!u, !l} {l1, u1} & {l2, u2} = {l1 & l2, u1 & u2} {l1, u1} | {l2, u2} = {l1 | l2, u1 | u2} This works for both `IndexExprResult` and `NullableIndexExprResult` — the underlying `RowAddrMask` / `NullableRowAddrMask` types already implement two-valued and SQL three-valued logic correctly inside each mask, and lifting elementwise to the interval preserves both. NULL info lives inside each `NullableRowAddrMask` (via its `nulls` field), and `NullableRowAddrMask::Not` flips Allow↔Block without touching `nulls`, so `!{l, u} = {!u, !l}` preserves NULL on both endpoints. 
Wire format change: the (mask, discriminant) layout serialized by
`ScalarIndexExec` is now (lower, upper): two binary columns instead of
one binary + one u32 (the `fragments_covered` column is unchanged). The
schema is internal to the in-process hand-off from `ScalarIndexExec` to
`FilteredReadExec`, so this isn't a persisted-format break.

Other touchpoints:

* `apply_index_to_fragment` in filtered_read switches from a 3-arm
  `match` to a predicate chain. The three current shapes preserve their
  existing read-plan semantics; the Refined case is handled
  conservatively (read `upper`, recheck via `full_filter`). When
  per-range filter pushdown lands, the Refined arm will split `to_read`
  into guaranteed-match ranges (skip recheck) and recheck ranges (apply
  full filter).
* `MaterializeIndexExec` in scalar_index.rs uses `result.upper` for the
  candidate-row set. `AtLeast` still bails (upper is universe);
  everything else works.
* New `NullableRowAddrMask::allow_nothing()` and `all_rows()`
  constructors added so the `at_most` / `at_least` builders have short,
  obvious empty/universe sentinels.
* `NullableRowAddrMask` gains `PartialEq` so `is_exact()` can check
  `lower == upper`.
* `index_expr_result_from_parts` (free function in
  `lance_index::scalar::expression`) now takes `(lower, upper)` instead
  of `(mask, discriminant)` and returns an `IndexExprResult` directly
  rather than a `Result`, since there is no discriminant left to
  validate. `serialize_index_expr_result(result, frags)` keeps its
  signature but now writes the two endpoint masks; the
  `row_addr_mask()` / `discriminant()` accessors on the old enum are
  removed. The schema `INDEX_EXPR_RESULT_SCHEMA` keeps the same name but
  its columns are now (`lower`, `upper`, `fragments_covered`).
* Tests in expression.rs rewritten to use the new constructors and
  `is_at_most()` / `is_at_least()` predicates. All assertions still hold
  under the new algebra (verified by the bench: every cross-variant
  combination tested lands in the same degenerate corner of the lattice
  as before).

Bench deltas at N=10M rows. Both runs at criterion
`--measurement-time 3 --warm-up-time 2 --sample-size 30`, same workload
(single contiguous run, AllowList with no nulls), same machine, idle
CPU.
Median time per op:

  Op   Variant pair      Baseline   2-mask     Delta
  not  Exact             8 ns       13 ns      +60%
  not  AtMost / AtLeast  8 ns       14 ns      +75%
  and  Exact_Exact       7.24 µs    13.84 µs   +91%
  and  AtMost_AtMost     7.49 µs    7.32 µs    -2%
  and  AtLeast_AtLeast   7.22 µs    7.35 µs    +2%
  and  Exact_AtMost      7.15 µs    10.16 µs   +42%
  and  Exact_AtLeast     1.63 µs    7.19 µs    +341% (was a drop)
  and  AtMost_AtLeast    1.66 µs    1.76 µs    +6%
  or   Exact_Exact       3.82 µs    7.54 µs    +97%
  or   AtMost_AtMost     3.90 µs    3.87 µs    -1%
  or   AtLeast_AtLeast   3.82 µs    4.00 µs    +5%
  or   Exact_AtMost      3.67 µs    3.75 µs    +2%
  or   Exact_AtLeast     3.74 µs    12.49 µs   +234% (was a drop)
  or   AtMost_AtLeast    3.04 µs    11.70 µs   +285% (was a drop)

Reading the deltas:

* NOT: a few extra ns (one mask flip → two endpoint flips + swap).
  Irrelevant in absolute terms.
* Same-variant `AtMost_AtMost` / `AtLeast_AtLeast`: ~unchanged. The
  second mask op composes empty-with-empty or universe-with-universe
  and short-circuits.
* `Exact_Exact`: ~1.9-2× slower. Both endpoints carry the full mask; we
  genuinely do two mask ops instead of one. Expected.
* `Exact_AtMost` AND: ~1.4× slower. New result still lands in AtMost
  (lower stays empty after `a & empty`), but the upper-side `a & b` is
  unchanged from before — the +42% is the second op + the empty
  intersection cost.
* Cases that the old algebra resolved by dropping a bound
  (`Exact_AtLeast` for both AND and OR, `AtMost_AtLeast` for OR) are
  roughly 3.3-4.4× slower in absolute terms. The old code returned
  without doing a mask op; the new code computes both endpoints and
  preserves both bounds. The follow-up optimization here is to add
  `BlockList(empty)`/`AllowList(empty)` fast paths to
  `NullableRowAddrMask::{BitAnd, BitOr}`, which would catch the
  universe/empty operands directly.

Trade-off: a few extra microseconds per binary op on 10M-row masks (no
impact on query latency, which is millisecond-scale and dominated by
I/O) in exchange for richer interval results.
The Refined case unlocks the actual `IS NOT NULL` acceleration we couldn't reach before — a zone-map IsNotNull search can now produce `{lower: definitely_non_null_zones, upper: not_all_null_zones}` and the read planner can both skip all-null zones (the I/O win) and skip the recheck on the guaranteed-non-null zones. Verified: * lance-select --lib: 97 passed * lance-index --lib: 302 passed * lance-table --lib: 104 passed * lance --lib io::exec::{filtered_read,scalar_index}: 25 passed * `cargo clippy --workspace --tests -- -D warnings`: clean * `cargo fmt --all --check`: clean Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/lance-index/src/scalar/expression.rs | 194 +++++------ .../lance-select/benches/index_expr_result.rs | 22 +- rust/lance-select/src/mask/nullable.rs | 12 +- rust/lance-select/src/result.rs | 325 ++++++++++++------ rust/lance/src/dataset/scanner.rs | 2 +- rust/lance/src/io/exec/filtered_read.rs | 136 ++++++-- rust/lance/src/io/exec/scalar_index.rs | 34 +- rust/lance/src/io/exec/utils.rs | 21 +- 8 files changed, 474 insertions(+), 272 deletions(-) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 00ac3ebec7d..976426c7b5a 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -7,7 +7,7 @@ use std::{ }; use arrow::array::BinaryBuilder; -use arrow_array::{Array, RecordBatch, UInt32Array}; +use arrow_array::{Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use async_recursion::async_recursion; use async_trait::async_trait; @@ -1293,16 +1293,16 @@ impl std::fmt::Display for ScalarIndexExpr { } } -/// When we evaluate a scalar index query we return a batch with three columns and two rows +/// When we evaluate a scalar index query we return a batch with three columns and two rows. 
/// -/// The first column has the block list and allow list -/// The second column tells if the result is least/exact/more (we repeat the discriminant twice) -/// The third column has the fragments covered bitmap in the first row and null in the second row +/// The first two columns carry the result's `lower` and `upper` row-address +/// mask bounds (the interval form of [`IndexExprResult`]). The third column +/// has the fragments-covered bitmap in the first row and null in the second. pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(Schema::new(vec![ - Field::new("result".to_string(), DataType::Binary, true), - Field::new("discriminant".to_string(), DataType::UInt32, true), - Field::new("fragments_covered".to_string(), DataType::Binary, true), + Field::new("lower", DataType::Binary, true), + Field::new("upper", DataType::Binary, true), + Field::new("fragments_covered", DataType::Binary, true), ])) }); @@ -1316,27 +1316,17 @@ pub use lance_select::{IndexExprResult, NullableIndexExprResult}; impl From for NullableIndexExprResult { fn from(result: SearchResult) -> Self { match result { - SearchResult::Exact(mask) => Self::Exact(NullableRowAddrMask::AllowList(mask)), - SearchResult::AtMost(mask) => Self::AtMost(NullableRowAddrMask::AllowList(mask)), - SearchResult::AtLeast(mask) => Self::AtLeast(NullableRowAddrMask::AllowList(mask)), + SearchResult::Exact(mask) => Self::exact(NullableRowAddrMask::AllowList(mask)), + SearchResult::AtMost(mask) => Self::at_most(NullableRowAddrMask::AllowList(mask)), + SearchResult::AtLeast(mask) => Self::at_least(NullableRowAddrMask::AllowList(mask)), } } } -/// Parse an `IndexExprResult` from its serialized `(mask, discriminant)` -/// representation. Counterpart to [`serialize_index_expr_result`]. 
-pub fn index_expr_result_from_parts( - mask: RowAddrMask, - discriminant: u32, -) -> Result { - match discriminant { - 0 => Ok(IndexExprResult::Exact(mask)), - 1 => Ok(IndexExprResult::AtMost(mask)), - 2 => Ok(IndexExprResult::AtLeast(mask)), - _ => Err(Error::invalid_input_source( - format!("Invalid IndexExprResult discriminant: {}", discriminant).into(), - )), - } +/// Build an `IndexExprResult` from the `(lower, upper)` row-mask pair +/// produced by [`serialize_index_expr_result`]. +pub fn index_expr_result_from_parts(lower: RowAddrMask, upper: RowAddrMask) -> IndexExprResult { + IndexExprResult { lower, upper } } /// Serialize an `IndexExprResult` plus its applicable-fragments bitmap @@ -1347,11 +1337,8 @@ pub fn serialize_index_expr_result( result: &IndexExprResult, fragments_covered_by_result: &RoaringBitmap, ) -> Result { - let row_addr_mask = result.row_addr_mask(); - let row_addr_mask_arr = row_addr_mask.into_arrow()?; - let discriminant = result.discriminant(); - let discriminant_arr = - Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc; + let lower_arr = result.lower.into_arrow()?; + let upper_arr = result.upper.into_arrow()?; let mut fragments_covered_builder = BinaryBuilder::new(); let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); @@ -1362,8 +1349,8 @@ pub fn serialize_index_expr_result( Ok(RecordBatch::try_new( INDEX_EXPR_RESULT_SCHEMA.clone(), vec![ - Arc::new(row_addr_mask_arr), - Arc::new(discriminant_arr), + Arc::new(lower_arr), + Arc::new(upper_arr), Arc::new(fragments_covered_arr), ], )?) 
@@ -2572,55 +2559,42 @@ mod tests { async fn test_not_flips_certainty() { use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; - // Test that NOT flips certainty for inexact index results - // This tests the implementation in evaluate_impl for Self::Not - - // Helper function that mimics the NOT logic we just fixed - fn apply_not(result: NullableIndexExprResult) -> NullableIndexExprResult { - match result { - NullableIndexExprResult::Exact(mask) => NullableIndexExprResult::Exact(!mask), - NullableIndexExprResult::AtMost(mask) => NullableIndexExprResult::AtLeast(!mask), - NullableIndexExprResult::AtLeast(mask) => NullableIndexExprResult::AtMost(!mask), - } - } + // Test that NOT flips certainty for inexact index results. + // Under the {lower, upper} form, `!{l, u} = {!u, !l}`, which + // preserves the AtMost ↔ AtLeast swap and leaves Exact as Exact. // AtMost: superset of matches (e.g., bloom filter says "might be in [1,2]") - let at_most = NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList( + let at_most = NullableIndexExprResult::at_most(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); // NOT(AtMost) should be AtLeast (definitely NOT in [1,2], might be elsewhere) - assert!(matches!( - apply_not(at_most), - NullableIndexExprResult::AtLeast(_) - )); + assert!((!at_most).is_at_least()); // AtLeast: subset of matches (e.g., definitely in [1,2], might be more) - let at_least = NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList( + let at_least = NullableIndexExprResult::at_least(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); // NOT(AtLeast) should be AtMost (might NOT be in [1,2], definitely elsewhere) - assert!(matches!( - apply_not(at_least), - NullableIndexExprResult::AtMost(_) - )); + assert!((!at_least).is_at_most()); // Exact should stay Exact - let exact = 
NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList( + let exact = NullableIndexExprResult::exact(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); - assert!(matches!( - apply_not(exact), - NullableIndexExprResult::Exact(_) - )); + assert!((!exact).is_exact()); } #[tokio::test] async fn test_and_or_preserve_certainty() { use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; - // Test that AND/OR correctly propagate certainty + // Test that AND/OR correctly propagate certainty under the + // {lower, upper} algebra. Each binary op is elementwise on the + // endpoints, so degenerate shapes (Exact / AtMost / AtLeast) + // combine into a result that lands in one of those same shapes + // in every case exercised below. let make_at_most = || { - NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList( + NullableIndexExprResult::at_most(NullableRowAddrMask::AllowList( NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[1, 2, 3]), RowAddrTreeMap::new(), @@ -2629,7 +2603,7 @@ mod tests { }; let make_at_least = || { - NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList( + NullableIndexExprResult::at_least(NullableRowAddrMask::AllowList( NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[2, 3, 4]), RowAddrTreeMap::new(), @@ -2638,59 +2612,87 @@ mod tests { }; let make_exact = || { - NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList(NullableRowAddrSet::new( + NullableIndexExprResult::exact(NullableRowAddrMask::AllowList(NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new(), ))) }; // AtMost & AtMost → AtMost - assert!(matches!( - make_at_most() & make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_at_most() & make_at_most()).is_at_most()); // AtLeast & AtLeast → AtLeast - assert!(matches!( - make_at_least() & make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_at_least() & 
make_at_least()).is_at_least()); - // AtMost & AtLeast → AtMost (superset remains superset) - assert!(matches!( - make_at_most() & make_at_least(), - NullableIndexExprResult::AtMost(_) - )); + // AtMost & AtLeast → AtMost (the lower side stays empty) + assert!((make_at_most() & make_at_least()).is_at_most()); // AtMost | AtMost → AtMost - assert!(matches!( - make_at_most() | make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_at_most() | make_at_most()).is_at_most()); // AtLeast | AtLeast → AtLeast - assert!(matches!( - make_at_least() | make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_at_least() | make_at_least()).is_at_least()); - // AtMost | AtLeast → AtLeast (subset coverage guaranteed) - assert!(matches!( - make_at_most() | make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + // AtMost | AtLeast → AtLeast (upper stays universe) + assert!((make_at_most() | make_at_least()).is_at_least()); // Exact & AtMost → AtMost - assert!(matches!( - make_exact() & make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_exact() & make_at_most()).is_at_most()); // Exact | AtLeast → AtLeast - assert!(matches!( - make_exact() | make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_exact() | make_at_least()).is_at_least()); + } + + /// The whole point of the `{lower, upper}` representation is that it + /// can express a Refined result — a non-empty `lower` strictly inside + /// a non-universe `upper` — which the old enum couldn't. This test + /// constructs one through the algebra and verifies the endpoints. 
+ #[tokio::test] + async fn test_refined_result_constructed_through_algebra() { + use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; + + let allow_set = |rows: &[u64]| { + NullableRowAddrMask::AllowList(NullableRowAddrSet::new( + RowAddrTreeMap::from_iter(rows), + RowAddrTreeMap::new(), + )) + }; + + // AtLeast({1,2}) & Exact({1,2,3}) is Refined, because: + // lower = {1,2} ∩ {1,2,3} = {1,2} (non-empty) + // upper = universe ∩ {1,2,3} = {1,2,3} (not universe) + // lower ≠ upper (not Exact) + let at_least_12 = NullableIndexExprResult::at_least(allow_set(&[1, 2])); + let exact_123 = NullableIndexExprResult::exact(allow_set(&[1, 2, 3])); + let refined = at_least_12 & exact_123; + + // None of the shape predicates should fire — that's what makes + // this a Refined result. + assert!( + !refined.is_exact(), + "Refined must not be classified as Exact" + ); + assert!( + !refined.is_at_most(), + "Refined must not be classified as AtMost" + ); + assert!( + !refined.is_at_least(), + "Refined must not be classified as AtLeast" + ); + + // Check the actual endpoints. + assert_eq!(refined.lower, allow_set(&[1, 2])); + assert_eq!(refined.upper, allow_set(&[1, 2, 3])); + + // NOT swaps the endpoints, preserving the Refined shape. + let negated = !refined; + assert!(!negated.is_exact()); + assert!(!negated.is_at_most()); + assert!(!negated.is_at_least()); + // !{l, u} = {!u, !l}. AllowList → BlockList. 
+ assert!(matches!(negated.lower, NullableRowAddrMask::BlockList(_))); + assert!(matches!(negated.upper, NullableRowAddrMask::BlockList(_))); } #[test] diff --git a/rust/lance-select/benches/index_expr_result.rs b/rust/lance-select/benches/index_expr_result.rs index 6e37e7217ad..8d9a86c12ba 100644 --- a/rust/lance-select/benches/index_expr_result.rs +++ b/rust/lance-select/benches/index_expr_result.rs @@ -52,9 +52,9 @@ fn bench_not(c: &mut Criterion) { for label in ["Exact", "AtMost", "AtLeast"] { let id = BenchmarkId::new(label, n); let make = move || match label { - "Exact" => NullableIndexExprResult::Exact(allow_run(n)), - "AtMost" => NullableIndexExprResult::AtMost(allow_run(n)), - "AtLeast" => NullableIndexExprResult::AtLeast(allow_run(n)), + "Exact" => NullableIndexExprResult::exact(allow_run(n)), + "AtMost" => NullableIndexExprResult::at_most(allow_run(n)), + "AtLeast" => NullableIndexExprResult::at_least(allow_run(n)), _ => unreachable!(), }; group.bench_function(id, |b| { @@ -75,33 +75,35 @@ type PairFn = Box (NullableIndexExprResult, NullableIndexExprResult) /// cases that today drop information (Exact/AtMost, Exact/AtLeast, /// AtMost/AtLeast). 
fn pair_cases(n: u64) -> Vec<(&'static str, PairFn)> { - use NullableIndexExprResult::*; let lhs = move || allow_run(n); let rhs = move || allow_middle(n); + let exact = |m| NullableIndexExprResult::exact(m); + let at_most = |m| NullableIndexExprResult::at_most(m); + let at_least = |m| NullableIndexExprResult::at_least(m); vec![ ( "Exact_Exact", - Box::new(move || (Exact(lhs()), Exact(rhs()))), + Box::new(move || (exact(lhs()), exact(rhs()))), ), ( "AtMost_AtMost", - Box::new(move || (AtMost(lhs()), AtMost(rhs()))), + Box::new(move || (at_most(lhs()), at_most(rhs()))), ), ( "AtLeast_AtLeast", - Box::new(move || (AtLeast(lhs()), AtLeast(rhs()))), + Box::new(move || (at_least(lhs()), at_least(rhs()))), ), ( "Exact_AtMost", - Box::new(move || (Exact(lhs()), AtMost(rhs()))), + Box::new(move || (exact(lhs()), at_most(rhs()))), ), ( "Exact_AtLeast", - Box::new(move || (Exact(lhs()), AtLeast(rhs()))), + Box::new(move || (exact(lhs()), at_least(rhs()))), ), ( "AtMost_AtLeast", - Box::new(move || (AtMost(lhs()), AtLeast(rhs()))), + Box::new(move || (at_most(lhs()), at_least(rhs()))), ), ] } diff --git a/rust/lance-select/src/mask/nullable.rs b/rust/lance-select/src/mask/nullable.rs index 81615ba64b0..f76838170f3 100644 --- a/rust/lance-select/src/mask/nullable.rs +++ b/rust/lance-select/src/mask/nullable.rs @@ -126,13 +126,23 @@ impl std::ops::BitOrAssign<&Self> for NullableRowAddrSet { /// This mask handles three-valued logic for SQL expressions, where a filter can /// evaluate to TRUE, FALSE, or NULL. The `selected` set includes rows that are /// TRUE or NULL. The `nulls` set includes rows that are NULL. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum NullableRowAddrMask { AllowList(NullableRowAddrSet), BlockList(NullableRowAddrSet), } impl NullableRowAddrMask { + /// A mask that matches no rows (TRUE = ∅, NULL = ∅). 
+ pub fn allow_nothing() -> Self { + Self::AllowList(NullableRowAddrSet::empty()) + } + + /// A mask that matches every row (TRUE = universe, NULL = ∅). + pub fn all_rows() -> Self { + Self::BlockList(NullableRowAddrSet::empty()) + } + pub fn selected(&self, row_id: u64) -> bool { match self { Self::AllowList(NullableRowAddrSet { selected, nulls }) => { diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 759c139a0fd..8c38b75f102 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -1,51 +1,150 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! Certainty-tagged wrappers around a row-address mask returned by a +//! Interval-shaped wrappers around a row-address mask returned by a //! scalar-index expression evaluation. //! -//! These types model the three possible degrees of knowledge an index -//! search can return: +//! Each result describes a closed interval `[lower, upper]` in the +//! lattice of subsets: //! -//! * [`Exact`] — the mask is the precise answer; no recheck needed. -//! * [`AtMost`] — the mask is a *superset* of the true answer; the rows -//! inside the mask must be rechecked against the predicate. -//! * [`AtLeast`] — the mask is a *subset* of the true answer; the rows -//! outside the mask must be rechecked against the predicate. +//! * `lower` — rows the index *guarantees* are in the answer. +//! * `upper` — rows that *might* be in the answer; rows outside `upper` +//! are guaranteed not in the answer. //! -//! The boolean algebra (`Not`/`BitAnd`/`BitOr`) is implemented on both -//! [`NullableIndexExprResult`] (the form during evaluation, carrying SQL -//! three-valued logic via [`NullableRowAddrMask`]) and -//! [`IndexExprResult`] (the form consumed by the read planner, after -//! `drop_nulls` collapses NULL rows into FALSE). +//! The three pre-existing "shapes" map onto degenerate intervals: //! -//! 
[`Exact`]: IndexExprResult::Exact -//! [`AtMost`]: IndexExprResult::AtMost -//! [`AtLeast`]: IndexExprResult::AtLeast - -use crate::mask::{NullableRowAddrMask, RowAddrMask}; - -/// Result of an index search before NULL rows are dropped. Carries -/// three-valued-logic information via [`NullableRowAddrMask`]. -#[derive(Debug)] -pub enum NullableIndexExprResult { - Exact(NullableRowAddrMask), - AtMost(NullableRowAddrMask), - AtLeast(NullableRowAddrMask), +//! | Old variant | Interval form | +//! |-------------|----------------------------------------| +//! | `Exact(m)` | `{lower: m, upper: m}` | +//! | `AtMost(m)` | `{lower: allow_nothing(), upper: m}` | +//! | `AtLeast(m)`| `{lower: m, upper: all_rows()}` | +//! +//! Use [`IndexExprResult::exact`] / [`IndexExprResult::at_most`] / +//! [`IndexExprResult::at_least`] to construct those shapes, and the +//! matching [`IndexExprResult::is_exact`] etc. predicates to inspect +//! them. Intervals that are neither (the "Refined" case — a non-empty +//! `lower` strictly inside a non-universe `upper`) arise from indices +//! that can distinguish guaranteed-match from candidate-match rows +//! within a single search (e.g. a zone map answering `IS NOT NULL`). +//! +//! The boolean algebra (`Not` / `BitAnd` / `BitOr`) is elementwise on +//! the endpoints: +//! +//! ```text +//! !{l, u} = {!u, !l} +//! {l1, u1} & {l2, u2} = {l1 & l2, u1 & u2} +//! {l1, u1} | {l2, u2} = {l1 | l2, u1 | u2} +//! ``` +//! +//! This works for both the post-`drop_nulls` form ([`IndexExprResult`], +//! backed by [`RowAddrMask`]) and the during-evaluation form +//! ([`NullableIndexExprResult`], backed by [`NullableRowAddrMask`]) — +//! the per-endpoint algebra already implements two-valued and SQL +//! three-valued logic correctly inside each mask type. + +use crate::mask::{NullableRowAddrMask, RowAddrMask, RowSetOps}; + +/// Result of an index search before NULL rows are dropped. 
Each endpoint +/// is a [`NullableRowAddrMask`] carrying SQL three-valued logic info. +#[derive(Debug, Clone)] +pub struct NullableIndexExprResult { + /// Rows the index *guarantees* are TRUE. + pub lower: NullableRowAddrMask, + /// Rows that may be TRUE. Rows outside `upper` are guaranteed to be + /// FALSE / NULL (and so not in a `WHERE` answer set). + pub upper: NullableRowAddrMask, +} + +impl NullableIndexExprResult { + /// Precise result — every row in `mask` is in the answer and every + /// row outside is not. Equivalent to the old `Exact` variant. + pub fn exact(mask: NullableRowAddrMask) -> Self { + Self { + lower: mask.clone(), + upper: mask, + } + } + + /// Upper-bound-only result — rows outside `mask` are guaranteed not + /// to match; rows inside may match and require a recheck. + /// Equivalent to the old `AtMost` variant. + pub fn at_most(mask: NullableRowAddrMask) -> Self { + Self { + lower: NullableRowAddrMask::allow_nothing(), + upper: mask, + } + } + + /// Lower-bound-only result — rows in `mask` are guaranteed to match; + /// rows outside may match too and require a recheck. Equivalent to + /// the old `AtLeast` variant. + pub fn at_least(mask: NullableRowAddrMask) -> Self { + Self { + lower: mask, + upper: NullableRowAddrMask::all_rows(), + } + } + + /// True if `lower == upper` — the answer is precisely the lower + /// (== upper) mask. + /// + /// This is a **structural** check on the canonical form produced by + /// the constructors / algebra: an `Exact(m)` built with + /// [`Self::exact`] holds equal masks, and elementwise `&` / `|` / `!` + /// preserve that. It is not a semantic emptiness test — a + /// hand-constructed `IndexExprResult` whose endpoints are + /// representationally distinct but semantically equal (e.g. + /// `AllowList(universe)` vs `BlockList(empty)`) will report + /// `is_exact() == false`. All in-tree code paths construct results + /// through the canonical builders, so this is sound in practice. 
+ /// + /// The three shape predicates are not mutually exclusive — see the + /// note on [`Self::is_at_least`] for the precedence convention. + pub fn is_exact(&self) -> bool { + self.lower == self.upper + } + + /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the + /// index gives only an upper bound on the answer. + /// + /// Like [`Self::is_exact`], this is a structural check on the + /// canonical form. See that doc for the caveat. + pub fn is_at_most(&self) -> bool { + matches!(&self.lower, NullableRowAddrMask::AllowList(set) if set.is_empty()) + } + + /// True if `upper` covers every row (canonical `BlockList(∅)`) — the + /// index gives only a lower bound on the answer. + /// + /// **Precedence convention** for consumers branching on shape: check + /// [`Self::is_exact`] *first* (Exact-of-empty satisfies both + /// `is_exact` and `is_at_most`; Exact-of-universe satisfies both + /// `is_exact` and `is_at_least`); then `is_at_least`; finally treat + /// the residual as `is_at_most` or Refined. The branches in + /// `filtered_read::apply_index_to_fragment` follow this order. + pub fn is_at_least(&self) -> bool { + matches!(&self.upper, NullableRowAddrMask::BlockList(set) if set.is_empty()) + } + + /// Project NULL rows out of the result. + /// + /// Under a `WHERE` clause NULL is treated as FALSE, so `drop_nulls` + /// folds NULL rows out of the answer at each endpoint. + pub fn drop_nulls(self) -> IndexExprResult { + IndexExprResult { + lower: self.lower.drop_nulls(), + upper: self.upper.drop_nulls(), + } + } } impl std::ops::Not for NullableIndexExprResult { type Output = Self; fn not(self) -> Self { - // Flip certainty: NOT(AtMost) → AtLeast, NOT(AtLeast) → AtMost. - // NULL info is preserved by `NullableRowAddrMask::not` (it flips - // AllowList ↔ BlockList without touching the `nulls` field), which - // is the 3VL-correct negation: TRUE↔FALSE swap, NULL stays NULL. 
- match self { - Self::Exact(mask) => Self::Exact(!mask), - Self::AtMost(mask) => Self::AtLeast(!mask), - Self::AtLeast(mask) => Self::AtMost(!mask), + Self { + lower: !self.upper, + upper: !self.lower, } } } @@ -54,22 +153,9 @@ impl std::ops::BitAnd for NullableIndexExprResult { type Output = Self; fn bitand(self, rhs: Self) -> Self { - match (self, rhs) { - (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs & rhs), - (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(lhs), Self::Exact(rhs)) => { - Self::AtMost(lhs & rhs) - } - (Self::Exact(exact), Self::AtLeast(_)) | (Self::AtLeast(_), Self::Exact(exact)) => { - // We could do better here, elements in both lhs and rhs are known - // to be true and don't require a recheck. We only need to recheck - // elements in lhs that are not in rhs - Self::AtMost(exact) - } - (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs & rhs), - (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs & rhs), - (Self::AtMost(most), Self::AtLeast(_)) | (Self::AtLeast(_), Self::AtMost(most)) => { - Self::AtMost(most) - } + Self { + lower: self.lower & rhs.lower, + upper: self.upper & rhs.upper, } } } @@ -78,74 +164,103 @@ impl std::ops::BitOr for NullableIndexExprResult { type Output = Self; fn bitor(self, rhs: Self) -> Self { - match (self, rhs) { - (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs | rhs), - (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(rhs), Self::Exact(lhs)) => { - // We could do better here, elements in lhs are known to be true - // and don't require a recheck. 
We only need to recheck elements - // in rhs that are not in lhs - Self::AtMost(lhs | rhs) - } - (Self::Exact(lhs), Self::AtLeast(rhs)) | (Self::AtLeast(rhs), Self::Exact(lhs)) => { - Self::AtLeast(lhs | rhs) - } - (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs | rhs), - (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs | rhs), - (Self::AtMost(_), Self::AtLeast(least)) | (Self::AtLeast(least), Self::AtMost(_)) => { - Self::AtLeast(least) - } + Self { + lower: self.lower | rhs.lower, + upper: self.upper | rhs.upper, } } } -impl NullableIndexExprResult { - /// Project NULL rows out of the result. - /// - /// Under a `WHERE` clause, NULL is treated as FALSE — so `drop_nulls` - /// removes them from `AllowList`s (NULL rows are not selected) and - /// folds them into `BlockList`s (NULL rows are still blocked). - pub fn drop_nulls(self) -> IndexExprResult { - match self { - Self::Exact(mask) => IndexExprResult::Exact(mask.drop_nulls()), - Self::AtMost(mask) => IndexExprResult::AtMost(mask.drop_nulls()), - Self::AtLeast(mask) => IndexExprResult::AtLeast(mask.drop_nulls()), +/// Result of an index search after NULL rows have been dropped. This is +/// what the read planner consumes. +#[derive(Debug, Clone)] +pub struct IndexExprResult { + /// Rows the index *guarantees* are in the answer. + pub lower: RowAddrMask, + /// Rows that may be in the answer. Rows outside `upper` are + /// guaranteed not in the answer. + pub upper: RowAddrMask, +} + +impl IndexExprResult { + /// Precise result — every row in `mask` is in the answer and every + /// row outside is not. Equivalent to the old `Exact` variant. + pub fn exact(mask: RowAddrMask) -> Self { + Self { + lower: mask.clone(), + upper: mask, } } + + /// Upper-bound-only result. Equivalent to the old `AtMost` variant. + pub fn at_most(mask: RowAddrMask) -> Self { + Self { + lower: RowAddrMask::allow_nothing(), + upper: mask, + } + } + + /// Lower-bound-only result. Equivalent to the old `AtLeast` variant. 
+ pub fn at_least(mask: RowAddrMask) -> Self { + Self { + lower: mask, + upper: RowAddrMask::all_rows(), + } + } + + /// True if `lower == upper` — the answer is precisely the lower + /// (== upper) mask. See [`NullableIndexExprResult::is_exact`] for the + /// structural-form caveat and the precedence convention shared with + /// [`Self::is_at_most`] / [`Self::is_at_least`]. + pub fn is_exact(&self) -> bool { + self.lower == self.upper + } + + /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the + /// index gives only an upper bound on the answer. See + /// [`NullableIndexExprResult::is_exact`] for caveats. + pub fn is_at_most(&self) -> bool { + matches!(&self.lower, RowAddrMask::AllowList(set) if set.is_empty()) + } + + /// True if `upper` covers every row (canonical `BlockList(∅)`) — the + /// index gives only a lower bound on the answer. See + /// [`NullableIndexExprResult::is_at_least`] for the precedence + /// convention consumers should follow. + pub fn is_at_least(&self) -> bool { + matches!(&self.upper, RowAddrMask::BlockList(set) if set.is_empty()) + } } -/// Result of an index search after NULL rows have been dropped. This is -/// what the read planner consumes. -#[derive(Debug)] -pub enum IndexExprResult { - /// The answer is exactly the rows in the allow list minus the rows - /// in the block list. - Exact(RowAddrMask), - /// The answer is at most the rows in the allow list minus the rows - /// in the block list. Some of the rows in the allow list may not be - /// in the result and will need to be filtered by a recheck. Every - /// row in the block list is definitely not in the result. - AtMost(RowAddrMask), - /// The answer is at least the rows in the allow list minus the rows - /// in the block list. Some of the rows in the block list might be in - /// the result. Every row in the allow list is definitely in the - /// result. 
- AtLeast(RowAddrMask), +impl std::ops::Not for IndexExprResult { + type Output = Self; + + fn not(self) -> Self { + Self { + lower: !self.upper, + upper: !self.lower, + } + } } -impl IndexExprResult { - pub fn row_addr_mask(&self) -> &RowAddrMask { - match self { - Self::Exact(mask) => mask, - Self::AtMost(mask) => mask, - Self::AtLeast(mask) => mask, +impl std::ops::BitAnd for IndexExprResult { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self { + Self { + lower: self.lower & rhs.lower, + upper: self.upper & rhs.upper, } } +} + +impl std::ops::BitOr for IndexExprResult { + type Output = Self; - pub fn discriminant(&self) -> u32 { - match self { - Self::Exact(_) => 0, - Self::AtMost(_) => 1, - Self::AtLeast(_) => 2, + fn bitor(self, rhs: Self) -> Self { + Self { + lower: self.lower | rhs.lower, + upper: self.upper | rhs.upper, } } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index d0f5983b3ce..53eb569ac02 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2899,7 +2899,7 @@ impl Scanner { fn u64s_as_take_input(&self, u64s: Vec) -> Result> { let row_addrs = RowAddrTreeMap::from_iter(u64s); let row_addr_mask = RowAddrMask::from_allowed(row_addrs); - let index_result = IndexExprResult::Exact(row_addr_mask); + let index_result = IndexExprResult::exact(row_addr_mask); let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone(); let batch = serialize_index_expr_result(&index_result, &fragments_covered)?; let stream = futures::stream::once(async move { Ok(batch) }); diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0b83c000ef0..44eb594e84c 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -8,7 +8,6 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{ops::Range, sync::Arc}; use arrow::array::AsArray; -use arrow::datatypes::UInt32Type; use 
arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::common::runtime::SpawnedTask; @@ -99,9 +98,9 @@ impl EvaluatedIndex { .into(), )); } - let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; - let match_type = batch.column(1).as_primitive::().values()[0]; - let index_result = index_expr_result_from_parts(row_addr_mask, match_type)?; + let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let upper = RowAddrMask::from_arrow(batch.column(1).as_binary())?; + let index_result = index_expr_result_from_parts(lower, upper); let applicable_fragments = batch.column(2).as_binary::(); let applicable_fragments = RoaringBitmap::deserialize_from(applicable_fragments.value(0))?; @@ -612,14 +611,17 @@ impl FilteredReadStream { // Resolve filter for this fragment let filter = if let Some(evaluated_index) = evaluated_index { if evaluated_index.applicable_fragments.contains(fragment_id) { - match &evaluated_index.index_result { - IndexExprResult::Exact(_) => options.refine_filter.clone(), - IndexExprResult::AtLeast(_) - if scan_planned_with_limit_pushed_down => - { - options.refine_filter.clone() - } - _ => options.full_filter.clone(), + let r = &evaluated_index.index_result; + // `Exact` results don't need a recheck. `AtLeast` + // results can also skip recheck when the + // skip/take pushdown is in play (we only read the + // guaranteed-match ranges in that case). 
+ let can_skip_recheck = r.is_exact() + || (r.is_at_least() && scan_planned_with_limit_pushed_down); + if can_skip_recheck { + options.refine_filter.clone() + } else { + options.full_filter.clone() } } else { options.full_filter.clone() @@ -728,29 +730,40 @@ impl FilteredReadStream { if evaluated_index.applicable_fragments.contains(fragment_id) { let _span = tracing::span!(tracing::Level::DEBUG, "apply_index_result").entered(); - match &evaluated_index.index_result { - IndexExprResult::Exact(row_addr_mask) => { - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let mut matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); - fragments_to_read.insert(fragment_id, matched_ranges.clone()); - - Self::apply_skip_take_to_ranges(&mut matched_ranges, to_skip, to_take); - scan_push_down_fragments_to_read.insert(fragment_id, matched_ranges); - } - IndexExprResult::AtMost(row_addr_mask) => { - // Cannot push down skip/take for AtMost - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); - fragments_to_read.insert(fragment_id, matched_ranges); - } - IndexExprResult::AtLeast(row_addr_mask) => { - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let mut guaranteed_ranges = Self::intersect_ranges(&to_read, &valid_ranges); - fragments_to_read.insert(fragment_id, guaranteed_ranges.clone()); - - Self::apply_skip_take_to_ranges(&mut guaranteed_ranges, to_skip, to_take); - scan_push_down_fragments_to_read.insert(fragment_id, guaranteed_ranges); - } + let index_result = &evaluated_index.index_result; + if index_result.is_exact() { + // lower == upper; either side gives the precise answer. 
+ let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.upper); + let mut matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, matched_ranges.clone()); + + Self::apply_skip_take_to_ranges(&mut matched_ranges, to_skip, to_take); + scan_push_down_fragments_to_read.insert(fragment_id, matched_ranges); + } else if index_result.is_at_least() { + // upper is universe; lower is the guaranteed-match set + // used for the skip/take push-down path. + let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.lower); + let mut guaranteed_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, guaranteed_ranges.clone()); + + Self::apply_skip_take_to_ranges(&mut guaranteed_ranges, to_skip, to_take); + scan_push_down_fragments_to_read.insert(fragment_id, guaranteed_ranges); + } else { + // AtMost or true Refined: read everything in `upper` + // and rely on the full-filter recheck for survivors. + // + // For AtMost the index gives no lower bound, so there's + // no skip/take push-down to do. For Refined the lower + // bound *is* a guaranteed-match set, but exploiting it + // requires per-range filter push-down (different filter + // per range within a fragment), which the current plan + // doesn't support. The recheck-skip opportunity on the + // `lower` portion would also be visible up at the + // `can_skip_recheck` block — both are deferred. See + // TODO(refined-pushdown). + let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.upper); + let matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, matched_ranges); } } else { // Fragment not indexed. Normally we add the full fragment to keep @@ -2372,6 +2385,59 @@ mod tests { )) } + /// Round-trip every interval shape through the arrow wire format and + /// confirm the endpoints survive. 
Exercises both + /// `serialize_index_expr_result` and `EvaluatedIndex::try_from_arrow` + /// so the schema names stay in sync. + #[test] + fn test_index_expr_result_serialize_roundtrip() { + use lance_index::scalar::expression::serialize_index_expr_result; + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let mk = |rows: &[u64]| RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(rows)); + + let mut frags = RoaringBitmap::new(); + frags.insert(0); + frags.insert(7); + + let cases = vec![ + ("exact", IndexExprResult::exact(mk(&[1, 2, 3]))), + ("at_most", IndexExprResult::at_most(mk(&[1, 2, 3]))), + ("at_least", IndexExprResult::at_least(mk(&[1, 2]))), + // Refined: non-empty lower strictly inside non-universe upper. + ( + "refined", + IndexExprResult { + lower: mk(&[1, 2]), + upper: mk(&[1, 2, 3]), + }, + ), + ]; + + for (name, original) in cases { + let batch = serialize_index_expr_result(&original, &frags) + .unwrap_or_else(|e| panic!("serialize {name}: {e}")); + let decoded = EvaluatedIndex::try_from_arrow(&batch) + .unwrap_or_else(|e| panic!("try_from_arrow {name}: {e}")); + + // The underlying RowAddrTreeMap is PartialEq; compare via mask + // emptiness + symmetric difference being empty would be more + // robust, but the canonical builders preserve representation. 
+ assert_eq!( + decoded.index_result.lower, original.lower, + "{name}: lower endpoint changed across round-trip", + ); + assert_eq!( + decoded.index_result.upper, original.upper, + "{name}: upper endpoint changed across round-trip", + ); + assert_eq!( + decoded.applicable_fragments, frags, + "{name}: applicable fragments changed across round-trip", + ); + } + } + #[test_log::test(tokio::test)] async fn test_bloom_filter_is_not_null_prefilter() { let (_tmp_path, dataset) = dataset_with_bloom_filter_nulls().await; diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index f6fd1d0a197..84053b8add4 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -361,9 +361,10 @@ impl MapIndexExec { needs_recheck: false, }); let query_result = query.evaluate(dataset.as_ref(), metrics.as_ref()).await?; - let IndexExprResult::Exact(mut row_addr_mask) = query_result else { + if !query_result.is_exact() { todo!("Support for non-exact query results as input for merge_insert") - }; + } + let mut row_addr_mask = query_result.upper; if let Some(deletion_mask) = deletion_mask.as_ref() { row_addr_mask = row_addr_mask & deletion_mask.as_ref().clone(); @@ -555,21 +556,26 @@ impl MaterializeIndexExec { // when the index was trained will still be deleted when the index is queried. DatasetPreFilter::create_deletion_mask(dataset.clone(), fragment_bitmap) }); + // MaterializeIndexExec emits a deterministic set of row ids. The + // `upper` mask of the interval is the candidate set (the answer is + // a subset of `upper`). For `Exact` results this is the exact + // answer; for `AtMost` and Refined results it's a superset that + // gets pruned downstream by `LanceFilterExec` (the full filter + // runs on the materialized batches via the scan plan, so any + // non-matching candidates in `upper` are dropped before they + // reach the user). 
`AtLeast` carries an unbounded upper, so the + // candidate set is the whole row space — not actionable here. + let take_upper = |result: IndexExprResult| -> Result { + if result.is_at_least() && !result.is_exact() { + todo!("Support AtLeast in MaterializeIndexExec") + } + Ok(result.upper) + }; let mask = if let Some(prefilter) = prefilter { let (expr_result, prefilter) = futures::try_join!(expr_result, prefilter)?; - let mask = match expr_result { - IndexExprResult::Exact(mask) => mask, - IndexExprResult::AtMost(mask) => mask, - IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"), - }; - mask & (*prefilter).clone() + take_upper(expr_result)? & (*prefilter).clone() } else { - let expr_result = expr_result.await?; - match expr_result { - IndexExprResult::Exact(mask) => mask, - IndexExprResult::AtMost(mask) => mask, - IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"), - } + take_upper(expr_result.await?)? }; let ids = row_ids_for_mask(mask, &dataset, &fragments).await?; let ids = UInt64Array::from(ids); diff --git a/rust/lance/src/io/exec/utils.rs b/rust/lance/src/io/exec/utils.rs index 004b1a95b5f..488491691c0 100644 --- a/rust/lance/src/io/exec/utils.rs +++ b/rust/lance/src/io/exec/utils.rs @@ -97,18 +97,19 @@ pub(crate) struct SelectionVectorToPrefilter(pub SendableRecordBatchStream); #[async_trait] impl FilterLoader for SelectionVectorToPrefilter { async fn load(mut self: Box) -> Result { - let batch = self - .0 - .try_next() - .await? - .ok_or_else(|| { - Error::internal("Selection vector source for prefilter did not yield any batches") - }) - .unwrap(); - RowAddrMask::from_arrow(batch["result"].as_binary_opt::().ok_or_else(|| { + let batch = self.0.try_next().await?.ok_or_else(|| { + Error::internal("Selection vector source for prefilter did not yield any batches") + })?; + // The vector-search prefilter wants the set of rows the search is + // allowed to consider. 
Under the {lower, upper} interval form the + // `upper` mask is exactly that — rows outside it are guaranteed + // not to match. For an `Exact` result `upper == lower`; for + // `AtLeast` `upper` is the universe (which lets the search see + // everything, matching the AtLeast semantics). + RowAddrMask::from_arrow(batch["upper"].as_binary_opt::().ok_or_else(|| { Error::internal(format!( "Expected selection vector input to yield binary arrays but got {}", - batch["result"].data_type() + batch["upper"].data_type() )) })?) } From eb3be846499a14fd0861b4e7568237b4f0a378e9 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 28 May 2026 17:05:02 +0000 Subject: [PATCH 02/10] feat(index): preserve legacy ScalarIndexExec wire format under relational-algebra v1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a backwards-compat path for the `ScalarIndexExec` → `FilteredReadExec` hand-off so the prior commit's `(lower, upper)` schema change doesn't break older read planners. Introduces `LANCE_RELATIONAL_ALGEBRA_VERSION` (currently `1`) as the knob that picks between legacy and new wire formats; while it stays at 1, `ScalarIndexExec` emits and advertises the pre-refactor `{result, discriminant, fragments_covered}` shape. Pieces: * `lance-index::scalar::expression` - `LEGACY_INDEX_EXPR_RESULT_SCHEMA`: the legacy 3-shape schema kept alongside the new `{lower, upper, fragments_covered}` one. - `legacy_serialize_index_expr_result`: writes the legacy layout from an `IndexExprResult`. Refined intervals (lower strictly inside a non-universe upper) can't be encoded in the 3-shape format, so they `tracing::warn!` and degrade to `AtMost(upper)` — `upper` is already a valid superset and `AtMost` signals the consumer to recheck. - `deserialize_index_expr_result`: centralized deserialization that handles both legacy and new schemas based on the first column name (`result` vs `lower`). 
- New-format `serialize_index_expr_result` now omits `upper` when `result.is_exact()` by writing two nulls. `RowAddrMask::into_arrow` always produces exactly one non-null row, so a fully-null `upper` can't collide with any real mask; the deserializer reuses `lower` in that case. Saves a full mask payload on every exact result. * `lance::io::exec` / `scalar_index.rs` - `LANCE_RELATIONAL_ALGEBRA_VERSION` constant. - Local `serialize_index_expr_result` wrapper that dispatches to legacy vs new based on the version. - Local `INDEX_EXPR_RESULT_SCHEMA` `LazyLock` that resolves to the legacy schema under v1 and the new schema after. Used by `ScalarIndexExec` for `PlanProperties`, `schema()`, `partition_statistics`, and the stream adapter so the plan's advertised schema and the emitted batch agree. - Fixed a now-dead-code bug in the prior `legacy_serialize_index_expr_result` draft that built the `RecordBatch` with the new schema while supplying legacy `(Binary, UInt32, Binary)` columns — would have failed `RecordBatch::try_new` on every call. * `lance::dataset::scanner` / `lance::io::exec::filtered_read` - `scanner::u64s_as_take_input` switched to the version-aware schema wrapper so the take-source stream's advertised schema matches what the wrapper actually emits. - `EvaluatedIndex::try_from_arrow` now calls the centralized `deserialize_index_expr_result` instead of re-doing the column-by-column decoding inline. Tests added (4): * `scalar::expression::test_serialize_index_expr_result_round_trip` — round-trips `exact`/`at_most`/`at_least` through serialize + deserialize and asserts both endpoints and the fragments bitmap survive. * `scalar::expression::test_serialize_omits_upper_when_exact` — pins the wire-format invariant: exact ⇒ upper fully null, at_most ⇒ upper carries payload, at_least (upper = all_rows) still encodes a non-null row 0 and round-trips with `is_at_least()`. 
* `scalar::expression::test_legacy_serialize_refined_degrades_to_at_most` — constructs a refined `IndexExprResult` and asserts the legacy serializer doesn't error: the round-tripped result is `AtMost` carrying the original `upper`. * `io::exec::scalar_index::test_scalar_index_exec_returns_legacy_format` + `test_scalar_index_exec_advertises_legacy_schema` — runs `ScalarIndexExec` end-to-end against a BTree-indexed dataset and asserts the emitted batch, `plan.schema()`, `plan.partition_statistics(None)`, and the stream's advertised schema all agree on the legacy schema. Verified: * lance-index --lib scalar::expression::tests: 11 passed * lance --lib io::exec::scalar_index: 4 passed * lance --lib io::exec::filtered_read: 24 passed * `cargo clippy -p lance -p lance-index --tests -- -D warnings`: clean * `cargo fmt --all --check`: clean Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/lance-index/src/scalar/expression.rs | 280 +++++++++++++++++++++- rust/lance/src/dataset/scanner.rs | 8 +- rust/lance/src/io/exec.rs | 16 ++ rust/lance/src/io/exec/filtered_read.rs | 32 +-- rust/lance/src/io/exec/scalar_index.rs | 135 ++++++++++- 5 files changed, 431 insertions(+), 40 deletions(-) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 976426c7b5a..7abde56f5f3 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -6,8 +6,11 @@ use std::{ sync::{Arc, LazyLock}, }; -use arrow::array::BinaryBuilder; -use arrow_array::{Array, RecordBatch}; +use arrow::{ + array::{AsArray, BinaryBuilder}, + datatypes::UInt32Type, +}; +use arrow_array::{Array, RecordBatch, UInt32Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use async_recursion::async_recursion; use async_trait::async_trait; @@ -1306,6 +1309,15 @@ pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { ])) }); +/// Same as [`INDEX_EXPR_RESULT_SCHEMA`], but with the legacy `result` and 
`discriminant` fields included. +pub static LEGACY_INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new(vec![ + Field::new("result".to_string(), DataType::Binary, true), + Field::new("discriminant".to_string(), DataType::UInt32, true), + Field::new("fragments_covered".to_string(), DataType::Binary, true), + ])) +}); + // `IndexExprResult` and `NullableIndexExprResult` themselves live in the // `lance-select` crate so that benchmarks and downstream consumers can // depend on the mask substrate without pulling in all of `lance-index`. @@ -1332,13 +1344,25 @@ pub fn index_expr_result_from_parts(lower: RowAddrMask, upper: RowAddrMask) -> I /// Serialize an `IndexExprResult` plus its applicable-fragments bitmap /// into the `INDEX_EXPR_RESULT_SCHEMA` record-batch layout used to hand /// scalar-index results to the read planner. +/// +/// When the result is exact (`lower == upper`) the `upper` column is encoded +/// as two nulls instead of duplicating the `lower` payload — the sentinel is +/// safe because `RowAddrMask::into_arrow` always produces exactly one +/// non-null row, so a fully-null `upper` cannot collide with any real mask. #[instrument(skip_all)] pub fn serialize_index_expr_result( result: &IndexExprResult, fragments_covered_by_result: &RoaringBitmap, ) -> Result { let lower_arr = result.lower.into_arrow()?; - let upper_arr = result.upper.into_arrow()?; + let upper_arr = if result.is_exact() { + let mut upper_builder = BinaryBuilder::new(); + upper_builder.append_null(); + upper_builder.append_null(); + upper_builder.finish() + } else { + result.upper.into_arrow()? + }; let mut fragments_covered_builder = BinaryBuilder::new(); let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); @@ -1356,6 +1380,120 @@ pub fn serialize_index_expr_result( )?) 
} +pub fn deserialize_index_expr_result( + batch: &RecordBatch, +) -> Result<(IndexExprResult, RoaringBitmap)> { + if batch.num_rows() != 2 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly 2 rows but there are {} rows", + batch.num_rows() + ) + .into(), + )); + } + if batch.num_columns() != 3 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly two columns but there are {} columns", + batch.num_columns() + ) + .into(), + )); + } + + let schema = batch.schema(); + let first_col_name = schema.field(0).name(); + let index_result = if first_col_name == "lower" { + // New style + let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let upper_col = batch.column(1).as_binary::(); + // A fully-null upper column is the "exact" sentinel written by + // `serialize_index_expr_result` — reuse `lower` for `upper` instead + // of duplicating the payload on the wire. + let upper = if upper_col.is_null(0) && upper_col.is_null(1) { + lower.clone() + } else { + RowAddrMask::from_arrow(upper_col)? 
+ }; + index_expr_result_from_parts(lower, upper) + } else if first_col_name == "result" { + // Legacy style + let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let match_type = batch.column(1).as_primitive::().values()[0]; + if match_type == 0 { + IndexExprResult::exact(row_addr_mask) + } else if match_type == 1 { + IndexExprResult::at_most(row_addr_mask) + } else if match_type == 2 { + IndexExprResult::at_least(row_addr_mask) + } else { + return Err(Error::internal(format!( + "Unexpected match type: {}", + match_type + ))); + } + } else { + return Err(Error::internal(format!( + "Unexpected column name: {}", + first_col_name + ))); + }; + + let applicable_fragments = batch.column(2).as_binary::(); + let applicable_fragments = RoaringBitmap::deserialize_from(applicable_fragments.value(0))?; + + Ok((index_result, applicable_fragments)) +} + +/// Serializes the `IndexExprResult` into the legacy `LEGACY_INDEX_EXPR_RESULT_SCHEMA` record-batch layout. +/// +/// This can be used for backwards compatibility purposes. +/// +/// Refined intervals (a non-empty `lower` strictly inside a non-universe +/// `upper`) cannot be represented in the legacy three-shape encoding, so we +/// degrade to `AtMost(upper)`: the upper bound is already a valid superset of +/// the answer, and `AtMost` signals to the consumer that a recheck is +/// required. A warning is logged when this lossy conversion happens. 
+pub fn legacy_serialize_index_expr_result( + result: &IndexExprResult, + fragments_covered_by_result: &RoaringBitmap, +) -> Result { + let (row_addr_mask, discriminant) = if result.is_exact() { + (&result.lower, 0u32) + } else if result.is_at_most() { + (&result.upper, 1) + } else if result.is_at_least() { + (&result.lower, 2) + } else { + tracing::warn!( + "Legacy serialization of refined index-expr result: degrading to AtMost(upper); \ + downstream will recheck the candidates" + ); + (&result.upper, 1) + }; + let row_addr_mask_arr = row_addr_mask.into_arrow()?; + let discriminant_arr = + Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc; + + let mut fragments_covered_builder = BinaryBuilder::new(); + let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); + let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); + fragments_covered_by_result.serialize_into(&mut fragments_covered_bytes)?; + fragments_covered_builder.append_value(fragments_covered_bytes); + fragments_covered_builder.append_null(); + let fragments_covered_arr = Arc::new(fragments_covered_builder.finish()) as Arc; + + Ok(RecordBatch::try_new( + LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone(), + vec![ + Arc::new(row_addr_mask_arr), + Arc::new(discriminant_arr), + Arc::new(fragments_covered_arr), + ], + )?) +} + impl ScalarIndexExpr { /// Evaluates the scalar index expression /// @@ -3011,4 +3149,140 @@ mod tests { ); } } + + /// `serialize_index_expr_result` / `deserialize_index_expr_result` are the + /// (non-legacy) wire format hand-off between `ScalarIndexExec` and the + /// read planner once `LANCE_RELATIONAL_ALGEBRA_VERSION` moves past 1. The + /// pair must round-trip every degenerate-interval shape (Exact, AtMost, + /// AtLeast) plus the fragments-covered bitmap. 
+ #[test] + fn test_serialize_index_expr_result_round_trip() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let mut addrs = RowAddrTreeMap::new(); + addrs.insert_range(0..5); + addrs.insert_range(100..103); + + let mut fragments_covered = RoaringBitmap::new(); + fragments_covered.insert(0); + fragments_covered.insert(7); + + let cases = [ + ( + "exact", + IndexExprResult::exact(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_most", + IndexExprResult::at_most(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_least", + IndexExprResult::at_least(RowAddrMask::from_allowed(addrs)), + ), + ]; + + for (label, original) in cases { + let batch = serialize_index_expr_result(&original, &fragments_covered).unwrap(); + assert_eq!(batch.schema(), *INDEX_EXPR_RESULT_SCHEMA, "case {label}"); + assert_eq!(batch.num_rows(), 2, "case {label}"); + + let (round_tripped, round_tripped_frags) = + deserialize_index_expr_result(&batch).unwrap(); + assert_eq!(round_tripped.lower, original.lower, "case {label}: lower"); + assert_eq!(round_tripped.upper, original.upper, "case {label}: upper"); + assert_eq!( + round_tripped_frags, fragments_covered, + "case {label}: frags" + ); + assert_eq!( + round_tripped.is_exact(), + original.is_exact(), + "case {label}" + ); + assert_eq!( + round_tripped.is_at_most(), + original.is_at_most(), + "case {label}" + ); + assert_eq!( + round_tripped.is_at_least(), + original.is_at_least(), + "case {label}" + ); + } + } + + /// Exact results encode `upper` as a fully-null column on the wire — the + /// payload only needs to ship once. `RowAddrMask::into_arrow` never + /// produces a fully-null array (it always sets exactly one of the two + /// rows), so the sentinel can't collide with a real mask. This pins + /// both halves: exact ⇒ upper fully null, non-exact ⇒ upper carries the + /// real mask. 
+ #[test] + fn test_serialize_omits_upper_when_exact() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(0u64..5)); + let fragments_covered = RoaringBitmap::from_iter([0u32]); + + // Exact: upper column must be fully null on the wire. + let exact_batch = + serialize_index_expr_result(&IndexExprResult::exact(mask.clone()), &fragments_covered) + .unwrap(); + let exact_upper = exact_batch.column(1).as_binary::(); + assert!(exact_upper.is_null(0) && exact_upper.is_null(1)); + + // Non-exact (at_most): upper column must carry the upper mask, so at + // least one row is non-null (`AllowList(mask)` puts the payload at + // row 1). + let at_most_batch = serialize_index_expr_result( + &IndexExprResult::at_most(mask.clone()), + &fragments_covered, + ) + .unwrap(); + let at_most_upper = at_most_batch.column(1).as_binary::(); + assert!(!(at_most_upper.is_null(0) && at_most_upper.is_null(1))); + + // Non-exact (at_least): upper = all_rows, which `into_arrow` + // encodes as `BlockList(empty)` — row 0 holds the empty-tree bytes, + // row 1 is null. Round-trip must preserve `is_at_least`. + let at_least_batch = + serialize_index_expr_result(&IndexExprResult::at_least(mask), &fragments_covered) + .unwrap(); + let at_least_upper = at_least_batch.column(1).as_binary::(); + assert!(!at_least_upper.is_null(0)); + let (round_tripped, _) = deserialize_index_expr_result(&at_least_batch).unwrap(); + assert!(round_tripped.is_at_least()); + assert!(!round_tripped.is_exact()); + } + + /// A refined `IndexExprResult` (`lower` strictly inside a non-universe + /// `upper`) has no legacy three-shape encoding. The legacy serializer + /// must not error in that case — it must degrade to `AtMost(upper)` so + /// older read planners still see a valid superset and recheck. 
+ #[test] + fn test_legacy_serialize_refined_degrades_to_at_most() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let lower_addrs = RowAddrTreeMap::from_iter(0u64..3); + let upper_addrs = RowAddrTreeMap::from_iter(0u64..10); + let refined = IndexExprResult { + lower: RowAddrMask::from_allowed(lower_addrs), + upper: RowAddrMask::from_allowed(upper_addrs.clone()), + }; + assert!(!refined.is_exact() && !refined.is_at_most() && !refined.is_at_least()); + + let fragments_covered = RoaringBitmap::from_iter([0u32, 1]); + + let batch = legacy_serialize_index_expr_result(&refined, &fragments_covered).unwrap(); + assert_eq!(batch.schema(), *LEGACY_INDEX_EXPR_RESULT_SCHEMA); + + // Discriminant 1 == AtMost; the round-tripped result carries the + // original `upper` as the AtMost mask (empty lower, upper = upper). + let (round_tripped, round_tripped_frags) = deserialize_index_expr_result(&batch).unwrap(); + assert!(round_tripped.is_at_most()); + assert_eq!(round_tripped.upper, RowAddrMask::from_allowed(upper_addrs)); + assert_eq!(round_tripped_frags, fragments_covered); + } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 53eb569ac02..1ee7f42eaaf 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -66,9 +66,7 @@ use lance_file::reader::FileReaderOptions; use lance_index::IndexCriteria; use lance_index::scalar::FullTextSearchQuery; use lance_index::scalar::expression::ScalarIndexExpr; -use lance_index::scalar::expression::{ - INDEX_EXPR_RESULT_SCHEMA, IndexExprResult, PlannerIndexExt, serialize_index_expr_result, -}; +use lance_index::scalar::expression::{IndexExprResult, PlannerIndexExt}; use lance_index::scalar::inverted::query::{ FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, PhraseQuery, fill_fts_query_column, }; @@ -97,7 +95,9 @@ use crate::io::exec::fts::{ BoostQueryExec, FlatMatchFilterExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec, }; use 
crate::io::exec::knn::MultivectorScoringExec; -use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec}; +use crate::io::exec::scalar_index::{ + INDEX_EXPR_RESULT_SCHEMA, MaterializeIndexExec, ScalarIndexExec, serialize_index_expr_result, +}; use crate::io::exec::{ AddRowAddrExec, FilterPlan as ExprFilterPlan, KNNVectorDistanceExec, LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec, diff --git a/rust/lance/src/io/exec.rs b/rust/lance/src/io/exec.rs index 0b93e3c2834..817ee97aa7d 100644 --- a/rust/lance/src/io/exec.rs +++ b/rust/lance/src/io/exec.rs @@ -37,3 +37,19 @@ pub use rowids::{AddRowAddrExec, AddRowOffsetExec}; pub use scan::{LanceScanConfig, LanceScanExec}; pub use take::TakeExec; pub use utils::PreFilterSource; + +/// Declares which version of the relational algebra we are producing +/// +/// In order to enable plan pushdown (executing parts of the physical plan on different nodes) +/// we must treat the physical plan serialization and the inputs and outputs of physical plan +/// nodes as part of the public API surface. +/// +/// We should attempt to handle as many versions as possible on read paths. This variable +/// controls which versions we support producing. +/// +/// This includes changes to the proto serialization of physical plan nodes themselves or +/// changes to the format of the inputs or outputs of the nodes. +/// +/// This does not include changes to the behavior of the nodes unless that behavior is a change +/// in semantic meaning.
+pub const LANCE_RELATIONAL_ALGEBRA_VERSION: u32 = 1; diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 44eb594e84c..75376967984 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -7,7 +7,6 @@ use std::sync::Mutex; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{ops::Range, sync::Arc}; -use arrow::array::AsArray; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::common::runtime::SpawnedTask; @@ -39,11 +38,9 @@ use lance_datafusion::utils::{ ROWS_SCANNED_METRIC, TASK_WAIT_TIME_METRIC, }; use lance_file::reader::FileReaderOptions; -use lance_index::scalar::expression::{FilterPlan, IndexExprResult, index_expr_result_from_parts}; +use lance_index::scalar::expression::{FilterPlan, IndexExprResult, deserialize_index_expr_result}; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; -use lance_select::{ - RowAddrMask, RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap, -}; +use lance_select::{RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap}; use lance_table::format::Fragment; use lance_table::rowids::RowIdSequence; use lance_table::utils::stream::ReadBatchFut; @@ -80,30 +77,7 @@ impl EvaluatedIndex { } pub fn try_from_arrow(batch: &RecordBatch) -> Result { - if batch.num_rows() != 2 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly 2 rows but there are {} rows", - batch.num_rows() - ) - .into(), - )); - } - if batch.num_columns() != 3 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly two columns but there are {} columns", - batch.num_columns() - ) - .into(), - )); - } - let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; - let upper = RowAddrMask::from_arrow(batch.column(1).as_binary())?; - let index_result = index_expr_result_from_parts(lower, upper); - - let applicable_fragments = 
batch.column(2).as_binary::(); - let applicable_fragments = RoaringBitmap::deserialize_from(applicable_fragments.value(0))?; + let (index_result, applicable_fragments) = deserialize_index_expr_result(batch)?; Ok(Self { index_result, diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 84053b8add4..727728739e8 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -13,6 +13,7 @@ use crate::{ prefilter::DatasetPreFilter, scalar_logical::{open_named_scalar_index, scalar_index_fragment_bitmap}, }, + io::exec::LANCE_RELATIONAL_ALGEBRA_VERSION, }; use arrow_array::{Array, RecordBatch, UInt64Array}; use arrow_schema::{Schema, SchemaRef}; @@ -41,8 +42,8 @@ use lance_index::{ scalar::{ SargableQuery, ScalarIndex, expression::{ - INDEX_EXPR_RESULT_SCHEMA, IndexExprResult, ScalarIndexExpr, ScalarIndexLoader, - ScalarIndexSearch, serialize_index_expr_result, + IndexExprResult, LEGACY_INDEX_EXPR_RESULT_SCHEMA, ScalarIndexExpr, ScalarIndexLoader, + ScalarIndexSearch, }, }, }; @@ -51,6 +52,36 @@ use lance_table::format::Fragment; use roaring::RoaringBitmap; use tracing::{debug_span, instrument}; +pub fn serialize_index_expr_result( + result: &IndexExprResult, + fragments_covered_by_result: &RoaringBitmap, +) -> Result { + if LANCE_RELATIONAL_ALGEBRA_VERSION > 1 { + lance_index::scalar::expression::serialize_index_expr_result( + result, + fragments_covered_by_result, + ) + } else { + lance_index::scalar::expression::legacy_serialize_index_expr_result( + result, + fragments_covered_by_result, + ) + } +} + +/// Schema of the record batch emitted by [`serialize_index_expr_result`] under +/// the currently-active [`LANCE_RELATIONAL_ALGEBRA_VERSION`]. Use this anywhere +/// the schema must match what `ScalarIndexExec` (or any other producer that +/// calls the wrapper) actually emits — otherwise the plan's advertised schema +/// will drift from the wire format. 
+pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + if LANCE_RELATIONAL_ALGEBRA_VERSION > 1 { + lance_index::scalar::expression::INDEX_EXPR_RESULT_SCHEMA.clone() + } else { + LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone() + } +}); + #[async_trait] impl ScalarIndexLoader for Dataset { async fn load_index( @@ -749,7 +780,10 @@ mod tests { use std::{ops::Bound, sync::Arc}; use crate::index::DatasetIndexExt; - use arrow::datatypes::UInt64Type; + use arrow::{ + array::AsArray, + datatypes::{UInt32Type, UInt64Type}, + }; use datafusion::{ execution::TaskContext, physical_plan::ExecutionPlan, prelude::SessionConfig, scalar::ScalarValue, @@ -761,7 +795,10 @@ mod tests { IndexType, scalar::{ SargableQuery, ScalarIndexParams, - expression::{ScalarIndexExpr, ScalarIndexSearch}, + expression::{ + LEGACY_INDEX_EXPR_RESULT_SCHEMA, ScalarIndexExpr, ScalarIndexSearch, + deserialize_index_expr_result, + }, }, }; @@ -847,6 +884,96 @@ mod tests { assert_eq!(batches[0].num_rows(), 5); } + /// `ScalarIndexExec::schema()` (and the stream it emits) must advertise + /// the same schema the batch actually carries — otherwise downstream + /// consumers that trust `ExecutionPlan::schema()` will see a different + /// shape than they receive. While `LANCE_RELATIONAL_ALGEBRA_VERSION == + /// 1`, both must be the legacy layout. This test also exercises + /// `partition_statistics` and the stream's `RecordBatchStream::schema` + /// to catch drift in either advertisement path. 
+ #[tokio::test] + async fn test_scalar_index_exec_advertises_legacy_schema() { + let TestFixture { + dataset, + _tmp_dir_guard, + } = test_fixture().await; + + let query = ScalarIndexExpr::Query(ScalarIndexSearch { + column: "ordered".to_string(), + index_name: "ordered_idx".to_string(), + index_type: "BTree".to_string(), + query: Arc::new(SargableQuery::Range( + Bound::Unbounded, + Bound::Excluded(ScalarValue::UInt64(Some(47))), + )), + needs_recheck: false, + }); + + let plan = ScalarIndexExec::new(dataset, query); + + let legacy_schema = LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone(); + assert_eq!(plan.schema(), legacy_schema); + assert_eq!( + plan.partition_statistics(None) + .unwrap() + .column_statistics + .len(), + legacy_schema.fields().len(), + ); + + let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); + assert_eq!(stream.schema(), legacy_schema); + let batches = stream.try_collect::>().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].schema(), legacy_schema); + } + + /// The wire format that `ScalarIndexExec` hands to the read planner is + /// part of the relational-algebra public surface (see + /// `LANCE_RELATIONAL_ALGEBRA_VERSION`). While version is `1` the layout + /// must remain the pre-`{lower, upper}`-refactor `{result, discriminant, + /// fragments_covered}` shape so older read planners can still consume + /// the batch. This test pins that behavior end-to-end through + /// `ScalarIndexExec::execute`. 
+ #[tokio::test] + async fn test_scalar_index_exec_returns_legacy_format() { + let TestFixture { + dataset, + _tmp_dir_guard, + } = test_fixture().await; + + let query = ScalarIndexExpr::Query(ScalarIndexSearch { + column: "ordered".to_string(), + index_name: "ordered_idx".to_string(), + index_type: "BTree".to_string(), + query: Arc::new(SargableQuery::Range( + Bound::Unbounded, + Bound::Excluded(ScalarValue::UInt64(Some(47))), + )), + needs_recheck: false, + }); + + let plan = ScalarIndexExec::new(dataset, query); + let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); + let batches = stream.try_collect::>().await.unwrap(); + + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.schema(), *LEGACY_INDEX_EXPR_RESULT_SCHEMA); + assert_eq!(batch.num_rows(), 2); + + // Discriminant 0 == Exact; a `<` query over a BTree index is exact. + let discriminant = batch.column(1).as_primitive::(); + assert_eq!(discriminant.value(0), 0); + assert_eq!(discriminant.value(1), 0); + + // The batch should still round-trip back to an `IndexExprResult` covering + // 47 rows (one per matching value in `ordered < 47`). 
+ let (result, _frags) = deserialize_index_expr_result(batch).unwrap(); + assert!(result.is_exact()); + assert_eq!(result.upper.max_len().unwrap(), 47); + } + #[test] fn no_context_scalar_index() { // These tests ensure we can create nodes and call execute without a tokio Runtime From aff3accc8e4c3cbba0873a265daeb7be142320b5 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 14:57:17 +0000 Subject: [PATCH 03/10] various cleanups --- Cargo.lock | 2 + rust/lance-index/src/scalar/expression.rs | 369 ++++++---------------- rust/lance-select/Cargo.toml | 2 + rust/lance-select/src/result.rs | 180 +++++++++++ rust/lance/src/dataset/scanner.rs | 43 ++- rust/lance/src/io/exec/filtered_read.rs | 12 +- rust/lance/src/io/exec/scalar_index.rs | 168 ++++------ 7 files changed, 374 insertions(+), 402 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9df2444cde1..eb8d84ca905 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4984,6 +4984,7 @@ version = "7.2.0-beta.1" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-schema", "byteorder", "bytes", "criterion", @@ -4993,6 +4994,7 @@ dependencies = [ "proptest", "roaring", "rstest", + "tracing", ] [[package]] diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 7abde56f5f3..e1d971d6855 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -1,17 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{ - ops::Bound, - sync::{Arc, LazyLock}, -}; +use std::{ops::Bound, sync::Arc}; -use arrow::{ - array::{AsArray, BinaryBuilder}, - datatypes::UInt32Type, -}; -use arrow_array::{Array, RecordBatch, UInt32Array}; -use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::{DataType, Field}; use async_recursion::async_recursion; use async_trait::async_trait; use datafusion_common::ScalarValue; @@ -29,8 +21,7 @@ use super::{ use super::{GeoQuery, 
RelationQuery}; use lance_core::{Error, Result}; use lance_datafusion::{expr::safe_coerce_scalar, planner::Planner}; -use lance_select::{NullableRowAddrMask, RowAddrMask}; -use roaring::RoaringBitmap; +use lance_select::NullableRowAddrMask; use tracing::instrument; const MAX_DEPTH: usize = 500; @@ -1296,33 +1287,10 @@ impl std::fmt::Display for ScalarIndexExpr { } } -/// When we evaluate a scalar index query we return a batch with three columns and two rows. -/// -/// The first two columns carry the result's `lower` and `upper` row-address -/// mask bounds (the interval form of [`IndexExprResult`]). The third column -/// has the fragments-covered bitmap in the first row and null in the second. -pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { - Arc::new(Schema::new(vec![ - Field::new("lower", DataType::Binary, true), - Field::new("upper", DataType::Binary, true), - Field::new("fragments_covered", DataType::Binary, true), - ])) -}); - -/// Same as [`INDEX_EXPR_RESULT_SCHEMA`], but with the legacy `result` and `discriminant` fields included. -pub static LEGACY_INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { - Arc::new(Schema::new(vec![ - Field::new("result".to_string(), DataType::Binary, true), - Field::new("discriminant".to_string(), DataType::UInt32, true), - Field::new("fragments_covered".to_string(), DataType::Binary, true), - ])) -}); - -// `IndexExprResult` and `NullableIndexExprResult` themselves live in the -// `lance-select` crate so that benchmarks and downstream consumers can -// depend on the mask substrate without pulling in all of `lance-index`. -// The wire-format helpers below stay here, where the `INDEX_EXPR_RESULT_SCHEMA` -// constant they reference is defined. +// `IndexExprResult` and `NullableIndexExprResult` (along with their schemas and +// wire-format methods) live in `lance-select` so that benchmarks and downstream +// consumers can depend on the mask substrate without pulling in all of +// `lance-index`. 
pub use lance_select::{IndexExprResult, NullableIndexExprResult}; impl From for NullableIndexExprResult { @@ -1335,165 +1303,6 @@ impl From for NullableIndexExprResult { } } -/// Build an `IndexExprResult` from the `(lower, upper)` row-mask pair -/// produced by [`serialize_index_expr_result`]. -pub fn index_expr_result_from_parts(lower: RowAddrMask, upper: RowAddrMask) -> IndexExprResult { - IndexExprResult { lower, upper } -} - -/// Serialize an `IndexExprResult` plus its applicable-fragments bitmap -/// into the `INDEX_EXPR_RESULT_SCHEMA` record-batch layout used to hand -/// scalar-index results to the read planner. -/// -/// When the result is exact (`lower == upper`) the `upper` column is encoded -/// as two nulls instead of duplicating the `lower` payload — the sentinel is -/// safe because `RowAddrMask::into_arrow` always produces exactly one -/// non-null row, so a fully-null `upper` cannot collide with any real mask. -#[instrument(skip_all)] -pub fn serialize_index_expr_result( - result: &IndexExprResult, - fragments_covered_by_result: &RoaringBitmap, -) -> Result { - let lower_arr = result.lower.into_arrow()?; - let upper_arr = if result.is_exact() { - let mut upper_builder = BinaryBuilder::new(); - upper_builder.append_null(); - upper_builder.append_null(); - upper_builder.finish() - } else { - result.upper.into_arrow()? 
- }; - let mut fragments_covered_builder = BinaryBuilder::new(); - let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); - let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); - fragments_covered_by_result.serialize_into(&mut fragments_covered_bytes)?; - fragments_covered_builder.append_value(fragments_covered_bytes); - fragments_covered_builder.append_null(); - let fragments_covered_arr = Arc::new(fragments_covered_builder.finish()) as Arc; - Ok(RecordBatch::try_new( - INDEX_EXPR_RESULT_SCHEMA.clone(), - vec![ - Arc::new(lower_arr), - Arc::new(upper_arr), - Arc::new(fragments_covered_arr), - ], - )?) -} - -pub fn deserialize_index_expr_result( - batch: &RecordBatch, -) -> Result<(IndexExprResult, RoaringBitmap)> { - if batch.num_rows() != 2 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly 2 rows but there are {} rows", - batch.num_rows() - ) - .into(), - )); - } - if batch.num_columns() != 3 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly two columns but there are {} columns", - batch.num_columns() - ) - .into(), - )); - } - - let schema = batch.schema(); - let first_col_name = schema.field(0).name(); - let index_result = if first_col_name == "lower" { - // New style - let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; - let upper_col = batch.column(1).as_binary::(); - // A fully-null upper column is the "exact" sentinel written by - // `serialize_index_expr_result` — reuse `lower` for `upper` instead - // of duplicating the payload on the wire. - let upper = if upper_col.is_null(0) && upper_col.is_null(1) { - lower.clone() - } else { - RowAddrMask::from_arrow(upper_col)? 
- }; - index_expr_result_from_parts(lower, upper) - } else if first_col_name == "result" { - // Legacy style - let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; - let match_type = batch.column(1).as_primitive::().values()[0]; - if match_type == 0 { - IndexExprResult::exact(row_addr_mask) - } else if match_type == 1 { - IndexExprResult::at_most(row_addr_mask) - } else if match_type == 2 { - IndexExprResult::at_least(row_addr_mask) - } else { - return Err(Error::internal(format!( - "Unexpected match type: {}", - match_type - ))); - } - } else { - return Err(Error::internal(format!( - "Unexpected column name: {}", - first_col_name - ))); - }; - - let applicable_fragments = batch.column(2).as_binary::(); - let applicable_fragments = RoaringBitmap::deserialize_from(applicable_fragments.value(0))?; - - Ok((index_result, applicable_fragments)) -} - -/// Serializes the `IndexExprResult` into the legacy `LEGACY_INDEX_EXPR_RESULT_SCHEMA` record-batch layout. -/// -/// This can be used for backwards compatibility purposes. -/// -/// Refined intervals (a non-empty `lower` strictly inside a non-universe -/// `upper`) cannot be represented in the legacy three-shape encoding, so we -/// degrade to `AtMost(upper)`: the upper bound is already a valid superset of -/// the answer, and `AtMost` signals to the consumer that a recheck is -/// required. A warning is logged when this lossy conversion happens. 
-pub fn legacy_serialize_index_expr_result( - result: &IndexExprResult, - fragments_covered_by_result: &RoaringBitmap, -) -> Result { - let (row_addr_mask, discriminant) = if result.is_exact() { - (&result.lower, 0u32) - } else if result.is_at_most() { - (&result.upper, 1) - } else if result.is_at_least() { - (&result.lower, 2) - } else { - tracing::warn!( - "Legacy serialization of refined index-expr result: degrading to AtMost(upper); \ - downstream will recheck the candidates" - ); - (&result.upper, 1) - }; - let row_addr_mask_arr = row_addr_mask.into_arrow()?; - let discriminant_arr = - Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc; - - let mut fragments_covered_builder = BinaryBuilder::new(); - let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); - let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); - fragments_covered_by_result.serialize_into(&mut fragments_covered_bytes)?; - fragments_covered_builder.append_value(fragments_covered_bytes); - fragments_covered_builder.append_null(); - let fragments_covered_arr = Arc::new(fragments_covered_builder.finish()) as Arc; - - Ok(RecordBatch::try_new( - LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone(), - vec![ - Arc::new(row_addr_mask_arr), - Arc::new(discriminant_arr), - Arc::new(fragments_covered_arr), - ], - )?) 
-} - impl ScalarIndexExpr { /// Evaluates the scalar index expression /// @@ -2148,11 +1957,14 @@ impl PlannerIndexExt for Planner { mod tests { use std::collections::HashMap; + use arrow_array::Array; use arrow_schema::{Field, Schema}; use chrono::Utc; use datafusion_common::{Column, DFSchema}; use datafusion_expr::simplify::SimplifyContext; use lance_datafusion::exec::{LanceExecutionOptions, get_session_context}; + use lance_select::result::IndexExprResultFormat; + use roaring::RoaringBitmap; use crate::scalar::json::{JsonQuery, JsonQueryParser}; @@ -3150,66 +2962,76 @@ mod tests { } } - /// `serialize_index_expr_result` / `deserialize_index_expr_result` are the - /// (non-legacy) wire format hand-off between `ScalarIndexExec` and the - /// read planner once `LANCE_RELATIONAL_ALGEBRA_VERSION` moves past 1. The - /// pair must round-trip every degenerate-interval shape (Exact, AtMost, - /// AtLeast) plus the fragments-covered bitmap. #[test] fn test_serialize_index_expr_result_round_trip() { use lance_select::{RowAddrMask, RowAddrTreeMap}; - let mut addrs = RowAddrTreeMap::new(); - addrs.insert_range(0..5); - addrs.insert_range(100..103); - - let mut fragments_covered = RoaringBitmap::new(); - fragments_covered.insert(0); - fragments_covered.insert(7); - - let cases = [ - ( - "exact", - IndexExprResult::exact(RowAddrMask::from_allowed(addrs.clone())), - ), - ( - "at_most", - IndexExprResult::at_most(RowAddrMask::from_allowed(addrs.clone())), - ), - ( - "at_least", - IndexExprResult::at_least(RowAddrMask::from_allowed(addrs)), - ), - ]; - - for (label, original) in cases { - let batch = serialize_index_expr_result(&original, &fragments_covered).unwrap(); - assert_eq!(batch.schema(), *INDEX_EXPR_RESULT_SCHEMA, "case {label}"); - assert_eq!(batch.num_rows(), 2, "case {label}"); - - let (round_tripped, round_tripped_frags) = - deserialize_index_expr_result(&batch).unwrap(); - assert_eq!(round_tripped.lower, original.lower, "case {label}: lower"); - 
assert_eq!(round_tripped.upper, original.upper, "case {label}: upper"); - assert_eq!( - round_tripped_frags, fragments_covered, - "case {label}: frags" - ); - assert_eq!( - round_tripped.is_exact(), - original.is_exact(), - "case {label}" - ); - assert_eq!( - round_tripped.is_at_most(), - original.is_at_most(), - "case {label}" - ); - assert_eq!( - round_tripped.is_at_least(), - original.is_at_least(), - "case {label}" - ); + for format in [ + IndexExprResultFormat::TwoMask, + IndexExprResultFormat::ThreeVariant, + ] { + let mut addrs = RowAddrTreeMap::new(); + addrs.insert_range(0..5); + addrs.insert_range(100..103); + + let mut fragments_covered = RoaringBitmap::new(); + fragments_covered.insert(0); + fragments_covered.insert(7); + + let cases = [ + ( + "exact", + IndexExprResult::exact(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_most", + IndexExprResult::at_most(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_least", + IndexExprResult::at_least(RowAddrMask::from_allowed(addrs)), + ), + ]; + + for (label, original) in cases { + let batch = original.serialize(&fragments_covered, format).unwrap(); + assert_eq!( + batch.schema(), + *format.schema(), + "format {format:?}, case {label}" + ); + assert_eq!(batch.num_rows(), 2, "format {format:?}, case {label}"); + + let (round_tripped, round_tripped_frags) = + IndexExprResult::deserialize(&batch).unwrap(); + assert_eq!( + round_tripped.lower, original.lower, + "format {format:?}, case {label}: lower" + ); + assert_eq!( + round_tripped.upper, original.upper, + "format {format:?}, case {label}: upper" + ); + assert_eq!( + round_tripped_frags, fragments_covered, + "format {format:?}, case {label}: frags" + ); + assert_eq!( + round_tripped.is_exact(), + original.is_exact(), + "format {format:?}, case {label}" + ); + assert_eq!( + round_tripped.is_at_most(), + original.is_at_most(), + "format {format:?}, case {label}" + ); + assert_eq!( + round_tripped.is_at_least(), + original.is_at_least(), + 
"format {format:?}, case {label}" + ); + } } } @@ -3226,43 +3048,43 @@ mod tests { let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(0u64..5)); let fragments_covered = RoaringBitmap::from_iter([0u32]); + use arrow::array::AsArray; + // Exact: upper column must be fully null on the wire. - let exact_batch = - serialize_index_expr_result(&IndexExprResult::exact(mask.clone()), &fragments_covered) - .unwrap(); + let exact_batch = IndexExprResult::exact(mask.clone()) + .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .unwrap(); let exact_upper = exact_batch.column(1).as_binary::(); assert!(exact_upper.is_null(0) && exact_upper.is_null(1)); // Non-exact (at_most): upper column must carry the upper mask, so at // least one row is non-null (`AllowList(mask)` puts the payload at // row 1). - let at_most_batch = serialize_index_expr_result( - &IndexExprResult::at_most(mask.clone()), - &fragments_covered, - ) - .unwrap(); + let at_most_batch = IndexExprResult::at_most(mask.clone()) + .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .unwrap(); let at_most_upper = at_most_batch.column(1).as_binary::(); assert!(!(at_most_upper.is_null(0) && at_most_upper.is_null(1))); // Non-exact (at_least): upper = all_rows, which `into_arrow` // encodes as `BlockList(empty)` — row 0 holds the empty-tree bytes, // row 1 is null. Round-trip must preserve `is_at_least`. 
- let at_least_batch = - serialize_index_expr_result(&IndexExprResult::at_least(mask), &fragments_covered) - .unwrap(); + let at_least_batch = IndexExprResult::at_least(mask) + .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .unwrap(); let at_least_upper = at_least_batch.column(1).as_binary::(); assert!(!at_least_upper.is_null(0)); - let (round_tripped, _) = deserialize_index_expr_result(&at_least_batch).unwrap(); + let (round_tripped, _) = IndexExprResult::deserialize(&at_least_batch).unwrap(); assert!(round_tripped.is_at_least()); assert!(!round_tripped.is_exact()); } /// A refined `IndexExprResult` (`lower` strictly inside a non-universe - /// `upper`) has no legacy three-shape encoding. The legacy serializer + /// `upper`) has no legacy three-shape encoding. The serializer /// must not error in that case — it must degrade to `AtMost(upper)` so /// older read planners still see a valid superset and recheck. #[test] - fn test_legacy_serialize_refined_degrades_to_at_most() { + fn test_three_variant_serialize_refined_degrades_to_at_most() { use lance_select::{RowAddrMask, RowAddrTreeMap}; let lower_addrs = RowAddrTreeMap::from_iter(0u64..3); @@ -3275,12 +3097,17 @@ mod tests { let fragments_covered = RoaringBitmap::from_iter([0u32, 1]); - let batch = legacy_serialize_index_expr_result(&refined, &fragments_covered).unwrap(); - assert_eq!(batch.schema(), *LEGACY_INDEX_EXPR_RESULT_SCHEMA); + let batch = refined + .serialize(&fragments_covered, IndexExprResultFormat::ThreeVariant) + .unwrap(); + assert_eq!( + batch.schema(), + *IndexExprResultFormat::ThreeVariant.schema() + ); // Discriminant 1 == AtMost; the round-tripped result carries the // original `upper` as the AtMost mask (empty lower, upper = upper). 
- let (round_tripped, round_tripped_frags) = deserialize_index_expr_result(&batch).unwrap(); + let (round_tripped, round_tripped_frags) = IndexExprResult::deserialize(&batch).unwrap(); assert!(round_tripped.is_at_most()); assert_eq!(round_tripped.upper, RowAddrMask::from_allowed(upper_addrs)); assert_eq!(round_tripped_frags, fragments_covered); diff --git a/rust/lance-select/Cargo.toml b/rust/lance-select/Cargo.toml index c7deb6f8894..4d72b55b5e2 100644 --- a/rust/lance-select/Cargo.toml +++ b/rust/lance-select/Cargo.toml @@ -14,7 +14,9 @@ description = "Row-selection masks and index-result algebra for Lance" [dependencies] arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } byteorder = { workspace = true } +tracing = { workspace = true } bytes = { workspace = true } deepsize = { workspace = true } itertools = { workspace = true } diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 8c38b75f102..340c315f243 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -42,6 +42,14 @@ //! the per-endpoint algebra already implements two-valued and SQL //! three-valued logic correctly inside each mask type. +use std::sync::{Arc, LazyLock}; + +use arrow_array::{Array, RecordBatch, UInt32Array, builder::BinaryBuilder}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use roaring::RoaringBitmap; + +use lance_core::{Error, Result}; + use crate::mask::{NullableRowAddrMask, RowAddrMask, RowSetOps}; /// Result of an index search before NULL rows are dropped. 
Each endpoint @@ -264,3 +272,175 @@ impl std::ops::BitOr for IndexExprResult { } } } + +static TWO_MASK_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new(vec![ + Field::new("lower", DataType::Binary, true), + Field::new("upper", DataType::Binary, true), + Field::new("fragments_covered", DataType::Binary, true), + ])) +}); + +static THREE_VARIANT_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new(vec![ + Field::new("result".to_string(), DataType::Binary, true), + Field::new("discriminant".to_string(), DataType::UInt32, true), + Field::new("fragments_covered".to_string(), DataType::Binary, true), + ])) +}); + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum IndexExprResultFormat { + ThreeVariant, // A legacy format that used AtMost/AtLeast/Exact variants + #[default] + TwoMask, // The two-mask format with upper and lower +} + +impl IndexExprResultFormat { + pub fn schema(&self) -> &SchemaRef { + match self { + Self::ThreeVariant => &THREE_VARIANT_RESULT_SCHEMA, + Self::TwoMask => &TWO_MASK_RESULT_SCHEMA, + } + } +} + +impl IndexExprResult { + /// Serialize into the two-mask (`lower`, `upper`, `fragments_covered`) record-batch + /// layout used to hand scalar-index results to the read planner. + #[tracing::instrument(skip_all)] + fn serialize_standard(&self, fragments_covered: &RoaringBitmap) -> Result { + let lower_arr = self.lower.into_arrow()?; + let upper_arr = if self.is_exact() { + let mut b = BinaryBuilder::new(); + b.append_null(); + b.append_null(); + b.finish() + } else { + self.upper.into_arrow()? + }; + let mut frags_builder = BinaryBuilder::new(); + let mut frags_bytes = Vec::with_capacity(fragments_covered.serialized_size()); + fragments_covered.serialize_into(&mut frags_bytes)?; + frags_builder.append_value(frags_bytes); + frags_builder.append_null(); + Ok(RecordBatch::try_new( + TWO_MASK_RESULT_SCHEMA.clone(), + vec![ + Arc::new(lower_arr), + Arc::new(upper_arr), + Arc::new(frags_builder.finish()) as Arc, + ], + )?) 
+ } + + /// Serialize into the legacy three-variant record-batch layout. + /// + /// Refined intervals (a non-empty `lower` strictly inside a non-universe `upper`) + /// cannot be represented in the legacy encoding and are degraded to `AtMost(upper)`. + fn serialize_three_variant(&self, fragments_covered: &RoaringBitmap) -> Result { + let (mask, discriminant) = if self.is_exact() { + (&self.lower, 0u32) + } else if self.is_at_most() { + (&self.upper, 1) + } else if self.is_at_least() { + (&self.lower, 2) + } else { + tracing::warn!( + "Legacy serialization of refined index-expr result: degrading to AtMost(upper); \ + answer will remain correct but query will be more expensive" + ); + (&self.upper, 1) + }; + let mask_arr = mask.into_arrow()?; + let discriminant_arr = + Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc; + let mut frags_builder = BinaryBuilder::new(); + let mut frags_bytes = Vec::with_capacity(fragments_covered.serialized_size()); + fragments_covered.serialize_into(&mut frags_bytes)?; + frags_builder.append_value(frags_bytes); + frags_builder.append_null(); + Ok(RecordBatch::try_new( + THREE_VARIANT_RESULT_SCHEMA.clone(), + vec![ + Arc::new(mask_arr), + discriminant_arr, + Arc::new(frags_builder.finish()) as Arc, + ], + )?) + } + + pub fn serialize( + &self, + fragments_covered: &RoaringBitmap, + format: IndexExprResultFormat, + ) -> Result { + match format { + IndexExprResultFormat::ThreeVariant => self.serialize_three_variant(fragments_covered), + IndexExprResultFormat::TwoMask => self.serialize_standard(fragments_covered), + } + } + + /// Deserialize from a record batch produced by [`Self::serialize`] in either + /// [`IndexExprResultFormat`] (two-mask or legacy three-variant). 
+ pub fn deserialize(batch: &RecordBatch) -> Result<(Self, RoaringBitmap)> { + use arrow_array::cast::AsArray; + + if batch.num_rows() != 2 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly 2 rows but there are {} rows", + batch.num_rows() + ) + .into(), + )); + } + if batch.num_columns() != 3 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly three columns but there are {} columns", + batch.num_columns() + ) + .into(), + )); + } + + let first_col_name = batch.schema().field(0).name().clone(); + let index_result = if first_col_name == "lower" { + let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let upper_col = batch.column(1).as_binary::(); + let upper = if upper_col.is_null(0) && upper_col.is_null(1) { + lower.clone() + } else { + RowAddrMask::from_arrow(upper_col)? + }; + Self { lower, upper } + } else if first_col_name == "result" { + let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let match_type = batch + .column(1) + .as_primitive::() + .values()[0]; + if match_type == 0 { + Self::exact(row_addr_mask) + } else if match_type == 1 { + Self::at_most(row_addr_mask) + } else if match_type == 2 { + Self::at_least(row_addr_mask) + } else { + return Err(Error::internal(format!( + "Unexpected match type: {match_type}" + ))); + } + } else { + return Err(Error::internal(format!( + "Unexpected column name: {first_col_name}" + ))); + }; + + let frags_col = batch.column(2).as_binary::(); + let fragments = RoaringBitmap::deserialize_from(frags_col.value(0))?; + + Ok((index_result, fragments)) + } +} diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 1ee7f42eaaf..c0ec321e984 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; use datafusion::config::ConfigOptions; +use lance_select::result::IndexExprResultFormat; use std::ops::Range; 
use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -95,9 +96,7 @@ use crate::io::exec::fts::{ BoostQueryExec, FlatMatchFilterExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec, }; use crate::io::exec::knn::MultivectorScoringExec; -use crate::io::exec::scalar_index::{ - INDEX_EXPR_RESULT_SCHEMA, MaterializeIndexExec, ScalarIndexExec, serialize_index_expr_result, -}; +use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec}; use crate::io::exec::{ AddRowAddrExec, FilterPlan as ExprFilterPlan, KNNVectorDistanceExec, LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec, @@ -106,7 +105,10 @@ use crate::io::exec::{ }, project, }; -use crate::io::exec::{AddRowOffsetExec, LanceFilterExec, LanceScanConfig, get_physical_optimizer}; +use crate::io::exec::{ + AddRowOffsetExec, LANCE_RELATIONAL_ALGEBRA_VERSION, LanceFilterExec, LanceScanConfig, + get_physical_optimizer, +}; use crate::{Error, Result}; use crate::{ datatypes::Schema, @@ -824,6 +826,9 @@ pub struct Scanner { aggregate: Option, + /// Which version of the relational algebra to use when generating the physical plan + relational_algebra_version: u32, + // Legacy fields to help migrate some old projection behavior to new behavior // // There are two behaviors we are moving away from: @@ -1047,6 +1052,7 @@ impl Scanner { legacy_with_row_id: false, explicit_projection: false, autoproject_scoring_columns: true, + relational_algebra_version: LANCE_RELATIONAL_ALGEBRA_VERSION, }; scanner.apply_blob_handling(); scanner @@ -2795,6 +2801,15 @@ impl Scanner { }) } + fn index_expr_result_format(&self) -> IndexExprResultFormat { + if self.relational_algebra_version > 1 { + IndexExprResultFormat::TwoMask + } else { + // In version 1 we used the legacy three-variant format for index expr results + IndexExprResultFormat::ThreeVariant + } + } + // Helper function for filtered_read // // Do not call this directly, use filtered_read instead @@ -2842,9 +2857,13 @@ impl Scanner 
{ read_options = read_options.with_only_indexed_fragments(); } + let result_format = self.index_expr_result_format(); let index_input = filter_plan.index_query.clone().map(|index_query| { - Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query)) - as Arc + Arc::new(ScalarIndexExec::new( + self.dataset.clone(), + index_query, + result_format, + )) as Arc }); Ok(Arc::new(FilteredReadExec::try_new( @@ -2901,12 +2920,11 @@ impl Scanner { let row_addr_mask = RowAddrMask::from_allowed(row_addrs); let index_result = IndexExprResult::exact(row_addr_mask); let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone(); - let batch = serialize_index_expr_result(&index_result, &fragments_covered)?; + let format = self.index_expr_result_format(); + let batch = index_result.serialize(&fragments_covered, format)?; + let schema = batch.schema(); let stream = futures::stream::once(async move { Ok(batch) }); - let stream = Box::pin(RecordBatchStreamAdapter::new( - INDEX_EXPR_RESULT_SCHEMA.clone(), - stream, - )); + let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); Ok(Arc::new(OneShotExec::new(stream))) } @@ -4728,13 +4746,14 @@ impl Scanner { if missing_frags.is_empty() || self.fast_search { log::trace!("prefilter entirely satisfied by exact index search"); + let result_format = self.index_expr_result_format(); // We can only avoid materializing the index for a prefilter if: // 1. The search is indexed // 2. The index search is an exact search with no recheck or refine // 3. The indices cover at least the same fragments as the vector index, // unless fast_search allows skipping uncovered fragments. 
return Ok(PreFilterSource::ScalarIndexQuery(Arc::new( - ScalarIndexExec::new(self.dataset.clone(), index_query.clone()), + ScalarIndexExec::new(self.dataset.clone(), index_query.clone(), result_format), ))); } else { log::trace!("exact index search did not cover all fragments"); diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 75376967984..8088c379f0e 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -38,7 +38,7 @@ use lance_datafusion::utils::{ ROWS_SCANNED_METRIC, TASK_WAIT_TIME_METRIC, }; use lance_file::reader::FileReaderOptions; -use lance_index::scalar::expression::{FilterPlan, IndexExprResult, deserialize_index_expr_result}; +use lance_index::scalar::expression::{FilterPlan, IndexExprResult}; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_select::{RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap}; use lance_table::format::Fragment; @@ -77,7 +77,7 @@ impl EvaluatedIndex { } pub fn try_from_arrow(batch: &RecordBatch) -> Result { - let (index_result, applicable_fragments) = deserialize_index_expr_result(batch)?; + let (index_result, applicable_fragments) = IndexExprResult::deserialize(batch)?; Ok(Self { index_result, @@ -2134,6 +2134,7 @@ mod tests { optimize::OptimizeOptions, scalar::{ScalarIndexParams, expression::PlannerIndexExt}, }; + use lance_select::result::IndexExprResultFormat; use crate::{ dataset::{InsertBuilder, WriteDestination, WriteMode, WriteParams}, @@ -2264,6 +2265,7 @@ mod tests { Some(Arc::new(ScalarIndexExec::new( self.dataset.clone(), index_query, + IndexExprResultFormat::default(), ))) } else { None @@ -2361,11 +2363,10 @@ mod tests { /// Round-trip every interval shape through the arrow wire format and /// confirm the endpoints survive. 
Exercises both - /// `serialize_index_expr_result` and `EvaluatedIndex::try_from_arrow` + /// `IndexExprResult::serialize` and `EvaluatedIndex::try_from_arrow` /// so the schema names stay in sync. #[test] fn test_index_expr_result_serialize_roundtrip() { - use lance_index::scalar::expression::serialize_index_expr_result; use lance_select::{RowAddrMask, RowAddrTreeMap}; let mk = |rows: &[u64]| RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(rows)); @@ -2389,7 +2390,8 @@ mod tests { ]; for (name, original) in cases { - let batch = serialize_index_expr_result(&original, &frags) + let batch = original + .serialize(&frags, IndexExprResultFormat::default()) .unwrap_or_else(|e| panic!("serialize {name}: {e}")); let decoded = EvaluatedIndex::try_from_arrow(&batch) .unwrap_or_else(|e| panic!("try_from_arrow {name}: {e}")); diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 727728739e8..9a0614632af 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -13,7 +13,6 @@ use crate::{ prefilter::DatasetPreFilter, scalar_logical::{open_named_scalar_index, scalar_index_fragment_bitmap}, }, - io::exec::LANCE_RELATIONAL_ALGEBRA_VERSION, }; use arrow_array::{Array, RecordBatch, UInt64Array}; use arrow_schema::{Schema, SchemaRef}; @@ -41,47 +40,14 @@ use lance_index::{ metrics::MetricsCollector, scalar::{ SargableQuery, ScalarIndex, - expression::{ - IndexExprResult, LEGACY_INDEX_EXPR_RESULT_SCHEMA, ScalarIndexExpr, ScalarIndexLoader, - ScalarIndexSearch, - }, + expression::{IndexExprResult, ScalarIndexExpr, ScalarIndexLoader, ScalarIndexSearch}, }, }; -use lance_select::{RowAddrMask, RowAddrTreeMap, RowSetOps}; +use lance_select::{RowAddrMask, RowAddrTreeMap, RowSetOps, result::IndexExprResultFormat}; use lance_table::format::Fragment; use roaring::RoaringBitmap; use tracing::{debug_span, instrument}; -pub fn serialize_index_expr_result( - result: &IndexExprResult, - 
fragments_covered_by_result: &RoaringBitmap, -) -> Result { - if LANCE_RELATIONAL_ALGEBRA_VERSION > 1 { - lance_index::scalar::expression::serialize_index_expr_result( - result, - fragments_covered_by_result, - ) - } else { - lance_index::scalar::expression::legacy_serialize_index_expr_result( - result, - fragments_covered_by_result, - ) - } -} - -/// Schema of the record batch emitted by [`serialize_index_expr_result`] under -/// the currently-active [`LANCE_RELATIONAL_ALGEBRA_VERSION`]. Use this anywhere -/// the schema must match what `ScalarIndexExec` (or any other producer that -/// calls the wrapper) actually emits — otherwise the plan's advertised schema -/// will drift from the wire format. -pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { - if LANCE_RELATIONAL_ALGEBRA_VERSION > 1 { - lance_index::scalar::expression::INDEX_EXPR_RESULT_SCHEMA.clone() - } else { - LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone() - } -}); - #[async_trait] impl ScalarIndexLoader for Dataset { async fn load_index( @@ -107,6 +73,7 @@ pub struct ScalarIndexExec { expr: ScalarIndexExpr, properties: Arc, metrics: ExecutionPlanMetricsSet, + result_format: IndexExprResultFormat, } impl DisplayAs for ScalarIndexExec { @@ -123,9 +90,13 @@ impl DisplayAs for ScalarIndexExec { } impl ScalarIndexExec { - pub fn new(dataset: Arc, expr: ScalarIndexExpr) -> Self { + pub fn new( + dataset: Arc, + expr: ScalarIndexExpr, + result_format: IndexExprResultFormat, + ) -> Self { let properties = Arc::new(PlanProperties::new( - EquivalenceProperties::new(INDEX_EXPR_RESULT_SCHEMA.clone()), + EquivalenceProperties::new(result_format.schema().clone()), Partitioning::RoundRobinBatch(1), EmissionType::Incremental, Boundedness::Bounded, @@ -135,6 +106,7 @@ impl ScalarIndexExec { expr, properties, metrics: ExecutionPlanMetricsSet::new(), + result_format, } } @@ -172,6 +144,7 @@ impl ScalarIndexExec { expr: ScalarIndexExpr, dataset: Arc, plan_metrics: ExecutionPlanMetricsSet, + result_format: 
IndexExprResultFormat, ) -> Result { let metrics = IndexMetrics::new(&plan_metrics, 0); let query_result = { @@ -184,7 +157,7 @@ impl ScalarIndexExec { { let ser_time = plan_metrics.new_time(SCALAR_INDEX_SER_TIME_METRIC, 0); let _timer = ser_time.timer(); - serialize_index_expr_result(&query_result, &fragments_covered_by_result) + query_result.serialize(&fragments_covered_by_result, result_format) } } } @@ -199,7 +172,7 @@ impl ExecutionPlan for ScalarIndexExec { } fn schema(&self) -> SchemaRef { - INDEX_EXPR_RESULT_SCHEMA.clone() + self.result_format.schema().clone() } fn children(&self) -> Vec<&Arc> { @@ -228,13 +201,14 @@ impl ExecutionPlan for ScalarIndexExec { self.expr.clone(), self.dataset.clone(), self.metrics.clone(), + self.result_format, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into())) .boxed() as BoxStream<'static, datafusion::common::Result>; Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new( - INDEX_EXPR_RESULT_SCHEMA.clone(), + self.result_format.schema().clone(), stream, partition, &self.metrics, @@ -247,7 +221,7 @@ impl ExecutionPlan for ScalarIndexExec { ) -> datafusion::error::Result { Ok(datafusion::physical_plan::Statistics { num_rows: datafusion::common::stats::Precision::Exact(2), - ..datafusion::physical_plan::Statistics::new_unknown(&INDEX_EXPR_RESULT_SCHEMA) + ..datafusion::physical_plan::Statistics::new_unknown(self.result_format.schema()) }) } @@ -780,10 +754,8 @@ mod tests { use std::{ops::Bound, sync::Arc}; use crate::index::DatasetIndexExt; - use arrow::{ - array::AsArray, - datatypes::{UInt32Type, UInt64Type}, - }; + use arrow::datatypes::UInt64Type; + use arrow_schema::Schema; use datafusion::{ execution::TaskContext, physical_plan::ExecutionPlan, prelude::SessionConfig, scalar::ScalarValue, @@ -795,12 +767,10 @@ mod tests { IndexType, scalar::{ SargableQuery, ScalarIndexParams, - expression::{ - LEGACY_INDEX_EXPR_RESULT_SCHEMA, ScalarIndexExpr, ScalarIndexSearch, - 
deserialize_index_expr_result, - }, + expression::{ScalarIndexExpr, ScalarIndexSearch}, }, }; + use lance_select::result::IndexExprResultFormat; use crate::{ Dataset, @@ -887,12 +857,11 @@ mod tests { /// `ScalarIndexExec::schema()` (and the stream it emits) must advertise /// the same schema the batch actually carries — otherwise downstream /// consumers that trust `ExecutionPlan::schema()` will see a different - /// shape than they receive. While `LANCE_RELATIONAL_ALGEBRA_VERSION == - /// 1`, both must be the legacy layout. This test also exercises - /// `partition_statistics` and the stream's `RecordBatchStream::schema` - /// to catch drift in either advertisement path. + /// shape than they receive. + /// + /// The schema depends on the `IndexExprResultFormat` passed to `ScalarIndexExec::new`. #[tokio::test] - async fn test_scalar_index_exec_advertises_legacy_schema() { + async fn test_scalar_index_exec_advertises_correct_schema() { let TestFixture { dataset, _tmp_dir_guard, @@ -909,69 +878,36 @@ mod tests { needs_recheck: false, }); - let plan = ScalarIndexExec::new(dataset, query); + let verify = async |plan: ScalarIndexExec, schema: Arc| { + assert_eq!(plan.schema(), schema); + assert_eq!( + plan.partition_statistics(None) + .unwrap() + .column_statistics + .len(), + schema.fields().len(), + ); + + let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); + assert_eq!(stream.schema(), schema); + let batches = stream.try_collect::>().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].schema(), schema); + }; - let legacy_schema = LEGACY_INDEX_EXPR_RESULT_SCHEMA.clone(); - assert_eq!(plan.schema(), legacy_schema); - assert_eq!( - plan.partition_statistics(None) - .unwrap() - .column_statistics - .len(), - legacy_schema.fields().len(), + let plan = ScalarIndexExec::new( + dataset.clone(), + query.clone(), + IndexExprResultFormat::ThreeVariant, ); + let schema = IndexExprResultFormat::ThreeVariant.schema().clone(); - let stream 
= plan.execute(0, Arc::new(TaskContext::default())).unwrap(); - assert_eq!(stream.schema(), legacy_schema); - let batches = stream.try_collect::>().await.unwrap(); - assert_eq!(batches.len(), 1); - assert_eq!(batches[0].schema(), legacy_schema); - } + verify(plan, schema).await; - /// The wire format that `ScalarIndexExec` hands to the read planner is - /// part of the relational-algebra public surface (see - /// `LANCE_RELATIONAL_ALGEBRA_VERSION`). While version is `1` the layout - /// must remain the pre-`{lower, upper}`-refactor `{result, discriminant, - /// fragments_covered}` shape so older read planners can still consume - /// the batch. This test pins that behavior end-to-end through - /// `ScalarIndexExec::execute`. - #[tokio::test] - async fn test_scalar_index_exec_returns_legacy_format() { - let TestFixture { - dataset, - _tmp_dir_guard, - } = test_fixture().await; + let plan = ScalarIndexExec::new(dataset, query, IndexExprResultFormat::TwoMask); + let schema = IndexExprResultFormat::TwoMask.schema().clone(); - let query = ScalarIndexExpr::Query(ScalarIndexSearch { - column: "ordered".to_string(), - index_name: "ordered_idx".to_string(), - index_type: "BTree".to_string(), - query: Arc::new(SargableQuery::Range( - Bound::Unbounded, - Bound::Excluded(ScalarValue::UInt64(Some(47))), - )), - needs_recheck: false, - }); - - let plan = ScalarIndexExec::new(dataset, query); - let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); - let batches = stream.try_collect::>().await.unwrap(); - - assert_eq!(batches.len(), 1); - let batch = &batches[0]; - assert_eq!(batch.schema(), *LEGACY_INDEX_EXPR_RESULT_SCHEMA); - assert_eq!(batch.num_rows(), 2); - - // Discriminant 0 == Exact; a `<` query over a BTree index is exact. 
- let discriminant = batch.column(1).as_primitive::(); - assert_eq!(discriminant.value(0), 0); - assert_eq!(discriminant.value(1), 0); - - // The batch should still round-trip back to an `IndexExprResult` covering - // 47 rows (one per matching value in `ordered < 47`). - let (result, _frags) = deserialize_index_expr_result(batch).unwrap(); - assert!(result.is_exact()); - assert_eq!(result.upper.max_len().unwrap(), 47); + verify(plan, schema).await; } #[test] @@ -995,7 +931,11 @@ mod tests { // These plans aren't even valid but it appears we defer all work (even validation) until // read time. - let plan = ScalarIndexExec::new(arc_dasaset.clone(), query.clone()); + let plan = ScalarIndexExec::new( + arc_dasaset.clone(), + query.clone(), + IndexExprResultFormat::default(), + ); plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let plan = MapIndexExec::new( From c4f8c5793f73ac1d9f7d60a4856b8e5906570e91 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 15:21:32 +0000 Subject: [PATCH 04/10] chore: regenerate python/Cargo.lock after lance-select dep additions --- python/Cargo.lock | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/python/Cargo.lock b/python/Cargo.lock index 332ff238fe5..0b0419c0c4b 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -1244,9 +1244,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.62" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "jobserver", @@ -4504,12 +4504,14 @@ version = "7.2.0-beta.1" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-schema", "byteorder", "bytes", "deepsize", "itertools 0.13.0", "lance-core", "roaring", + "tracing", ] [[package]] @@ -7286,9 +7288,9 @@ dependencies = [ [[package]] name = "shlex" -version = 
"1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -8292,9 +8294,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom 0.4.2", "js-sys", From d1a44d004ed3baa327d2e46ee0b2e7889617ea71 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 08:23:32 -0700 Subject: [PATCH 05/10] Fix typo --- rust/lance/src/io/exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/io/exec.rs b/rust/lance/src/io/exec.rs index 817ee97aa7d..f06f575de68 100644 --- a/rust/lance/src/io/exec.rs +++ b/rust/lance/src/io/exec.rs @@ -41,7 +41,7 @@ pub use utils::PreFilterSource; /// Declares which vesrion of the relational algebra we are producing /// /// In order to enable plan pushdown (executing parts of the physical plan on different nodes) -/// we must treat the physical plan serialization and the inputs and outputs of phsyical plan +/// we must treat the physical plan serialization and the inputs and outputs of phyical plan /// nodes as part of the public API surface. /// /// We should attempt to handle as many versions as possible on read paths. 
This variable From 9a84f6f0d3077ee23f8d138bada227b969fd0841 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 15:23:51 +0000 Subject: [PATCH 06/10] fix(docs): remove stale serialize_legacy intra-doc link --- rust/lance-select/src/result.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 340c315f243..4bfc424ce55 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -381,8 +381,7 @@ impl IndexExprResult { } } - /// Deserialize from a record batch produced by [`Self::serialize`] or - /// [`Self::serialize_legacy`]. + /// Deserialize from a record batch produced by [`Self::serialize`]. pub fn deserialize(batch: &RecordBatch) -> Result<(Self, RoaringBitmap)> { use arrow_array::cast::AsArray; From ecaf42841beb9405c4353936f46c2230f4c7684f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 17:08:11 +0000 Subject: [PATCH 07/10] perf(select): make is_exact() O(1) with a private exact flag Replace the O(n) `self.lower == self.upper` mask comparison in `IndexExprResult::is_exact` and `NullableIndexExprResult::is_exact` with a private `exact: bool` field set at construction time. The flag is propagated elementwise through Not/BitAnd/BitOr and across `drop_nulls`, so it stays correct without ever doing a full mask comparison. `IndexExprResult::deserialize` sets it by examining the wire format rather than comparing masks. Add `IndexExprResult::new(lower, upper)` for constructing refined intervals (lower strictly inside upper) where `exact = false` by definition; update the two test sites that used struct literal syntax directly. 
Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-index/src/scalar/expression.rs | 8 ++-- rust/lance-select/src/result.rs | 52 +++++++++++++++++++---- rust/lance/src/io/exec/filtered_read.rs | 5 +-- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index e1d971d6855..2215fc94764 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -3089,10 +3089,10 @@ mod tests { let lower_addrs = RowAddrTreeMap::from_iter(0u64..3); let upper_addrs = RowAddrTreeMap::from_iter(0u64..10); - let refined = IndexExprResult { - lower: RowAddrMask::from_allowed(lower_addrs), - upper: RowAddrMask::from_allowed(upper_addrs.clone()), - }; + let refined = IndexExprResult::new( + RowAddrMask::from_allowed(lower_addrs), + RowAddrMask::from_allowed(upper_addrs.clone()), + ); assert!(!refined.is_exact() && !refined.is_at_most() && !refined.is_at_least()); let fragments_covered = RoaringBitmap::from_iter([0u32, 1]); diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 4bfc424ce55..14c91cea762 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -61,6 +61,9 @@ pub struct NullableIndexExprResult { /// Rows that may be TRUE. Rows outside `upper` are guaranteed to be /// FALSE / NULL (and so not in a `WHERE` answer set). pub upper: NullableRowAddrMask, + // O(1) cache for is_exact(). Set by constructors and propagated + // elementwise through the boolean algebra. 
+ exact: bool, } impl NullableIndexExprResult { @@ -70,6 +73,7 @@ impl NullableIndexExprResult { Self { lower: mask.clone(), upper: mask, + exact: true, } } @@ -80,6 +84,7 @@ impl NullableIndexExprResult { Self { lower: NullableRowAddrMask::allow_nothing(), upper: mask, + exact: false, } } @@ -90,10 +95,11 @@ impl NullableIndexExprResult { Self { lower: mask, upper: NullableRowAddrMask::all_rows(), + exact: false, } } - /// True if `lower == upper` — the answer is precisely the lower + /// True if the result is exact — the answer is precisely the lower /// (== upper) mask. /// /// This is a **structural** check on the canonical form produced by @@ -109,7 +115,7 @@ impl NullableIndexExprResult { /// The three shape predicates are not mutually exclusive — see the /// note on [`Self::is_at_least`] for the precedence convention. pub fn is_exact(&self) -> bool { - self.lower == self.upper + self.exact } /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the @@ -142,6 +148,7 @@ impl NullableIndexExprResult { IndexExprResult { lower: self.lower.drop_nulls(), upper: self.upper.drop_nulls(), + exact: self.exact, } } } @@ -153,6 +160,7 @@ impl std::ops::Not for NullableIndexExprResult { Self { lower: !self.upper, upper: !self.lower, + exact: self.exact, } } } @@ -164,6 +172,7 @@ impl std::ops::BitAnd for NullableIndexExprResult { Self { lower: self.lower & rhs.lower, upper: self.upper & rhs.upper, + exact: self.exact && rhs.exact, } } } @@ -175,6 +184,7 @@ impl std::ops::BitOr for NullableIndexExprResult { Self { lower: self.lower | rhs.lower, upper: self.upper | rhs.upper, + exact: self.exact && rhs.exact, } } } @@ -188,6 +198,9 @@ pub struct IndexExprResult { /// Rows that may be in the answer. Rows outside `upper` are /// guaranteed not in the answer. pub upper: RowAddrMask, + // O(1) cache for is_exact(). Set by constructors and propagated + // elementwise through the boolean algebra. 
+ exact: bool, } impl IndexExprResult { @@ -197,6 +210,7 @@ impl IndexExprResult { Self { lower: mask.clone(), upper: mask, + exact: true, } } @@ -205,6 +219,7 @@ impl IndexExprResult { Self { lower: RowAddrMask::allow_nothing(), upper: mask, + exact: false, } } @@ -213,15 +228,30 @@ impl IndexExprResult { Self { lower: mask, upper: RowAddrMask::all_rows(), + exact: false, } } - /// True if `lower == upper` — the answer is precisely the lower + /// Construct a refined interval result — `lower` rows are guaranteed + /// matches and `upper` rows are candidates, with `lower ⊆ upper`. + /// + /// Use [`Self::exact`] / [`Self::at_most`] / [`Self::at_least`] for + /// the three degenerate shapes; this constructor is only needed when + /// both endpoints are non-trivial. + pub fn new(lower: RowAddrMask, upper: RowAddrMask) -> Self { + Self { + lower, + upper, + exact: false, + } + } + + /// True if the result is exact — the answer is precisely the lower /// (== upper) mask. See [`NullableIndexExprResult::is_exact`] for the /// structural-form caveat and the precedence convention shared with /// [`Self::is_at_most`] / [`Self::is_at_least`]. 
pub fn is_exact(&self) -> bool { - self.lower == self.upper + self.exact } /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the @@ -247,6 +277,7 @@ impl std::ops::Not for IndexExprResult { Self { lower: !self.upper, upper: !self.lower, + exact: self.exact, } } } @@ -258,6 +289,7 @@ impl std::ops::BitAnd for IndexExprResult { Self { lower: self.lower & rhs.lower, upper: self.upper & rhs.upper, + exact: self.exact && rhs.exact, } } } @@ -269,6 +301,7 @@ impl std::ops::BitOr for IndexExprResult { Self { lower: self.lower | rhs.lower, upper: self.upper | rhs.upper, + exact: self.exact && rhs.exact, } } } @@ -408,12 +441,13 @@ impl IndexExprResult { let index_result = if first_col_name == "lower" { let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; let upper_col = batch.column(1).as_binary::(); - let upper = if upper_col.is_null(0) && upper_col.is_null(1) { - lower.clone() + if upper_col.is_null(0) && upper_col.is_null(1) { + // Null upper column is the serialized form of an exact result. + Self::exact(lower) } else { - RowAddrMask::from_arrow(upper_col)? - }; - Self { lower, upper } + let upper = RowAddrMask::from_arrow(upper_col)?; + Self { lower, upper, exact: false } + } } else if first_col_name == "result" { let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; let match_type = batch diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 8088c379f0e..3607d9b110c 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -2382,10 +2382,7 @@ mod tests { // Refined: non-empty lower strictly inside non-universe upper. 
( "refined", - IndexExprResult { - lower: mk(&[1, 2]), - upper: mk(&[1, 2, 3]), - }, + IndexExprResult::new(mk(&[1, 2]), mk(&[1, 2, 3])), ), ]; From 110a0cc7eb3d2fcc362daa56c87a73e9cef27177 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 17:55:53 +0000 Subject: [PATCH 08/10] style: fix cargo fmt violations from is_exact O(1) change Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-select/src/result.rs | 6 +++++- rust/lance/src/io/exec/filtered_read.rs | 5 +---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 14c91cea762..8b41e3efc18 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -446,7 +446,11 @@ impl IndexExprResult { Self::exact(lower) } else { let upper = RowAddrMask::from_arrow(upper_col)?; - Self { lower, upper, exact: false } + Self { + lower, + upper, + exact: false, + } } } else if first_col_name == "result" { let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 3607d9b110c..f62fc700305 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -2380,10 +2380,7 @@ mod tests { ("at_most", IndexExprResult::at_most(mk(&[1, 2, 3]))), ("at_least", IndexExprResult::at_least(mk(&[1, 2]))), // Refined: non-empty lower strictly inside non-universe upper. 
- ( - "refined", - IndexExprResult::new(mk(&[1, 2]), mk(&[1, 2, 3])), - ), + ("refined", IndexExprResult::new(mk(&[1, 2]), mk(&[1, 2, 3]))), ]; for (name, original) in cases { From 0de2697de8a9b6750a1edfdfbe0c0c8618cfe132 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 21:45:35 +0000 Subject: [PATCH 09/10] fix: use IndexExprResult::deserialize in SelectionVectorToPrefilter to handle both wire formats SelectionVectorToPrefilter::load() was accessing batch["upper"] directly, which only exists in the TwoMask wire format. ScalarIndexExec uses the ThreeVariant format when LANCE_RELATIONAL_ALGEBRA_VERSION <= 1. The mismatch caused batch["upper"].unwrap() to panic inside a tokio::spawn task, leaving the AsyncCell unset and causing wait_for_ready() to hang forever. Fix by delegating to IndexExprResult::deserialize(), which already handles both TwoMask and ThreeVariant formats and returns the result's upper mask. Co-Authored-By: Claude Sonnet 4.6 --- rust/lance/src/io/exec/utils.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/rust/lance/src/io/exec/utils.rs b/rust/lance/src/io/exec/utils.rs index 488491691c0..5def0fb254d 100644 --- a/rust/lance/src/io/exec/utils.rs +++ b/rust/lance/src/io/exec/utils.rs @@ -13,7 +13,6 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use arrow::array::AsArray; use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::SchemaRef; use async_trait::async_trait; @@ -30,7 +29,7 @@ use lance_core::error::{CloneableResult, Error}; use lance_core::utils::futures::{Capacity, SharedStreamExt}; use lance_core::{ROW_ID, Result}; use lance_index::prefilter::FilterLoader; -use lance_select::{RowAddrMask, RowAddrTreeMap}; +use lance_select::{RowAddrMask, RowAddrTreeMap, result::IndexExprResult}; use std::future::Future; use crate::Dataset; @@ -101,17 +100,15 @@ impl FilterLoader for SelectionVectorToPrefilter { Error::internal("Selection vector source for 
prefilter did not yield any batches") })?; // The vector-search prefilter wants the set of rows the search is - // allowed to consider. Under the {lower, upper} interval form the - // `upper` mask is exactly that — rows outside it are guaranteed - // not to match. For an `Exact` result `upper == lower`; for - // `AtLeast` `upper` is the universe (which lets the search see - // everything, matching the AtLeast semantics). - RowAddrMask::from_arrow(batch["upper"].as_binary_opt::().ok_or_else(|| { - Error::internal(format!( - "Expected selection vector input to yield binary arrays but got {}", - batch["upper"].data_type() - )) - })?) + // allowed to consider — the `upper` bound of the index expression + // result. Rows outside the upper bound are guaranteed not to match, + // so the vector search can skip them. + // + // Use deserialize() here (rather than indexing "upper" directly) to + // support both the TwoMask and the legacy ThreeVariant wire formats + // that ScalarIndexExec may emit. 
+ let (result, _) = IndexExprResult::deserialize(&batch)?; + Ok(result.upper) } } From c64c703fdb4efda3e7146a8ba3dc5eebff4ba624 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 May 2026 16:41:32 -0700 Subject: [PATCH 10/10] Minor cleanup --- rust/lance-index/src/scalar/expression.rs | 24 ++++++++------------- rust/lance-select/src/result.rs | 12 ++++++----- rust/lance/src/dataset/scanner.rs | 12 +++++------ rust/lance/src/io/exec/filtered_read.rs | 12 ++++++----- rust/lance/src/io/exec/scalar_index.rs | 26 ++++++++++++----------- 5 files changed, 43 insertions(+), 43 deletions(-) diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 2215fc94764..17cfc4a9854 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -21,7 +21,7 @@ use super::{ use super::{GeoQuery, RelationQuery}; use lance_core::{Error, Result}; use lance_datafusion::{expr::safe_coerce_scalar, planner::Planner}; -use lance_select::NullableRowAddrMask; +use lance_select::{IndexExprResult, NullableIndexExprResult, NullableRowAddrMask}; use tracing::instrument; const MAX_DEPTH: usize = 500; @@ -1287,12 +1287,6 @@ impl std::fmt::Display for ScalarIndexExpr { } } -// `IndexExprResult` and `NullableIndexExprResult` (along with their schemas and -// wire-format methods) live in `lance-select` so that benchmarks and downstream -// consumers can depend on the mask substrate without pulling in all of -// `lance-index`. 
-pub use lance_select::{IndexExprResult, NullableIndexExprResult}; - impl From for NullableIndexExprResult { fn from(result: SearchResult) -> Self { match result { @@ -1963,7 +1957,7 @@ mod tests { use datafusion_common::{Column, DFSchema}; use datafusion_expr::simplify::SimplifyContext; use lance_datafusion::exec::{LanceExecutionOptions, get_session_context}; - use lance_select::result::IndexExprResultFormat; + use lance_select::result::IndexExprResultWireFormat; use roaring::RoaringBitmap; use crate::scalar::json::{JsonQuery, JsonQueryParser}; @@ -2967,8 +2961,8 @@ mod tests { use lance_select::{RowAddrMask, RowAddrTreeMap}; for format in [ - IndexExprResultFormat::TwoMask, - IndexExprResultFormat::ThreeVariant, + IndexExprResultWireFormat::TwoMask, + IndexExprResultWireFormat::ThreeVariant, ] { let mut addrs = RowAddrTreeMap::new(); addrs.insert_range(0..5); @@ -3052,7 +3046,7 @@ mod tests { // Exact: upper column must be fully null on the wire. let exact_batch = IndexExprResult::exact(mask.clone()) - .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) .unwrap(); let exact_upper = exact_batch.column(1).as_binary::(); assert!(exact_upper.is_null(0) && exact_upper.is_null(1)); @@ -3061,7 +3055,7 @@ mod tests { // least one row is non-null (`AllowList(mask)` puts the payload at // row 1). let at_most_batch = IndexExprResult::at_most(mask.clone()) - .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) .unwrap(); let at_most_upper = at_most_batch.column(1).as_binary::(); assert!(!(at_most_upper.is_null(0) && at_most_upper.is_null(1))); @@ -3070,7 +3064,7 @@ mod tests { // encodes as `BlockList(empty)` — row 0 holds the empty-tree bytes, // row 1 is null. Round-trip must preserve `is_at_least`. 
let at_least_batch = IndexExprResult::at_least(mask) - .serialize(&fragments_covered, IndexExprResultFormat::TwoMask) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) .unwrap(); let at_least_upper = at_least_batch.column(1).as_binary::(); assert!(!at_least_upper.is_null(0)); @@ -3098,11 +3092,11 @@ mod tests { let fragments_covered = RoaringBitmap::from_iter([0u32, 1]); let batch = refined - .serialize(&fragments_covered, IndexExprResultFormat::ThreeVariant) + .serialize(&fragments_covered, IndexExprResultWireFormat::ThreeVariant) .unwrap(); assert_eq!( batch.schema(), - *IndexExprResultFormat::ThreeVariant.schema() + *IndexExprResultWireFormat::ThreeVariant.schema() ); // Discriminant 1 == AtMost; the round-tripped result carries the diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 8b41e3efc18..7b33eda2e4e 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -323,13 +323,13 @@ static THREE_VARIANT_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { }); #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] -pub enum IndexExprResultFormat { +pub enum IndexExprResultWireFormat { ThreeVariant, // A legacy format that used AtMost/AtLeast/Exact variants #[default] TwoMask, // The two-mask format with upper and lower } -impl IndexExprResultFormat { +impl IndexExprResultWireFormat { pub fn schema(&self) -> &SchemaRef { match self { Self::ThreeVariant => &THREE_VARIANT_RESULT_SCHEMA, @@ -406,11 +406,13 @@ impl IndexExprResult { pub fn serialize( &self, fragments_covered: &RoaringBitmap, - format: IndexExprResultFormat, + format: IndexExprResultWireFormat, ) -> Result { match format { - IndexExprResultFormat::ThreeVariant => self.serialize_three_variant(fragments_covered), - IndexExprResultFormat::TwoMask => self.serialize_standard(fragments_covered), + IndexExprResultWireFormat::ThreeVariant => { + self.serialize_three_variant(fragments_covered) + } + IndexExprResultWireFormat::TwoMask => 
self.serialize_standard(fragments_covered), } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index c0ec321e984..a01e34d28a1 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use datafusion::config::ConfigOptions; -use lance_select::result::IndexExprResultFormat; +use lance_select::result::IndexExprResultWireFormat; use std::ops::Range; use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -66,8 +66,8 @@ use lance_datafusion::projection::ProjectionPlan; use lance_file::reader::FileReaderOptions; use lance_index::IndexCriteria; use lance_index::scalar::FullTextSearchQuery; +use lance_index::scalar::expression::PlannerIndexExt; use lance_index::scalar::expression::ScalarIndexExpr; -use lance_index::scalar::expression::{IndexExprResult, PlannerIndexExt}; use lance_index::scalar::inverted::query::{ FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, PhraseQuery, fill_fts_query_column, }; @@ -76,7 +76,7 @@ use lance_index::vector::{DEFAULT_QUERY_PARALLELISM, DIST_COL, Query}; use lance_index::{metrics::NoOpMetricsCollector, scalar::inverted::FTS_SCHEMA}; use lance_io::stream::RecordBatchStream; use lance_linalg::distance::MetricType; -use lance_select::{RowAddrMask, RowAddrTreeMap}; +use lance_select::{IndexExprResult, RowAddrMask, RowAddrTreeMap}; use lance_table::format::{Fragment, IndexMetadata}; use roaring::RoaringBitmap; use tracing::{Span, info_span, instrument}; @@ -2801,12 +2801,12 @@ impl Scanner { }) } - fn index_expr_result_format(&self) -> IndexExprResultFormat { + fn index_expr_result_format(&self) -> IndexExprResultWireFormat { if self.relational_algebra_version > 1 { - IndexExprResultFormat::TwoMask + IndexExprResultWireFormat::TwoMask } else { // In version 1 we used the legacy three-variant format for index expr results - IndexExprResultFormat::ThreeVariant + IndexExprResultWireFormat::ThreeVariant } } diff --git 
a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index f62fc700305..c491e3f194b 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -38,9 +38,11 @@ use lance_datafusion::utils::{ ROWS_SCANNED_METRIC, TASK_WAIT_TIME_METRIC, }; use lance_file::reader::FileReaderOptions; -use lance_index::scalar::expression::{FilterPlan, IndexExprResult}; +use lance_index::scalar::expression::FilterPlan; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; -use lance_select::{RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap}; +use lance_select::{ + IndexExprResult, RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap, +}; use lance_table::format::Fragment; use lance_table::rowids::RowIdSequence; use lance_table::utils::stream::ReadBatchFut; @@ -2134,7 +2136,7 @@ mod tests { optimize::OptimizeOptions, scalar::{ScalarIndexParams, expression::PlannerIndexExt}, }; - use lance_select::result::IndexExprResultFormat; + use lance_select::result::IndexExprResultWireFormat; use crate::{ dataset::{InsertBuilder, WriteDestination, WriteMode, WriteParams}, @@ -2265,7 +2267,7 @@ mod tests { Some(Arc::new(ScalarIndexExec::new( self.dataset.clone(), index_query, - IndexExprResultFormat::default(), + IndexExprResultWireFormat::default(), ))) } else { None @@ -2385,7 +2387,7 @@ mod tests { for (name, original) in cases { let batch = original - .serialize(&frags, IndexExprResultFormat::default()) + .serialize(&frags, IndexExprResultWireFormat::default()) .unwrap_or_else(|e| panic!("serialize {name}: {e}")); let decoded = EvaluatedIndex::try_from_arrow(&batch) .unwrap_or_else(|e| panic!("try_from_arrow {name}: {e}")); diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index 9a0614632af..7a8f3301f8f 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -40,10 +40,12 @@ use lance_index::{ 
metrics::MetricsCollector, scalar::{ SargableQuery, ScalarIndex, - expression::{IndexExprResult, ScalarIndexExpr, ScalarIndexLoader, ScalarIndexSearch}, + expression::{ScalarIndexExpr, ScalarIndexLoader, ScalarIndexSearch}, }, }; -use lance_select::{RowAddrMask, RowAddrTreeMap, RowSetOps, result::IndexExprResultFormat}; +use lance_select::{ + IndexExprResult, RowAddrMask, RowAddrTreeMap, RowSetOps, result::IndexExprResultWireFormat, +}; use lance_table::format::Fragment; use roaring::RoaringBitmap; use tracing::{debug_span, instrument}; @@ -73,7 +75,7 @@ pub struct ScalarIndexExec { expr: ScalarIndexExpr, properties: Arc, metrics: ExecutionPlanMetricsSet, - result_format: IndexExprResultFormat, + result_format: IndexExprResultWireFormat, } impl DisplayAs for ScalarIndexExec { @@ -93,7 +95,7 @@ impl ScalarIndexExec { pub fn new( dataset: Arc, expr: ScalarIndexExpr, - result_format: IndexExprResultFormat, + result_format: IndexExprResultWireFormat, ) -> Self { let properties = Arc::new(PlanProperties::new( EquivalenceProperties::new(result_format.schema().clone()), @@ -144,7 +146,7 @@ impl ScalarIndexExec { expr: ScalarIndexExpr, dataset: Arc, plan_metrics: ExecutionPlanMetricsSet, - result_format: IndexExprResultFormat, + result_format: IndexExprResultWireFormat, ) -> Result { let metrics = IndexMetrics::new(&plan_metrics, 0); let query_result = { @@ -770,7 +772,7 @@ mod tests { expression::{ScalarIndexExpr, ScalarIndexSearch}, }, }; - use lance_select::result::IndexExprResultFormat; + use lance_select::result::IndexExprResultWireFormat; use crate::{ Dataset, @@ -859,7 +861,7 @@ mod tests { /// consumers that trust `ExecutionPlan::schema()` will see a different /// shape than they receive. /// - /// The schema depends on the `IndexExprResultFormat` passed to `ScalarIndexExec::new`. + /// The schema depends on the `IndexExprResultWireFormat` passed to `ScalarIndexExec::new`. 
#[tokio::test] async fn test_scalar_index_exec_advertises_correct_schema() { let TestFixture { @@ -898,14 +900,14 @@ mod tests { let plan = ScalarIndexExec::new( dataset.clone(), query.clone(), - IndexExprResultFormat::ThreeVariant, + IndexExprResultWireFormat::ThreeVariant, ); - let schema = IndexExprResultFormat::ThreeVariant.schema().clone(); + let schema = IndexExprResultWireFormat::ThreeVariant.schema().clone(); verify(plan, schema).await; - let plan = ScalarIndexExec::new(dataset, query, IndexExprResultFormat::TwoMask); - let schema = IndexExprResultFormat::TwoMask.schema().clone(); + let plan = ScalarIndexExec::new(dataset, query, IndexExprResultWireFormat::TwoMask); + let schema = IndexExprResultWireFormat::TwoMask.schema().clone(); verify(plan, schema).await; } @@ -934,7 +936,7 @@ mod tests { let plan = ScalarIndexExec::new( arc_dasaset.clone(), query.clone(), - IndexExprResultFormat::default(), + IndexExprResultWireFormat::default(), ); plan.execute(0, Arc::new(TaskContext::default())).unwrap();