diff --git a/Cargo.lock b/Cargo.lock index 9df2444cde1..eb8d84ca905 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4984,6 +4984,7 @@ version = "7.2.0-beta.1" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-schema", "byteorder", "bytes", "criterion", @@ -4993,6 +4994,7 @@ dependencies = [ "proptest", "roaring", "rstest", + "tracing", ] [[package]] diff --git a/python/Cargo.lock b/python/Cargo.lock index 332ff238fe5..0b0419c0c4b 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -1244,9 +1244,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.62" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "jobserver", @@ -4504,12 +4504,14 @@ version = "7.2.0-beta.1" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-schema", "byteorder", "bytes", "deepsize", "itertools 0.13.0", "lance-core", "roaring", + "tracing", ] [[package]] @@ -7286,9 +7288,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -8292,9 +8294,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.23.1" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom 0.4.2", "js-sys", diff --git a/rust/lance-index/src/scalar/expression.rs b/rust/lance-index/src/scalar/expression.rs index 
00ac3ebec7d..17cfc4a9854 100644 --- a/rust/lance-index/src/scalar/expression.rs +++ b/rust/lance-index/src/scalar/expression.rs @@ -1,14 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{ - ops::Bound, - sync::{Arc, LazyLock}, -}; +use std::{ops::Bound, sync::Arc}; -use arrow::array::BinaryBuilder; -use arrow_array::{Array, RecordBatch, UInt32Array}; -use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::{DataType, Field}; use async_recursion::async_recursion; use async_trait::async_trait; use datafusion_common::ScalarValue; @@ -26,8 +21,7 @@ use super::{ use super::{GeoQuery, RelationQuery}; use lance_core::{Error, Result}; use lance_datafusion::{expr::safe_coerce_scalar, planner::Planner}; -use lance_select::{NullableRowAddrMask, RowAddrMask}; -use roaring::RoaringBitmap; +use lance_select::{IndexExprResult, NullableIndexExprResult, NullableRowAddrMask}; use tracing::instrument; const MAX_DEPTH: usize = 500; @@ -1293,82 +1287,16 @@ impl std::fmt::Display for ScalarIndexExpr { } } -/// When we evaluate a scalar index query we return a batch with three columns and two rows -/// -/// The first column has the block list and allow list -/// The second column tells if the result is least/exact/more (we repeat the discriminant twice) -/// The third column has the fragments covered bitmap in the first row and null in the second row -pub static INDEX_EXPR_RESULT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { - Arc::new(Schema::new(vec![ - Field::new("result".to_string(), DataType::Binary, true), - Field::new("discriminant".to_string(), DataType::UInt32, true), - Field::new("fragments_covered".to_string(), DataType::Binary, true), - ])) -}); - -// `IndexExprResult` and `NullableIndexExprResult` themselves live in the -// `lance-select` crate so that benchmarks and downstream consumers can -// depend on the mask substrate without pulling in all of `lance-index`. 
-// The wire-format helpers below stay here, where the `INDEX_EXPR_RESULT_SCHEMA` -// constant they reference is defined. -pub use lance_select::{IndexExprResult, NullableIndexExprResult}; - impl From<SearchResult> for NullableIndexExprResult { fn from(result: SearchResult) -> Self { match result { - SearchResult::Exact(mask) => Self::Exact(NullableRowAddrMask::AllowList(mask)), - SearchResult::AtMost(mask) => Self::AtMost(NullableRowAddrMask::AllowList(mask)), - SearchResult::AtLeast(mask) => Self::AtLeast(NullableRowAddrMask::AllowList(mask)), + SearchResult::Exact(mask) => Self::exact(NullableRowAddrMask::AllowList(mask)), + SearchResult::AtMost(mask) => Self::at_most(NullableRowAddrMask::AllowList(mask)), + SearchResult::AtLeast(mask) => Self::at_least(NullableRowAddrMask::AllowList(mask)), } } } -/// Parse an `IndexExprResult` from its serialized `(mask, discriminant)` -/// representation. Counterpart to [`serialize_index_expr_result`]. -pub fn index_expr_result_from_parts( - mask: RowAddrMask, - discriminant: u32, -) -> Result<IndexExprResult> { - match discriminant { - 0 => Ok(IndexExprResult::Exact(mask)), - 1 => Ok(IndexExprResult::AtMost(mask)), - 2 => Ok(IndexExprResult::AtLeast(mask)), - _ => Err(Error::invalid_input_source( - format!("Invalid IndexExprResult discriminant: {}", discriminant).into(), - )), - } -} - -/// Serialize an `IndexExprResult` plus its applicable-fragments bitmap -/// into the `INDEX_EXPR_RESULT_SCHEMA` record-batch layout used to hand -/// scalar-index results to the read planner. 
-#[instrument(skip_all)] -pub fn serialize_index_expr_result( - result: &IndexExprResult, - fragments_covered_by_result: &RoaringBitmap, -) -> Result<RecordBatch> { - let row_addr_mask = result.row_addr_mask(); - let row_addr_mask_arr = row_addr_mask.into_arrow()?; - let discriminant = result.discriminant(); - let discriminant_arr = - Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc<dyn Array>; - let mut fragments_covered_builder = BinaryBuilder::new(); - let fragments_covered_bytes_len = fragments_covered_by_result.serialized_size(); - let mut fragments_covered_bytes = Vec::with_capacity(fragments_covered_bytes_len); - fragments_covered_by_result.serialize_into(&mut fragments_covered_bytes)?; - fragments_covered_builder.append_value(fragments_covered_bytes); - fragments_covered_builder.append_null(); - let fragments_covered_arr = Arc::new(fragments_covered_builder.finish()) as Arc<dyn Array>; - Ok(RecordBatch::try_new( - INDEX_EXPR_RESULT_SCHEMA.clone(), - vec![ - Arc::new(row_addr_mask_arr), - Arc::new(discriminant_arr), - Arc::new(fragments_covered_arr), - ], - )?) 
-} - impl ScalarIndexExpr { /// Evaluates the scalar index expression /// @@ -2023,11 +1951,14 @@ impl PlannerIndexExt for Planner { mod tests { use std::collections::HashMap; + use arrow_array::Array; use arrow_schema::{Field, Schema}; use chrono::Utc; use datafusion_common::{Column, DFSchema}; use datafusion_expr::simplify::SimplifyContext; use lance_datafusion::exec::{LanceExecutionOptions, get_session_context}; + use lance_select::result::IndexExprResultWireFormat; + use roaring::RoaringBitmap; use crate::scalar::json::{JsonQuery, JsonQueryParser}; @@ -2572,55 +2503,42 @@ mod tests { async fn test_not_flips_certainty() { use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; - // Test that NOT flips certainty for inexact index results - // This tests the implementation in evaluate_impl for Self::Not - - // Helper function that mimics the NOT logic we just fixed - fn apply_not(result: NullableIndexExprResult) -> NullableIndexExprResult { - match result { - NullableIndexExprResult::Exact(mask) => NullableIndexExprResult::Exact(!mask), - NullableIndexExprResult::AtMost(mask) => NullableIndexExprResult::AtLeast(!mask), - NullableIndexExprResult::AtLeast(mask) => NullableIndexExprResult::AtMost(!mask), - } - } + // Test that NOT flips certainty for inexact index results. + // Under the {lower, upper} form, `!{l, u} = {!u, !l}`, which + // preserves the AtMost ↔ AtLeast swap and leaves Exact as Exact. 
// AtMost: superset of matches (e.g., bloom filter says "might be in [1,2]") - let at_most = NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList( + let at_most = NullableIndexExprResult::at_most(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); // NOT(AtMost) should be AtLeast (definitely NOT in [1,2], might be elsewhere) - assert!(matches!( - apply_not(at_most), - NullableIndexExprResult::AtLeast(_) - )); + assert!((!at_most).is_at_least()); // AtLeast: subset of matches (e.g., definitely in [1,2], might be more) - let at_least = NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList( + let at_least = NullableIndexExprResult::at_least(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); // NOT(AtLeast) should be AtMost (might NOT be in [1,2], definitely elsewhere) - assert!(matches!( - apply_not(at_least), - NullableIndexExprResult::AtMost(_) - )); + assert!((!at_least).is_at_most()); // Exact should stay Exact - let exact = NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList( + let exact = NullableIndexExprResult::exact(NullableRowAddrMask::AllowList( NullableRowAddrSet::new(RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new()), )); - assert!(matches!( - apply_not(exact), - NullableIndexExprResult::Exact(_) - )); + assert!((!exact).is_exact()); } #[tokio::test] async fn test_and_or_preserve_certainty() { use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; - // Test that AND/OR correctly propagate certainty + // Test that AND/OR correctly propagate certainty under the + // {lower, upper} algebra. Each binary op is elementwise on the + // endpoints, so degenerate shapes (Exact / AtMost / AtLeast) + // combine into a result that lands in one of those same shapes + // in every case exercised below. 
let make_at_most = || { - NullableIndexExprResult::AtMost(NullableRowAddrMask::AllowList( + NullableIndexExprResult::at_most(NullableRowAddrMask::AllowList( NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[1, 2, 3]), RowAddrTreeMap::new(), @@ -2629,7 +2547,7 @@ mod tests { }; let make_at_least = || { - NullableIndexExprResult::AtLeast(NullableRowAddrMask::AllowList( + NullableIndexExprResult::at_least(NullableRowAddrMask::AllowList( NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[2, 3, 4]), RowAddrTreeMap::new(), @@ -2638,59 +2556,87 @@ mod tests { }; let make_exact = || { - NullableIndexExprResult::Exact(NullableRowAddrMask::AllowList(NullableRowAddrSet::new( + NullableIndexExprResult::exact(NullableRowAddrMask::AllowList(NullableRowAddrSet::new( RowAddrTreeMap::from_iter(&[1, 2]), RowAddrTreeMap::new(), ))) }; // AtMost & AtMost → AtMost - assert!(matches!( - make_at_most() & make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_at_most() & make_at_most()).is_at_most()); // AtLeast & AtLeast → AtLeast - assert!(matches!( - make_at_least() & make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_at_least() & make_at_least()).is_at_least()); - // AtMost & AtLeast → AtMost (superset remains superset) - assert!(matches!( - make_at_most() & make_at_least(), - NullableIndexExprResult::AtMost(_) - )); + // AtMost & AtLeast → AtMost (the lower side stays empty) + assert!((make_at_most() & make_at_least()).is_at_most()); // AtMost | AtMost → AtMost - assert!(matches!( - make_at_most() | make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_at_most() | make_at_most()).is_at_most()); // AtLeast | AtLeast → AtLeast - assert!(matches!( - make_at_least() | make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_at_least() | make_at_least()).is_at_least()); - // AtMost | AtLeast → AtLeast (subset coverage guaranteed) - assert!(matches!( - make_at_most() | make_at_least(), - 
NullableIndexExprResult::AtLeast(_) - )); + // AtMost | AtLeast → AtLeast (upper stays universe) + assert!((make_at_most() | make_at_least()).is_at_least()); // Exact & AtMost → AtMost - assert!(matches!( - make_exact() & make_at_most(), - NullableIndexExprResult::AtMost(_) - )); + assert!((make_exact() & make_at_most()).is_at_most()); // Exact | AtLeast → AtLeast - assert!(matches!( - make_exact() | make_at_least(), - NullableIndexExprResult::AtLeast(_) - )); + assert!((make_exact() | make_at_least()).is_at_least()); + } + + /// The whole point of the `{lower, upper}` representation is that it + /// can express a Refined result — a non-empty `lower` strictly inside + /// a non-universe `upper` — which the old enum couldn't. This test + /// constructs one through the algebra and verifies the endpoints. + #[tokio::test] + async fn test_refined_result_constructed_through_algebra() { + use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; + + let allow_set = |rows: &[u64]| { + NullableRowAddrMask::AllowList(NullableRowAddrSet::new( + RowAddrTreeMap::from_iter(rows), + RowAddrTreeMap::new(), + )) + }; + + // AtLeast({1,2}) & Exact({1,2,3}) is Refined, because: + // lower = {1,2} ∩ {1,2,3} = {1,2} (non-empty) + // upper = universe ∩ {1,2,3} = {1,2,3} (not universe) + // lower ≠ upper (not Exact) + let at_least_12 = NullableIndexExprResult::at_least(allow_set(&[1, 2])); + let exact_123 = NullableIndexExprResult::exact(allow_set(&[1, 2, 3])); + let refined = at_least_12 & exact_123; + + // None of the shape predicates should fire — that's what makes + // this a Refined result. + assert!( + !refined.is_exact(), + "Refined must not be classified as Exact" + ); + assert!( + !refined.is_at_most(), + "Refined must not be classified as AtMost" + ); + assert!( + !refined.is_at_least(), + "Refined must not be classified as AtLeast" + ); + + // Check the actual endpoints. 
+ assert_eq!(refined.lower, allow_set(&[1, 2])); + assert_eq!(refined.upper, allow_set(&[1, 2, 3])); + + // NOT swaps the endpoints, preserving the Refined shape. + let negated = !refined; + assert!(!negated.is_exact()); + assert!(!negated.is_at_most()); + assert!(!negated.is_at_least()); + // !{l, u} = {!u, !l}. AllowList → BlockList. + assert!(matches!(negated.lower, NullableRowAddrMask::BlockList(_))); + assert!(matches!(negated.upper, NullableRowAddrMask::BlockList(_))); } #[test] @@ -3009,4 +2955,155 @@ mod tests { ); } } + + #[test] + fn test_serialize_index_expr_result_round_trip() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + for format in [ + IndexExprResultWireFormat::TwoMask, + IndexExprResultWireFormat::ThreeVariant, + ] { + let mut addrs = RowAddrTreeMap::new(); + addrs.insert_range(0..5); + addrs.insert_range(100..103); + + let mut fragments_covered = RoaringBitmap::new(); + fragments_covered.insert(0); + fragments_covered.insert(7); + + let cases = [ + ( + "exact", + IndexExprResult::exact(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_most", + IndexExprResult::at_most(RowAddrMask::from_allowed(addrs.clone())), + ), + ( + "at_least", + IndexExprResult::at_least(RowAddrMask::from_allowed(addrs)), + ), + ]; + + for (label, original) in cases { + let batch = original.serialize(&fragments_covered, format).unwrap(); + assert_eq!( + batch.schema(), + *format.schema(), + "format {format:?}, case {label}" + ); + assert_eq!(batch.num_rows(), 2, "format {format:?}, case {label}"); + + let (round_tripped, round_tripped_frags) = + IndexExprResult::deserialize(&batch).unwrap(); + assert_eq!( + round_tripped.lower, original.lower, + "format {format:?}, case {label}: lower" + ); + assert_eq!( + round_tripped.upper, original.upper, + "format {format:?}, case {label}: upper" + ); + assert_eq!( + round_tripped_frags, fragments_covered, + "format {format:?}, case {label}: frags" + ); + assert_eq!( + round_tripped.is_exact(), + original.is_exact(), 
+ "format {format:?}, case {label}" + ); + assert_eq!( + round_tripped.is_at_most(), + original.is_at_most(), + "format {format:?}, case {label}" + ); + assert_eq!( + round_tripped.is_at_least(), + original.is_at_least(), + "format {format:?}, case {label}" + ); + } + } + } + + /// Exact results encode `upper` as a fully-null column on the wire — the + /// payload only needs to ship once. `RowAddrMask::into_arrow` never + /// produces a fully-null array (it always sets exactly one of the two + /// rows), so the sentinel can't collide with a real mask. This pins + /// both halves: exact ⇒ upper fully null, non-exact ⇒ upper carries the + /// real mask. + #[test] + fn test_serialize_omits_upper_when_exact() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(0u64..5)); + let fragments_covered = RoaringBitmap::from_iter([0u32]); + + use arrow::array::AsArray; + + // Exact: upper column must be fully null on the wire. + let exact_batch = IndexExprResult::exact(mask.clone()) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) + .unwrap(); + let exact_upper = exact_batch.column(1).as_binary::<i32>(); + assert!(exact_upper.is_null(0) && exact_upper.is_null(1)); + + // Non-exact (at_most): upper column must carry the upper mask, so at + // least one row is non-null (`AllowList(mask)` puts the payload at + // row 1). + let at_most_batch = IndexExprResult::at_most(mask.clone()) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) + .unwrap(); + let at_most_upper = at_most_batch.column(1).as_binary::<i32>(); + assert!(!(at_most_upper.is_null(0) && at_most_upper.is_null(1))); + + // Non-exact (at_least): upper = all_rows, which `into_arrow` + // encodes as `BlockList(empty)` — row 0 holds the empty-tree bytes, + // row 1 is null. Round-trip must preserve `is_at_least`. 
+ let at_least_batch = IndexExprResult::at_least(mask) + .serialize(&fragments_covered, IndexExprResultWireFormat::TwoMask) + .unwrap(); + let at_least_upper = at_least_batch.column(1).as_binary::<i32>(); + assert!(!at_least_upper.is_null(0)); + let (round_tripped, _) = IndexExprResult::deserialize(&at_least_batch).unwrap(); + assert!(round_tripped.is_at_least()); + assert!(!round_tripped.is_exact()); + } + + /// A refined `IndexExprResult` (`lower` strictly inside a non-universe + /// `upper`) has no legacy three-shape encoding. The serializer + /// must not error in that case — it must degrade to `AtMost(upper)` so + /// older read planners still see a valid superset and recheck. + #[test] + fn test_three_variant_serialize_refined_degrades_to_at_most() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let lower_addrs = RowAddrTreeMap::from_iter(0u64..3); + let upper_addrs = RowAddrTreeMap::from_iter(0u64..10); + let refined = IndexExprResult::new( + RowAddrMask::from_allowed(lower_addrs), + RowAddrMask::from_allowed(upper_addrs.clone()), + ); + assert!(!refined.is_exact() && !refined.is_at_most() && !refined.is_at_least()); + + let fragments_covered = RoaringBitmap::from_iter([0u32, 1]); + + let batch = refined + .serialize(&fragments_covered, IndexExprResultWireFormat::ThreeVariant) + .unwrap(); + assert_eq!( + batch.schema(), + *IndexExprResultWireFormat::ThreeVariant.schema() + ); + + // Discriminant 1 == AtMost; the round-tripped result carries the + // original `upper` as the AtMost mask (empty lower, upper = upper). 
+ let (round_tripped, round_tripped_frags) = IndexExprResult::deserialize(&batch).unwrap(); + assert!(round_tripped.is_at_most()); + assert_eq!(round_tripped.upper, RowAddrMask::from_allowed(upper_addrs)); + assert_eq!(round_tripped_frags, fragments_covered); + } } diff --git a/rust/lance-select/Cargo.toml b/rust/lance-select/Cargo.toml index c7deb6f8894..4d72b55b5e2 100644 --- a/rust/lance-select/Cargo.toml +++ b/rust/lance-select/Cargo.toml @@ -14,7 +14,9 @@ description = "Row-selection masks and index-result algebra for Lance" [dependencies] arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } byteorder = { workspace = true } +tracing = { workspace = true } bytes = { workspace = true } deepsize = { workspace = true } itertools = { workspace = true } diff --git a/rust/lance-select/benches/index_expr_result.rs b/rust/lance-select/benches/index_expr_result.rs index 6e37e7217ad..8d9a86c12ba 100644 --- a/rust/lance-select/benches/index_expr_result.rs +++ b/rust/lance-select/benches/index_expr_result.rs @@ -52,9 +52,9 @@ fn bench_not(c: &mut Criterion) { for label in ["Exact", "AtMost", "AtLeast"] { let id = BenchmarkId::new(label, n); let make = move || match label { - "Exact" => NullableIndexExprResult::Exact(allow_run(n)), - "AtMost" => NullableIndexExprResult::AtMost(allow_run(n)), - "AtLeast" => NullableIndexExprResult::AtLeast(allow_run(n)), + "Exact" => NullableIndexExprResult::exact(allow_run(n)), + "AtMost" => NullableIndexExprResult::at_most(allow_run(n)), + "AtLeast" => NullableIndexExprResult::at_least(allow_run(n)), _ => unreachable!(), }; group.bench_function(id, |b| { @@ -75,33 +75,35 @@ type PairFn = Box (NullableIndexExprResult, NullableIndexExprResult) /// cases that today drop information (Exact/AtMost, Exact/AtLeast, /// AtMost/AtLeast). 
fn pair_cases(n: u64) -> Vec<(&'static str, PairFn)> { - use NullableIndexExprResult::*; let lhs = move || allow_run(n); let rhs = move || allow_middle(n); + let exact = |m| NullableIndexExprResult::exact(m); + let at_most = |m| NullableIndexExprResult::at_most(m); + let at_least = |m| NullableIndexExprResult::at_least(m); vec![ ( "Exact_Exact", - Box::new(move || (Exact(lhs()), Exact(rhs()))), + Box::new(move || (exact(lhs()), exact(rhs()))), ), ( "AtMost_AtMost", - Box::new(move || (AtMost(lhs()), AtMost(rhs()))), + Box::new(move || (at_most(lhs()), at_most(rhs()))), ), ( "AtLeast_AtLeast", - Box::new(move || (AtLeast(lhs()), AtLeast(rhs()))), + Box::new(move || (at_least(lhs()), at_least(rhs()))), ), ( "Exact_AtMost", - Box::new(move || (Exact(lhs()), AtMost(rhs()))), + Box::new(move || (exact(lhs()), at_most(rhs()))), ), ( "Exact_AtLeast", - Box::new(move || (Exact(lhs()), AtLeast(rhs()))), + Box::new(move || (exact(lhs()), at_least(rhs()))), ), ( "AtMost_AtLeast", - Box::new(move || (AtMost(lhs()), AtLeast(rhs()))), + Box::new(move || (at_most(lhs()), at_least(rhs()))), ), ] } diff --git a/rust/lance-select/src/mask/nullable.rs b/rust/lance-select/src/mask/nullable.rs index 81615ba64b0..f76838170f3 100644 --- a/rust/lance-select/src/mask/nullable.rs +++ b/rust/lance-select/src/mask/nullable.rs @@ -126,13 +126,23 @@ impl std::ops::BitOrAssign<&Self> for NullableRowAddrSet { /// This mask handles three-valued logic for SQL expressions, where a filter can /// evaluate to TRUE, FALSE, or NULL. The `selected` set includes rows that are /// TRUE or NULL. The `nulls` set includes rows that are NULL. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum NullableRowAddrMask { AllowList(NullableRowAddrSet), BlockList(NullableRowAddrSet), } impl NullableRowAddrMask { + /// A mask that matches no rows (TRUE = ∅, NULL = ∅). 
+ pub fn allow_nothing() -> Self { + Self::AllowList(NullableRowAddrSet::empty()) + } + + /// A mask that matches every row (TRUE = universe, NULL = ∅). + pub fn all_rows() -> Self { + Self::BlockList(NullableRowAddrSet::empty()) + } + pub fn selected(&self, row_id: u64) -> bool { match self { Self::AllowList(NullableRowAddrSet { selected, nulls }) => { diff --git a/rust/lance-select/src/result.rs b/rust/lance-select/src/result.rs index 759c139a0fd..7b33eda2e4e 100644 --- a/rust/lance-select/src/result.rs +++ b/rust/lance-select/src/result.rs @@ -1,51 +1,166 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! Certainty-tagged wrappers around a row-address mask returned by a +//! Interval-shaped wrappers around a row-address mask returned by a //! scalar-index expression evaluation. //! -//! These types model the three possible degrees of knowledge an index -//! search can return: +//! Each result describes a closed interval `[lower, upper]` in the +//! lattice of subsets: //! -//! * [`Exact`] — the mask is the precise answer; no recheck needed. -//! * [`AtMost`] — the mask is a *superset* of the true answer; the rows -//! inside the mask must be rechecked against the predicate. -//! * [`AtLeast`] — the mask is a *subset* of the true answer; the rows -//! outside the mask must be rechecked against the predicate. +//! * `lower` — rows the index *guarantees* are in the answer. +//! * `upper` — rows that *might* be in the answer; rows outside `upper` +//! are guaranteed not in the answer. //! -//! The boolean algebra (`Not`/`BitAnd`/`BitOr`) is implemented on both -//! [`NullableIndexExprResult`] (the form during evaluation, carrying SQL -//! three-valued logic via [`NullableRowAddrMask`]) and -//! [`IndexExprResult`] (the form consumed by the read planner, after -//! `drop_nulls` collapses NULL rows into FALSE). +//! The three pre-existing "shapes" map onto degenerate intervals: //! -//! 
[`Exact`]: IndexExprResult::Exact -//! [`AtMost`]: IndexExprResult::AtMost -//! [`AtLeast`]: IndexExprResult::AtLeast - -use crate::mask::{NullableRowAddrMask, RowAddrMask}; - -/// Result of an index search before NULL rows are dropped. Carries -/// three-valued-logic information via [`NullableRowAddrMask`]. -#[derive(Debug)] -pub enum NullableIndexExprResult { - Exact(NullableRowAddrMask), - AtMost(NullableRowAddrMask), - AtLeast(NullableRowAddrMask), +//! | Old variant | Interval form | +//! |-------------|----------------------------------------| +//! | `Exact(m)` | `{lower: m, upper: m}` | +//! | `AtMost(m)` | `{lower: allow_nothing(), upper: m}` | +//! | `AtLeast(m)`| `{lower: m, upper: all_rows()}` | +//! +//! Use [`IndexExprResult::exact`] / [`IndexExprResult::at_most`] / +//! [`IndexExprResult::at_least`] to construct those shapes, and the +//! matching [`IndexExprResult::is_exact`] etc. predicates to inspect +//! them. Intervals that are neither (the "Refined" case — a non-empty +//! `lower` strictly inside a non-universe `upper`) arise from indices +//! that can distinguish guaranteed-match from candidate-match rows +//! within a single search (e.g. a zone map answering `IS NOT NULL`). +//! +//! The boolean algebra (`Not` / `BitAnd` / `BitOr`) is elementwise on +//! the endpoints: +//! +//! ```text +//! !{l, u} = {!u, !l} +//! {l1, u1} & {l2, u2} = {l1 & l2, u1 & u2} +//! {l1, u1} | {l2, u2} = {l1 | l2, u1 | u2} +//! ``` +//! +//! This works for both the post-`drop_nulls` form ([`IndexExprResult`], +//! backed by [`RowAddrMask`]) and the during-evaluation form +//! ([`NullableIndexExprResult`], backed by [`NullableRowAddrMask`]) — +//! the per-endpoint algebra already implements two-valued and SQL +//! three-valued logic correctly inside each mask type. 
+ +use std::sync::{Arc, LazyLock}; + +use arrow_array::{Array, RecordBatch, UInt32Array, builder::BinaryBuilder}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use roaring::RoaringBitmap; + +use lance_core::{Error, Result}; + +use crate::mask::{NullableRowAddrMask, RowAddrMask, RowSetOps}; + +/// Result of an index search before NULL rows are dropped. Each endpoint +/// is a [`NullableRowAddrMask`] carrying SQL three-valued logic info. +#[derive(Debug, Clone)] +pub struct NullableIndexExprResult { + /// Rows the index *guarantees* are TRUE. + pub lower: NullableRowAddrMask, + /// Rows that may be TRUE. Rows outside `upper` are guaranteed to be + /// FALSE / NULL (and so not in a `WHERE` answer set). + pub upper: NullableRowAddrMask, + // O(1) cache for is_exact(). Set by constructors and propagated + // elementwise through the boolean algebra. + exact: bool, +} + +impl NullableIndexExprResult { + /// Precise result — every row in `mask` is in the answer and every + /// row outside is not. Equivalent to the old `Exact` variant. + pub fn exact(mask: NullableRowAddrMask) -> Self { + Self { + lower: mask.clone(), + upper: mask, + exact: true, + } + } + + /// Upper-bound-only result — rows outside `mask` are guaranteed not + /// to match; rows inside may match and require a recheck. + /// Equivalent to the old `AtMost` variant. + pub fn at_most(mask: NullableRowAddrMask) -> Self { + Self { + lower: NullableRowAddrMask::allow_nothing(), + upper: mask, + exact: false, + } + } + + /// Lower-bound-only result — rows in `mask` are guaranteed to match; + /// rows outside may match too and require a recheck. Equivalent to + /// the old `AtLeast` variant. + pub fn at_least(mask: NullableRowAddrMask) -> Self { + Self { + lower: mask, + upper: NullableRowAddrMask::all_rows(), + exact: false, + } + } + + /// True if the result is exact — the answer is precisely the lower + /// (== upper) mask. 
+ /// + /// This is a **structural** check on the canonical form produced by + /// the constructors / algebra: an `Exact(m)` built with + /// [`Self::exact`] holds equal masks, and elementwise `&` / `|` / `!` + /// preserve that. It is not a semantic emptiness test — a + /// hand-constructed `IndexExprResult` whose endpoints are + /// representationally distinct but semantically equal (e.g. + /// `AllowList(universe)` vs `BlockList(empty)`) will report + /// `is_exact() == false`. All in-tree code paths construct results + /// through the canonical builders, so this is sound in practice. + /// + /// The three shape predicates are not mutually exclusive — see the + /// note on [`Self::is_at_least`] for the precedence convention. + pub fn is_exact(&self) -> bool { + self.exact + } + + /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the + /// index gives only an upper bound on the answer. + /// + /// Like [`Self::is_exact`], this is a structural check on the + /// canonical form. See that doc for the caveat. + pub fn is_at_most(&self) -> bool { + matches!(&self.lower, NullableRowAddrMask::AllowList(set) if set.is_empty()) + } + + /// True if `upper` covers every row (canonical `BlockList(∅)`) — the + /// index gives only a lower bound on the answer. + /// + /// **Precedence convention** for consumers branching on shape: check + /// [`Self::is_exact`] *first* (Exact-of-empty satisfies both + /// `is_exact` and `is_at_most`; Exact-of-universe satisfies both + /// `is_exact` and `is_at_least`); then `is_at_least`; finally treat + /// the residual as `is_at_most` or Refined. The branches in + /// `filtered_read::apply_index_to_fragment` follow this order. + pub fn is_at_least(&self) -> bool { + matches!(&self.upper, NullableRowAddrMask::BlockList(set) if set.is_empty()) + } + + /// Project NULL rows out of the result. + /// + /// Under a `WHERE` clause NULL is treated as FALSE, so `drop_nulls` + /// folds NULL rows out of the answer at each endpoint. 
+ pub fn drop_nulls(self) -> IndexExprResult { + IndexExprResult { + lower: self.lower.drop_nulls(), + upper: self.upper.drop_nulls(), + exact: self.exact, + } + } } impl std::ops::Not for NullableIndexExprResult { type Output = Self; fn not(self) -> Self { - // Flip certainty: NOT(AtMost) → AtLeast, NOT(AtLeast) → AtMost. - // NULL info is preserved by `NullableRowAddrMask::not` (it flips - // AllowList ↔ BlockList without touching the `nulls` field), which - // is the 3VL-correct negation: TRUE↔FALSE swap, NULL stays NULL. - match self { - Self::Exact(mask) => Self::Exact(!mask), - Self::AtMost(mask) => Self::AtLeast(!mask), - Self::AtLeast(mask) => Self::AtMost(!mask), + Self { + lower: !self.upper, + upper: !self.lower, + exact: self.exact, } } } @@ -54,22 +169,10 @@ impl std::ops::BitAnd for NullableIndexExprResult { type Output = Self; fn bitand(self, rhs: Self) -> Self { - match (self, rhs) { - (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs & rhs), - (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(lhs), Self::Exact(rhs)) => { - Self::AtMost(lhs & rhs) - } - (Self::Exact(exact), Self::AtLeast(_)) | (Self::AtLeast(_), Self::Exact(exact)) => { - // We could do better here, elements in both lhs and rhs are known - // to be true and don't require a recheck. 
We only need to recheck - // elements in lhs that are not in rhs - Self::AtMost(exact) - } - (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs & rhs), - (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs & rhs), - (Self::AtMost(most), Self::AtLeast(_)) | (Self::AtLeast(_), Self::AtMost(most)) => { - Self::AtMost(most) - } + Self { + lower: self.lower & rhs.lower, + upper: self.upper & rhs.upper, + exact: self.exact && rhs.exact, } } } @@ -78,74 +181,305 @@ impl std::ops::BitOr for NullableIndexExprResult { type Output = Self; fn bitor(self, rhs: Self) -> Self { - match (self, rhs) { - (Self::Exact(lhs), Self::Exact(rhs)) => Self::Exact(lhs | rhs), - (Self::Exact(lhs), Self::AtMost(rhs)) | (Self::AtMost(rhs), Self::Exact(lhs)) => { - // We could do better here, elements in lhs are known to be true - // and don't require a recheck. We only need to recheck elements - // in rhs that are not in lhs - Self::AtMost(lhs | rhs) - } - (Self::Exact(lhs), Self::AtLeast(rhs)) | (Self::AtLeast(rhs), Self::Exact(lhs)) => { - Self::AtLeast(lhs | rhs) - } - (Self::AtMost(lhs), Self::AtMost(rhs)) => Self::AtMost(lhs | rhs), - (Self::AtLeast(lhs), Self::AtLeast(rhs)) => Self::AtLeast(lhs | rhs), - (Self::AtMost(_), Self::AtLeast(least)) | (Self::AtLeast(least), Self::AtMost(_)) => { - Self::AtLeast(least) - } + Self { + lower: self.lower | rhs.lower, + upper: self.upper | rhs.upper, + exact: self.exact && rhs.exact, } } } -impl NullableIndexExprResult { - /// Project NULL rows out of the result. +/// Result of an index search after NULL rows have been dropped. This is +/// what the read planner consumes. +#[derive(Debug, Clone)] +pub struct IndexExprResult { + /// Rows the index *guarantees* are in the answer. + pub lower: RowAddrMask, + /// Rows that may be in the answer. Rows outside `upper` are + /// guaranteed not in the answer. + pub upper: RowAddrMask, + // O(1) cache for is_exact(). 
Set by constructors and propagated + // elementwise through the boolean algebra. + exact: bool, +} + +impl IndexExprResult { + /// Precise result — every row in `mask` is in the answer and every + /// row outside is not. Equivalent to the old `Exact` variant. + pub fn exact(mask: RowAddrMask) -> Self { + Self { + lower: mask.clone(), + upper: mask, + exact: true, + } + } + + /// Upper-bound-only result. Equivalent to the old `AtMost` variant. + pub fn at_most(mask: RowAddrMask) -> Self { + Self { + lower: RowAddrMask::allow_nothing(), + upper: mask, + exact: false, + } + } + + /// Lower-bound-only result. Equivalent to the old `AtLeast` variant. + pub fn at_least(mask: RowAddrMask) -> Self { + Self { + lower: mask, + upper: RowAddrMask::all_rows(), + exact: false, + } + } + + /// Construct a refined interval result — `lower` rows are guaranteed + /// matches and `upper` rows are candidates, with `lower ⊆ upper`. /// - /// Under a `WHERE` clause, NULL is treated as FALSE — so `drop_nulls` - /// removes them from `AllowList`s (NULL rows are not selected) and - /// folds them into `BlockList`s (NULL rows are still blocked). - pub fn drop_nulls(self) -> IndexExprResult { - match self { - Self::Exact(mask) => IndexExprResult::Exact(mask.drop_nulls()), - Self::AtMost(mask) => IndexExprResult::AtMost(mask.drop_nulls()), - Self::AtLeast(mask) => IndexExprResult::AtLeast(mask.drop_nulls()), + /// Use [`Self::exact`] / [`Self::at_most`] / [`Self::at_least`] for + /// the three degenerate shapes; this constructor is only needed when + /// both endpoints are non-trivial. + pub fn new(lower: RowAddrMask, upper: RowAddrMask) -> Self { + Self { + lower, + upper, + exact: false, } } + + /// True if the result is exact — the answer is precisely the lower + /// (== upper) mask. See [`NullableIndexExprResult::is_exact`] for the + /// structural-form caveat and the precedence convention shared with + /// [`Self::is_at_most`] / [`Self::is_at_least`]. 
+ pub fn is_exact(&self) -> bool { + self.exact + } + + /// True if `lower` matches no rows (canonical `AllowList(∅)`) — the + /// index gives only an upper bound on the answer. See + /// [`NullableIndexExprResult::is_exact`] for caveats. + pub fn is_at_most(&self) -> bool { + matches!(&self.lower, RowAddrMask::AllowList(set) if set.is_empty()) + } + + /// True if `upper` covers every row (canonical `BlockList(∅)`) — the + /// index gives only a lower bound on the answer. See + /// [`NullableIndexExprResult::is_at_least`] for the precedence + /// convention consumers should follow. + pub fn is_at_least(&self) -> bool { + matches!(&self.upper, RowAddrMask::BlockList(set) if set.is_empty()) + } } -/// Result of an index search after NULL rows have been dropped. This is -/// what the read planner consumes. -#[derive(Debug)] -pub enum IndexExprResult { - /// The answer is exactly the rows in the allow list minus the rows - /// in the block list. - Exact(RowAddrMask), - /// The answer is at most the rows in the allow list minus the rows - /// in the block list. Some of the rows in the allow list may not be - /// in the result and will need to be filtered by a recheck. Every - /// row in the block list is definitely not in the result. - AtMost(RowAddrMask), - /// The answer is at least the rows in the allow list minus the rows - /// in the block list. Some of the rows in the block list might be in - /// the result. Every row in the allow list is definitely in the - /// result. 
- AtLeast(RowAddrMask), +impl std::ops::Not for IndexExprResult { + type Output = Self; + + fn not(self) -> Self { + Self { + lower: !self.upper, + upper: !self.lower, + exact: self.exact, + } + } } -impl IndexExprResult { - pub fn row_addr_mask(&self) -> &RowAddrMask { - match self { - Self::Exact(mask) => mask, - Self::AtMost(mask) => mask, - Self::AtLeast(mask) => mask, +impl std::ops::BitAnd for IndexExprResult { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self { + Self { + lower: self.lower & rhs.lower, + upper: self.upper & rhs.upper, + exact: self.exact && rhs.exact, + } + } +} + +impl std::ops::BitOr for IndexExprResult { + type Output = Self; + + fn bitor(self, rhs: Self) -> Self { + Self { + lower: self.lower | rhs.lower, + upper: self.upper | rhs.upper, + exact: self.exact && rhs.exact, } } +} + +static TWO_MASK_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new(vec![ + Field::new("lower", DataType::Binary, true), + Field::new("upper", DataType::Binary, true), + Field::new("fragments_covered", DataType::Binary, true), + ])) +}); + +static THREE_VARIANT_RESULT_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new(vec![ + Field::new("result".to_string(), DataType::Binary, true), + Field::new("discriminant".to_string(), DataType::UInt32, true), + Field::new("fragments_covered".to_string(), DataType::Binary, true), + ])) +}); + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +pub enum IndexExprResultWireFormat { + ThreeVariant, // A legacy format that used AtMost/AtLeast/Exact variants + #[default] + TwoMask, // The two-mask format with upper and lower +} - pub fn discriminant(&self) -> u32 { +impl IndexExprResultWireFormat { + pub fn schema(&self) -> &SchemaRef { match self { - Self::Exact(_) => 0, - Self::AtMost(_) => 1, - Self::AtLeast(_) => 2, + Self::ThreeVariant => &THREE_VARIANT_RESULT_SCHEMA, + Self::TwoMask => &TWO_MASK_RESULT_SCHEMA, + } + } +} + +impl IndexExprResult { + /// Serialize into the 
`TWO_MASK_RESULT_SCHEMA` record-batch layout used to + /// hand scalar-index results to the read planner. + #[tracing::instrument(skip_all)] + fn serialize_standard(&self, fragments_covered: &RoaringBitmap) -> Result { + let lower_arr = self.lower.into_arrow()?; + let upper_arr = if self.is_exact() { + let mut b = BinaryBuilder::new(); + b.append_null(); + b.append_null(); + b.finish() + } else { + self.upper.into_arrow()? + }; + let mut frags_builder = BinaryBuilder::new(); + let mut frags_bytes = Vec::with_capacity(fragments_covered.serialized_size()); + fragments_covered.serialize_into(&mut frags_bytes)?; + frags_builder.append_value(frags_bytes); + frags_builder.append_null(); + Ok(RecordBatch::try_new( + TWO_MASK_RESULT_SCHEMA.clone(), + vec![ + Arc::new(lower_arr), + Arc::new(upper_arr), + Arc::new(frags_builder.finish()) as Arc, + ], + )?) + } + + /// Serialize into the legacy three-variant record-batch layout. + /// + /// Refined intervals (a non-empty `lower` strictly inside a non-universe `upper`) + /// cannot be represented in the legacy encoding and are degraded to `AtMost(upper)`.
+ fn serialize_three_variant(&self, fragments_covered: &RoaringBitmap) -> Result { + let (mask, discriminant) = if self.is_exact() { + (&self.lower, 0u32) + } else if self.is_at_most() { + (&self.upper, 1) + } else if self.is_at_least() { + (&self.lower, 2) + } else { + tracing::warn!( + "Legacy serialization of refined index-expr result: degrading to AtMost(upper); \ + answer will remain correct but query will be more expensive" + ); + (&self.upper, 1) + }; + let mask_arr = mask.into_arrow()?; + let discriminant_arr = + Arc::new(UInt32Array::from(vec![discriminant, discriminant])) as Arc; + let mut frags_builder = BinaryBuilder::new(); + let mut frags_bytes = Vec::with_capacity(fragments_covered.serialized_size()); + fragments_covered.serialize_into(&mut frags_bytes)?; + frags_builder.append_value(frags_bytes); + frags_builder.append_null(); + Ok(RecordBatch::try_new( + THREE_VARIANT_RESULT_SCHEMA.clone(), + vec![ + Arc::new(mask_arr), + discriminant_arr, + Arc::new(frags_builder.finish()) as Arc, + ], + )?) + } + + pub fn serialize( + &self, + fragments_covered: &RoaringBitmap, + format: IndexExprResultWireFormat, + ) -> Result { + match format { + IndexExprResultWireFormat::ThreeVariant => { + self.serialize_three_variant(fragments_covered) + } + IndexExprResultWireFormat::TwoMask => self.serialize_standard(fragments_covered), + } + } + + /// Deserialize from a record batch produced by [`Self::serialize`]. 
+ pub fn deserialize(batch: &RecordBatch) -> Result<(Self, RoaringBitmap)> { + use arrow_array::cast::AsArray; + + if batch.num_rows() != 2 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly 2 rows but there are {} rows", + batch.num_rows() + ) + .into(), + )); + } + if batch.num_columns() != 3 { + return Err(Error::invalid_input_source( + format!( + "Expected a batch with exactly three columns but there are {} columns", + batch.num_columns() + ) + .into(), + )); } + + let first_col_name = batch.schema().field(0).name().clone(); + let index_result = if first_col_name == "lower" { + let lower = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let upper_col = batch.column(1).as_binary::(); + if upper_col.is_null(0) && upper_col.is_null(1) { + // Null upper column is the serialized form of an exact result. + Self::exact(lower) + } else { + let upper = RowAddrMask::from_arrow(upper_col)?; + Self { + lower, + upper, + exact: false, + } + } + } else if first_col_name == "result" { + let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; + let match_type = batch + .column(1) + .as_primitive::() + .values()[0]; + if match_type == 0 { + Self::exact(row_addr_mask) + } else if match_type == 1 { + Self::at_most(row_addr_mask) + } else if match_type == 2 { + Self::at_least(row_addr_mask) + } else { + return Err(Error::internal(format!( + "Unexpected match type: {match_type}" + ))); + } + } else { + return Err(Error::internal(format!( + "Unexpected column name: {first_col_name}" + ))); + }; + + let frags_col = batch.column(2).as_binary::(); + let fragments = RoaringBitmap::deserialize_from(frags_col.value(0))?; + + Ok((index_result, fragments)) } } diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index d0f5983b3ce..a01e34d28a1 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; use 
datafusion::config::ConfigOptions; +use lance_select::result::IndexExprResultWireFormat; use std::ops::Range; use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -65,10 +66,8 @@ use lance_datafusion::projection::ProjectionPlan; use lance_file::reader::FileReaderOptions; use lance_index::IndexCriteria; use lance_index::scalar::FullTextSearchQuery; +use lance_index::scalar::expression::PlannerIndexExt; use lance_index::scalar::expression::ScalarIndexExpr; -use lance_index::scalar::expression::{ - INDEX_EXPR_RESULT_SCHEMA, IndexExprResult, PlannerIndexExt, serialize_index_expr_result, -}; use lance_index::scalar::inverted::query::{ FtsQuery, FtsQueryNode, FtsSearchParams, MatchQuery, PhraseQuery, fill_fts_query_column, }; @@ -77,7 +76,7 @@ use lance_index::vector::{DEFAULT_QUERY_PARALLELISM, DIST_COL, Query}; use lance_index::{metrics::NoOpMetricsCollector, scalar::inverted::FTS_SCHEMA}; use lance_io::stream::RecordBatchStream; use lance_linalg::distance::MetricType; -use lance_select::{RowAddrMask, RowAddrTreeMap}; +use lance_select::{IndexExprResult, RowAddrMask, RowAddrTreeMap}; use lance_table::format::{Fragment, IndexMetadata}; use roaring::RoaringBitmap; use tracing::{Span, info_span, instrument}; @@ -106,7 +105,10 @@ use crate::io::exec::{ }, project, }; -use crate::io::exec::{AddRowOffsetExec, LanceFilterExec, LanceScanConfig, get_physical_optimizer}; +use crate::io::exec::{ + AddRowOffsetExec, LANCE_RELATIONAL_ALGEBRA_VERSION, LanceFilterExec, LanceScanConfig, + get_physical_optimizer, +}; use crate::{Error, Result}; use crate::{ datatypes::Schema, @@ -824,6 +826,9 @@ pub struct Scanner { aggregate: Option, + /// Which version of the relational algebra to use when generating the physical plan + relational_algebra_version: u32, + // Legacy fields to help migrate some old projection behavior to new behavior // // There are two behaviors we are moving away from: @@ -1047,6 +1052,7 @@ impl Scanner { legacy_with_row_id: false, explicit_projection: false, 
autoproject_scoring_columns: true, + relational_algebra_version: LANCE_RELATIONAL_ALGEBRA_VERSION, }; scanner.apply_blob_handling(); scanner @@ -2795,6 +2801,15 @@ impl Scanner { }) } + fn index_expr_result_format(&self) -> IndexExprResultWireFormat { + if self.relational_algebra_version > 1 { + IndexExprResultWireFormat::TwoMask + } else { + // In version 1 we used the legacy three-variant format for index expr results + IndexExprResultWireFormat::ThreeVariant + } + } + // Helper function for filtered_read // // Do not call this directly, use filtered_read instead @@ -2842,9 +2857,13 @@ impl Scanner { read_options = read_options.with_only_indexed_fragments(); } + let result_format = self.index_expr_result_format(); let index_input = filter_plan.index_query.clone().map(|index_query| { - Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query)) - as Arc + Arc::new(ScalarIndexExec::new( + self.dataset.clone(), + index_query, + result_format, + )) as Arc }); Ok(Arc::new(FilteredReadExec::try_new( @@ -2899,14 +2918,13 @@ impl Scanner { fn u64s_as_take_input(&self, u64s: Vec) -> Result> { let row_addrs = RowAddrTreeMap::from_iter(u64s); let row_addr_mask = RowAddrMask::from_allowed(row_addrs); - let index_result = IndexExprResult::Exact(row_addr_mask); + let index_result = IndexExprResult::exact(row_addr_mask); let fragments_covered = self.dataset.fragment_bitmap.as_ref().clone(); - let batch = serialize_index_expr_result(&index_result, &fragments_covered)?; + let format = self.index_expr_result_format(); + let batch = index_result.serialize(&fragments_covered, format)?; + let schema = batch.schema(); let stream = futures::stream::once(async move { Ok(batch) }); - let stream = Box::pin(RecordBatchStreamAdapter::new( - INDEX_EXPR_RESULT_SCHEMA.clone(), - stream, - )); + let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream)); Ok(Arc::new(OneShotExec::new(stream))) } @@ -4728,13 +4746,14 @@ impl Scanner { if missing_frags.is_empty() || 
self.fast_search { log::trace!("prefilter entirely satisfied by exact index search"); + let result_format = self.index_expr_result_format(); // We can only avoid materializing the index for a prefilter if: // 1. The search is indexed // 2. The index search is an exact search with no recheck or refine // 3. The indices cover at least the same fragments as the vector index, // unless fast_search allows skipping uncovered fragments. return Ok(PreFilterSource::ScalarIndexQuery(Arc::new( - ScalarIndexExec::new(self.dataset.clone(), index_query.clone()), + ScalarIndexExec::new(self.dataset.clone(), index_query.clone(), result_format), ))); } else { log::trace!("exact index search did not cover all fragments"); diff --git a/rust/lance/src/io/exec.rs b/rust/lance/src/io/exec.rs index 0b93e3c2834..f06f575de68 100644 --- a/rust/lance/src/io/exec.rs +++ b/rust/lance/src/io/exec.rs @@ -37,3 +37,19 @@ pub use rowids::{AddRowAddrExec, AddRowOffsetExec}; pub use scan::{LanceScanConfig, LanceScanExec}; pub use take::TakeExec; pub use utils::PreFilterSource; + +/// Declares which version of the relational algebra we are producing +/// +/// In order to enable plan pushdown (executing parts of the physical plan on different nodes) +/// we must treat the physical plan serialization and the inputs and outputs of physical plan +/// nodes as part of the public API surface. +/// +/// We should attempt to handle as many versions as possible on read paths. This variable +/// controls which versions we support producing. +/// +/// This includes changes to the proto serialization of physical plan nodes themselves or +/// changes to the format of the inputs or outputs of the nodes. +/// +/// This does not include changes to the behavior of the nodes unless that behavior is a change +/// in semantic meaning.
+pub const LANCE_RELATIONAL_ALGEBRA_VERSION: u32 = 1; diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0b83c000ef0..c491e3f194b 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -7,8 +7,6 @@ use std::sync::Mutex; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{ops::Range, sync::Arc}; -use arrow::array::AsArray; -use arrow::datatypes::UInt32Type; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::common::runtime::SpawnedTask; @@ -40,10 +38,10 @@ use lance_datafusion::utils::{ ROWS_SCANNED_METRIC, TASK_WAIT_TIME_METRIC, }; use lance_file::reader::FileReaderOptions; -use lance_index::scalar::expression::{FilterPlan, IndexExprResult, index_expr_result_from_parts}; +use lance_index::scalar::expression::FilterPlan; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_select::{ - RowAddrMask, RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap, + IndexExprResult, RowAddrSelection, RowAddrTreeMap, bitmap_to_ranges, ranges_to_bitmap, }; use lance_table::format::Fragment; use lance_table::rowids::RowIdSequence; @@ -81,30 +79,7 @@ impl EvaluatedIndex { } pub fn try_from_arrow(batch: &RecordBatch) -> Result { - if batch.num_rows() != 2 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly 2 rows but there are {} rows", - batch.num_rows() - ) - .into(), - )); - } - if batch.num_columns() != 3 { - return Err(Error::invalid_input_source( - format!( - "Expected a batch with exactly two columns but there are {} columns", - batch.num_columns() - ) - .into(), - )); - } - let row_addr_mask = RowAddrMask::from_arrow(batch.column(0).as_binary())?; - let match_type = batch.column(1).as_primitive::().values()[0]; - let index_result = index_expr_result_from_parts(row_addr_mask, match_type)?; - - let applicable_fragments = batch.column(2).as_binary::(); - let 
applicable_fragments = RoaringBitmap::deserialize_from(applicable_fragments.value(0))?; + let (index_result, applicable_fragments) = IndexExprResult::deserialize(batch)?; Ok(Self { index_result, @@ -612,14 +587,17 @@ impl FilteredReadStream { // Resolve filter for this fragment let filter = if let Some(evaluated_index) = evaluated_index { if evaluated_index.applicable_fragments.contains(fragment_id) { - match &evaluated_index.index_result { - IndexExprResult::Exact(_) => options.refine_filter.clone(), - IndexExprResult::AtLeast(_) - if scan_planned_with_limit_pushed_down => - { - options.refine_filter.clone() - } - _ => options.full_filter.clone(), + let r = &evaluated_index.index_result; + // `Exact` results don't need a recheck. `AtLeast` + // results can also skip recheck when the + // skip/take pushdown is in play (we only read the + // guaranteed-match ranges in that case). + let can_skip_recheck = r.is_exact() + || (r.is_at_least() && scan_planned_with_limit_pushed_down); + if can_skip_recheck { + options.refine_filter.clone() + } else { + options.full_filter.clone() } } else { options.full_filter.clone() @@ -728,29 +706,40 @@ impl FilteredReadStream { if evaluated_index.applicable_fragments.contains(fragment_id) { let _span = tracing::span!(tracing::Level::DEBUG, "apply_index_result").entered(); - match &evaluated_index.index_result { - IndexExprResult::Exact(row_addr_mask) => { - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let mut matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); - fragments_to_read.insert(fragment_id, matched_ranges.clone()); - - Self::apply_skip_take_to_ranges(&mut matched_ranges, to_skip, to_take); - scan_push_down_fragments_to_read.insert(fragment_id, matched_ranges); - } - IndexExprResult::AtMost(row_addr_mask) => { - // Cannot push down skip/take for AtMost - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let matched_ranges = Self::intersect_ranges(&to_read, 
&valid_ranges); - fragments_to_read.insert(fragment_id, matched_ranges); - } - IndexExprResult::AtLeast(row_addr_mask) => { - let valid_ranges = row_id_sequence.mask_to_offset_ranges(row_addr_mask); - let mut guaranteed_ranges = Self::intersect_ranges(&to_read, &valid_ranges); - fragments_to_read.insert(fragment_id, guaranteed_ranges.clone()); - - Self::apply_skip_take_to_ranges(&mut guaranteed_ranges, to_skip, to_take); - scan_push_down_fragments_to_read.insert(fragment_id, guaranteed_ranges); - } + let index_result = &evaluated_index.index_result; + if index_result.is_exact() { + // lower == upper; either side gives the precise answer. + let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.upper); + let mut matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, matched_ranges.clone()); + + Self::apply_skip_take_to_ranges(&mut matched_ranges, to_skip, to_take); + scan_push_down_fragments_to_read.insert(fragment_id, matched_ranges); + } else if index_result.is_at_least() { + // upper is universe; lower is the guaranteed-match set + // used for the skip/take push-down path. + let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.lower); + let mut guaranteed_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, guaranteed_ranges.clone()); + + Self::apply_skip_take_to_ranges(&mut guaranteed_ranges, to_skip, to_take); + scan_push_down_fragments_to_read.insert(fragment_id, guaranteed_ranges); + } else { + // AtMost or true Refined: read everything in `upper` + // and rely on the full-filter recheck for survivors. + // + // For AtMost the index gives no lower bound, so there's + // no skip/take push-down to do. For Refined the lower + // bound *is* a guaranteed-match set, but exploiting it + // requires per-range filter push-down (different filter + // per range within a fragment), which the current plan + // doesn't support. 
The recheck-skip opportunity on the + // `lower` portion would also be visible up at the + // `can_skip_recheck` block — both are deferred. See + // TODO(refined-pushdown). + let valid_ranges = row_id_sequence.mask_to_offset_ranges(&index_result.upper); + let matched_ranges = Self::intersect_ranges(&to_read, &valid_ranges); + fragments_to_read.insert(fragment_id, matched_ranges); } } else { // Fragment not indexed. Normally we add the full fragment to keep @@ -2147,6 +2136,7 @@ mod tests { optimize::OptimizeOptions, scalar::{ScalarIndexParams, expression::PlannerIndexExt}, }; + use lance_select::result::IndexExprResultWireFormat; use crate::{ dataset::{InsertBuilder, WriteDestination, WriteMode, WriteParams}, @@ -2277,6 +2267,7 @@ mod tests { Some(Arc::new(ScalarIndexExec::new( self.dataset.clone(), index_query, + IndexExprResultWireFormat::default(), ))) } else { None @@ -2372,6 +2363,53 @@ mod tests { )) } + /// Round-trip every interval shape through the arrow wire format and + /// confirm the endpoints survive. Exercises both + /// `IndexExprResult::serialize` and `EvaluatedIndex::try_from_arrow` + /// so the schema names stay in sync. + #[test] + fn test_index_expr_result_serialize_roundtrip() { + use lance_select::{RowAddrMask, RowAddrTreeMap}; + + let mk = |rows: &[u64]| RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(rows)); + + let mut frags = RoaringBitmap::new(); + frags.insert(0); + frags.insert(7); + + let cases = vec![ + ("exact", IndexExprResult::exact(mk(&[1, 2, 3]))), + ("at_most", IndexExprResult::at_most(mk(&[1, 2, 3]))), + ("at_least", IndexExprResult::at_least(mk(&[1, 2]))), + // Refined: non-empty lower strictly inside non-universe upper. 
+ ("refined", IndexExprResult::new(mk(&[1, 2]), mk(&[1, 2, 3]))), + ]; + + for (name, original) in cases { + let batch = original + .serialize(&frags, IndexExprResultWireFormat::default()) + .unwrap_or_else(|e| panic!("serialize {name}: {e}")); + let decoded = EvaluatedIndex::try_from_arrow(&batch) + .unwrap_or_else(|e| panic!("try_from_arrow {name}: {e}")); + + // The underlying RowAddrTreeMap is PartialEq; compare via mask + // emptiness + symmetric difference being empty would be more + // robust, but the canonical builders preserve representation. + assert_eq!( + decoded.index_result.lower, original.lower, + "{name}: lower endpoint changed across round-trip", + ); + assert_eq!( + decoded.index_result.upper, original.upper, + "{name}: upper endpoint changed across round-trip", + ); + assert_eq!( + decoded.applicable_fragments, frags, + "{name}: applicable fragments changed across round-trip", + ); + } + } + #[test_log::test(tokio::test)] async fn test_bloom_filter_is_not_null_prefilter() { let (_tmp_path, dataset) = dataset_with_bloom_filter_nulls().await; diff --git a/rust/lance/src/io/exec/scalar_index.rs b/rust/lance/src/io/exec/scalar_index.rs index f6fd1d0a197..7a8f3301f8f 100644 --- a/rust/lance/src/io/exec/scalar_index.rs +++ b/rust/lance/src/io/exec/scalar_index.rs @@ -40,13 +40,12 @@ use lance_index::{ metrics::MetricsCollector, scalar::{ SargableQuery, ScalarIndex, - expression::{ - INDEX_EXPR_RESULT_SCHEMA, IndexExprResult, ScalarIndexExpr, ScalarIndexLoader, - ScalarIndexSearch, serialize_index_expr_result, - }, + expression::{ScalarIndexExpr, ScalarIndexLoader, ScalarIndexSearch}, }, }; -use lance_select::{RowAddrMask, RowAddrTreeMap, RowSetOps}; +use lance_select::{ + IndexExprResult, RowAddrMask, RowAddrTreeMap, RowSetOps, result::IndexExprResultWireFormat, +}; use lance_table::format::Fragment; use roaring::RoaringBitmap; use tracing::{debug_span, instrument}; @@ -76,6 +75,7 @@ pub struct ScalarIndexExec { expr: ScalarIndexExpr, properties: Arc, 
metrics: ExecutionPlanMetricsSet, + result_format: IndexExprResultWireFormat, } impl DisplayAs for ScalarIndexExec { @@ -92,9 +92,13 @@ impl DisplayAs for ScalarIndexExec { } impl ScalarIndexExec { - pub fn new(dataset: Arc, expr: ScalarIndexExpr) -> Self { + pub fn new( + dataset: Arc, + expr: ScalarIndexExpr, + result_format: IndexExprResultWireFormat, + ) -> Self { let properties = Arc::new(PlanProperties::new( - EquivalenceProperties::new(INDEX_EXPR_RESULT_SCHEMA.clone()), + EquivalenceProperties::new(result_format.schema().clone()), Partitioning::RoundRobinBatch(1), EmissionType::Incremental, Boundedness::Bounded, @@ -104,6 +108,7 @@ impl ScalarIndexExec { expr, properties, metrics: ExecutionPlanMetricsSet::new(), + result_format, } } @@ -141,6 +146,7 @@ impl ScalarIndexExec { expr: ScalarIndexExpr, dataset: Arc, plan_metrics: ExecutionPlanMetricsSet, + result_format: IndexExprResultWireFormat, ) -> Result { let metrics = IndexMetrics::new(&plan_metrics, 0); let query_result = { @@ -153,7 +159,7 @@ impl ScalarIndexExec { { let ser_time = plan_metrics.new_time(SCALAR_INDEX_SER_TIME_METRIC, 0); let _timer = ser_time.timer(); - serialize_index_expr_result(&query_result, &fragments_covered_by_result) + query_result.serialize(&fragments_covered_by_result, result_format) } } } @@ -168,7 +174,7 @@ impl ExecutionPlan for ScalarIndexExec { } fn schema(&self) -> SchemaRef { - INDEX_EXPR_RESULT_SCHEMA.clone() + self.result_format.schema().clone() } fn children(&self) -> Vec<&Arc> { @@ -197,13 +203,14 @@ impl ExecutionPlan for ScalarIndexExec { self.expr.clone(), self.dataset.clone(), self.metrics.clone(), + self.result_format, ); let stream = futures::stream::iter(vec![batch_fut]) .then(|batch_fut| batch_fut.map_err(|err| err.into())) .boxed() as BoxStream<'static, datafusion::common::Result>; Ok(Box::pin(InstrumentedRecordBatchStreamAdapter::new( - INDEX_EXPR_RESULT_SCHEMA.clone(), + self.result_format.schema().clone(), stream, partition, &self.metrics, @@ -216,7 +223,7 
@@ impl ExecutionPlan for ScalarIndexExec { ) -> datafusion::error::Result { Ok(datafusion::physical_plan::Statistics { num_rows: datafusion::common::stats::Precision::Exact(2), - ..datafusion::physical_plan::Statistics::new_unknown(&INDEX_EXPR_RESULT_SCHEMA) + ..datafusion::physical_plan::Statistics::new_unknown(self.result_format.schema()) }) } @@ -361,9 +368,10 @@ impl MapIndexExec { needs_recheck: false, }); let query_result = query.evaluate(dataset.as_ref(), metrics.as_ref()).await?; - let IndexExprResult::Exact(mut row_addr_mask) = query_result else { + if !query_result.is_exact() { todo!("Support for non-exact query results as input for merge_insert") - }; + } + let mut row_addr_mask = query_result.upper; if let Some(deletion_mask) = deletion_mask.as_ref() { row_addr_mask = row_addr_mask & deletion_mask.as_ref().clone(); @@ -555,21 +563,26 @@ impl MaterializeIndexExec { // when the index was trained will still be deleted when the index is queried. DatasetPreFilter::create_deletion_mask(dataset.clone(), fragment_bitmap) }); + // MaterializeIndexExec emits a deterministic set of row ids. The + // `upper` mask of the interval is the candidate set (the answer is + // a subset of `upper`). For `Exact` results this is the exact + // answer; for `AtMost` and Refined results it's a superset that + // gets pruned downstream by `LanceFilterExec` (the full filter + // runs on the materialized batches via the scan plan, so any + // non-matching candidates in `upper` are dropped before they + // reach the user). `AtLeast` carries an unbounded upper, so the + // candidate set is the whole row space — not actionable here. 
+ let take_upper = |result: IndexExprResult| -> Result { + if result.is_at_least() && !result.is_exact() { + todo!("Support AtLeast in MaterializeIndexExec") + } + Ok(result.upper) + }; let mask = if let Some(prefilter) = prefilter { let (expr_result, prefilter) = futures::try_join!(expr_result, prefilter)?; - let mask = match expr_result { - IndexExprResult::Exact(mask) => mask, - IndexExprResult::AtMost(mask) => mask, - IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"), - }; - mask & (*prefilter).clone() + take_upper(expr_result)? & (*prefilter).clone() } else { - let expr_result = expr_result.await?; - match expr_result { - IndexExprResult::Exact(mask) => mask, - IndexExprResult::AtMost(mask) => mask, - IndexExprResult::AtLeast(_) => todo!("Support AtLeast in MaterializeIndexExec"), - } + take_upper(expr_result.await?)? }; let ids = row_ids_for_mask(mask, &dataset, &fragments).await?; let ids = UInt64Array::from(ids); @@ -744,6 +757,7 @@ mod tests { use crate::index::DatasetIndexExt; use arrow::datatypes::UInt64Type; + use arrow_schema::Schema; use datafusion::{ execution::TaskContext, physical_plan::ExecutionPlan, prelude::SessionConfig, scalar::ScalarValue, @@ -758,6 +772,7 @@ mod tests { expression::{ScalarIndexExpr, ScalarIndexSearch}, }, }; + use lance_select::result::IndexExprResultWireFormat; use crate::{ Dataset, @@ -841,6 +856,62 @@ mod tests { assert_eq!(batches[0].num_rows(), 5); } + /// `ScalarIndexExec::schema()` (and the stream it emits) must advertise + /// the same schema the batch actually carries — otherwise downstream + /// consumers that trust `ExecutionPlan::schema()` will see a different + /// shape than they receive. + /// + /// The schema depends on the `IndexExprResultWireFormat` passed to `ScalarIndexExec::new`. 
+ #[tokio::test] + async fn test_scalar_index_exec_advertises_correct_schema() { + let TestFixture { + dataset, + _tmp_dir_guard, + } = test_fixture().await; + + let query = ScalarIndexExpr::Query(ScalarIndexSearch { + column: "ordered".to_string(), + index_name: "ordered_idx".to_string(), + index_type: "BTree".to_string(), + query: Arc::new(SargableQuery::Range( + Bound::Unbounded, + Bound::Excluded(ScalarValue::UInt64(Some(47))), + )), + needs_recheck: false, + }); + + let verify = async |plan: ScalarIndexExec, schema: Arc| { + assert_eq!(plan.schema(), schema); + assert_eq!( + plan.partition_statistics(None) + .unwrap() + .column_statistics + .len(), + schema.fields().len(), + ); + + let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap(); + assert_eq!(stream.schema(), schema); + let batches = stream.try_collect::>().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].schema(), schema); + }; + + let plan = ScalarIndexExec::new( + dataset.clone(), + query.clone(), + IndexExprResultWireFormat::ThreeVariant, + ); + let schema = IndexExprResultWireFormat::ThreeVariant.schema().clone(); + + verify(plan, schema).await; + + let plan = ScalarIndexExec::new(dataset, query, IndexExprResultWireFormat::TwoMask); + let schema = IndexExprResultWireFormat::TwoMask.schema().clone(); + + verify(plan, schema).await; + } + #[test] fn no_context_scalar_index() { // These tests ensure we can create nodes and call execute without a tokio Runtime @@ -862,7 +933,11 @@ mod tests { // These plans aren't even valid but it appears we defer all work (even validation) until // read time. 
- let plan = ScalarIndexExec::new(arc_dasaset.clone(), query.clone()); + let plan = ScalarIndexExec::new( + arc_dasaset.clone(), + query.clone(), + IndexExprResultWireFormat::default(), + ); plan.execute(0, Arc::new(TaskContext::default())).unwrap(); let plan = MapIndexExec::new( diff --git a/rust/lance/src/io/exec/utils.rs b/rust/lance/src/io/exec/utils.rs index 004b1a95b5f..5def0fb254d 100644 --- a/rust/lance/src/io/exec/utils.rs +++ b/rust/lance/src/io/exec/utils.rs @@ -13,7 +13,6 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use arrow::array::AsArray; use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::SchemaRef; use async_trait::async_trait; @@ -30,7 +29,7 @@ use lance_core::error::{CloneableResult, Error}; use lance_core::utils::futures::{Capacity, SharedStreamExt}; use lance_core::{ROW_ID, Result}; use lance_index::prefilter::FilterLoader; -use lance_select::{RowAddrMask, RowAddrTreeMap}; +use lance_select::{RowAddrMask, RowAddrTreeMap, result::IndexExprResult}; use std::future::Future; use crate::Dataset; @@ -97,20 +96,19 @@ pub(crate) struct SelectionVectorToPrefilter(pub SendableRecordBatchStream); #[async_trait] impl FilterLoader for SelectionVectorToPrefilter { async fn load(mut self: Box) -> Result { - let batch = self - .0 - .try_next() - .await? - .ok_or_else(|| { - Error::internal("Selection vector source for prefilter did not yield any batches") - }) - .unwrap(); - RowAddrMask::from_arrow(batch["result"].as_binary_opt::().ok_or_else(|| { - Error::internal(format!( - "Expected selection vector input to yield binary arrays but got {}", - batch["result"].data_type() - )) - })?) + let batch = self.0.try_next().await?.ok_or_else(|| { + Error::internal("Selection vector source for prefilter did not yield any batches") + })?; + // The vector-search prefilter wants the set of rows the search is + // allowed to consider — the `upper` bound of the index expression + // result. 
Rows outside the upper bound are guaranteed not to match, + // so the vector search can skip them. + // + // Use deserialize() here (rather than indexing "upper" directly) to + // support both the TwoMask and the legacy ThreeVariant wire formats + // that ScalarIndexExec may emit. + let (result, _) = IndexExprResult::deserialize(&batch)?; + Ok(result.upper) } }