diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md index 83da11dbe01..ae17b9bb0f2 100644 --- a/docs/src/guide/distributed_indexing.md +++ b/docs/src/guide/distributed_indexing.md @@ -65,7 +65,7 @@ indices// These physical segments are then committed together as one logical index. In the common no-merge case, the input segments are already the physical -segments and `build_all()` returns them unchanged. +segments and can be committed directly. ## Roles @@ -73,9 +73,8 @@ There are two parties involved in distributed indexing: - **Workers** build segments - **The caller** launches workers, chooses how those segments should be turned - into physical segments, provides any additional inputs requested by the - segment build APIs, and - commits the final result + into final segments, optionally merges caller-defined groups, and commits the + final result Lance does not provide a distributed scheduler. The caller is responsible for launching workers and driving the overall workflow. diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d338ff0fc4b..214faa2866f 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -1119,61 +1119,6 @@ fn inner_merge_index_metadata( Ok(()) } -#[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>( - mut env: JNIEnv<'local>, - java_dataset: JObject, - java_segments: JObject, - index_type: jint, - target_segment_bytes_jobj: JObject, -) -> JObject<'local> { - ok_or_throw!( - env, - inner_build_index_segments( - &mut env, - java_dataset, - java_segments, - index_type, - target_segment_bytes_jobj - ) - ) -} - -fn inner_build_index_segments<'local>( - env: &mut JNIEnv<'local>, - java_dataset: JObject, - java_segments: JObject, - index_type: jint, - target_segment_bytes_jobj: JObject, -) -> Result> { - let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; - let index_type = IndexType::try_from(index_type)?; - let target_segment_bytes = env - .get_long_opt(&target_segment_bytes_jobj)? - .map(|v| v as u64); - let template = segment_template(&segments)?; - - let built_segments = { - let dataset_guard = - unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - let mut builder = dataset_guard - .inner - .create_index_segment_builder() - .with_index_type(index_type) - .with_segments(segments); - if let Some(target_segment_bytes) = target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } - RT.block_on(builder.build_all())? - }; - - let built_metadata = built_segments - .into_iter() - .map(|segment| index_segment_to_metadata(&template, segment)) - .collect::>(); - export_vec(env, &built_metadata) -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeMergeExistingIndexSegments<'local>( mut env: JNIEnv<'local>, @@ -1250,44 +1195,6 @@ fn inner_commit_existing_index_segments<'local>( export_vec(env, &committed) } -struct SegmentTemplate { - name: String, - fields: Vec, - dataset_version: u64, -} - -fn segment_template(segments: &[IndexMetadata]) -> Result { - let first = segments - .first() - .ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?; - for segment in &segments[1..] { - if segment.name != first.name { - return Err(Error::input_error(format!( - "All segments must share the same index name, got '{}' and '{}'", - first.name, segment.name - ))); - } - if segment.fields != first.fields { - return Err(Error::input_error(format!( - "All segments must target the same field ids, got {:?} and {:?}", - first.fields, segment.fields - ))); - } - if segment.dataset_version != first.dataset_version { - return Err(Error::input_error(format!( - "All segments must share the same dataset version, got {} and {}", - first.dataset_version, segment.dataset_version - ))); - } - } - - Ok(SegmentTemplate { - name: first.name.clone(), - fields: first.fields.clone(), - dataset_version: first.dataset_version, - }) -} - fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result { let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| { Error::input_error(format!( @@ -1310,22 +1217,6 @@ fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result { )) } -fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata { - let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); - IndexMetadata { - uuid, - fields: template.fields.clone(), - name: template.name.clone(), - dataset_version: template.dataset_version, - fragment_bitmap: Some(fragment_bitmap), - index_details: Some(index_details), - index_version, - created_at: Some(Utc::now()), - base_id: None, - files: None, - } -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices( mut env: JNIEnv, diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index c4a9b591efb..23341283861 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1152,44 +1152,6 @@ public void mergeIndexMetadata( private native void innerMergeIndexMetadata( String indexUUID, int indexType, Optional batchReadHead); - /** - * Build physical vector index segments from previously-created fragment-level index outputs. - * - * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when - * fragmentIds are provided - * @param indexType concrete index type for the staged segments - * @param targetSegmentBytes optional size target for merged physical segments - * @return built physical segment metadata - */ - public List buildIndexSegments( - List segments, IndexType indexType, Optional targetSegmentBytes) { - Preconditions.checkNotNull(segments, "segments cannot be null"); - Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); - Preconditions.checkNotNull(indexType, "indexType cannot be null"); - try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { - Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); - return nativeBuildIndexSegments(segments, indexType.getValue(), targetSegmentBytes); - } - } - - /** - * Build physical vector index segments from previously-created fragment-level index outputs. - * - * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when - * fragmentIds are provided - * @param targetSegmentBytes optional size target for merged physical segments - * @return built physical segment metadata - */ - @Deprecated - public List buildIndexSegments(List segments, Optional targetSegmentBytes) { - throw new IllegalArgumentException( - "buildIndexSegments now requires an explicit index type; call " - + "buildIndexSegments(segments, indexType, targetSegmentBytes)"); - } - - private native List nativeBuildIndexSegments( - List segments, int indexType, Optional targetSegmentBytes); - /** Merge one caller-defined group of existing uncommitted vector index segments. */ public Index mergeExistingIndexSegments(List segments) { Preconditions.checkNotNull(segments, "segments cannot be null"); diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index 87241b6696b..81c3554eaf1 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -29,7 +29,6 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; -import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -95,14 +94,11 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_FLAT index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments( - List.of(firstSegment, secondSegment), IndexType.IVF_FLAT, Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -188,14 +184,11 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_PQ index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments( - List.of(firstSegment, secondSegment), IndexType.IVF_PQ, Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -265,14 +258,11 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_SQ index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments( - List.of(firstSegment, secondSegment), IndexType.IVF_SQ, Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 4cbd39fdebb..2694bcfbc86 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3075,8 +3075,9 @@ def create_scalar_index( If provided, the index will be created only on the specified fragments. This enables distributed/fragment-level indexing. When provided, the method returns metadata for one segment but does not commit - the index to the dataset. The segment can be planned, merged, and - committed later using the segment builder and commit APIs. + the index to the dataset. The segment can be optionally merged with + other segments and committed later with + ``commit_existing_index_segments(...)``. This parameter is passed via kwargs internally. index_uuid : str, optional A UUID to use for the segment written by this call. @@ -3751,8 +3752,8 @@ def create_index( This enables distributed/fragment-level indexing. When provided, the method creates one segment but does not commit the index to the dataset. The returned metadata can be passed to - ``create_index_segment_builder().with_index_type(...).with_segments(...)`` - and then committed with ``commit_existing_index_segments(...)``. + ``merge_existing_index_segments(...)`` if grouping is needed and then + committed with ``commit_existing_index_segments(...)``. index_uuid : str, optional A UUID to use for the segment written by this call. If not provided, a new UUID will be generated. @@ -3935,10 +3936,9 @@ def create_index_uncommitted( 1. run :meth:`create_index_uncommitted` on each worker with that worker's assigned ``fragment_ids`` 2. collect the returned :class:`Index` objects - 3. call :meth:`IndexSegmentBuilder.with_index_type` with the concrete - distributed index type - 4. pass them to :meth:`IndexSegmentBuilder.with_segments` - 5. build one or more physical segments and commit them with + 3. optionally merge caller-defined groups with + :meth:`merge_existing_index_segments` + 4. commit the final segment list with :meth:`commit_existing_index_segments` Parameters are the same as :meth:`create_index`, with one additional @@ -4019,9 +4019,9 @@ def merge_index_metadata( Merge distributed scalar index metadata. Vector distributed indexing no longer uses this API. For vector indices, - build segments with :meth:`create_index_uncommitted`, plan or - merge them with :meth:`create_index_segment_builder`, and publish them - with :meth:`commit_existing_index_segments`. + build segments with :meth:`create_index_uncommitted`, optionally merge + them with :meth:`merge_existing_index_segments`, and publish them with + :meth:`commit_existing_index_segments`. This method does NOT commit changes. @@ -4058,17 +4058,6 @@ def merge_index_metadata( self._ds.merge_index_metadata(index_uuid, t, batch_readhead, progress_callback) return None - def create_index_segment_builder(self): - """ - Create a builder for turning existing segments into physical segments. - - Provide the segment metadata returned by - :meth:`create_index_uncommitted` through - :meth:`IndexSegmentBuilder.with_segments`, and declare the segment type - with :meth:`IndexSegmentBuilder.with_index_type`. - """ - return self._ds.create_index_segment_builder() - def merge_existing_index_segments(self, segments: List[Index]) -> Index: """ Merge one caller-defined group of existing uncommitted segments. diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index 04ef8f806b2..40dc9ed93ac 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -10,7 +10,6 @@ IndexSegmentDescription = _lance.indices.IndexSegmentDescription IndexSegment = _lance.indices.IndexSegment -IndexSegmentPlan = _lance.indices.IndexSegmentPlan __all__ = [ "IndicesBuilder", @@ -19,7 +18,6 @@ "IvfModel", "IndexFileVersion", "IndexSegment", - "IndexSegmentPlan", "IndexSegmentDescription", ] diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 2af79b4d072..0421a1c2565 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -63,7 +63,6 @@ from .fragment import ( ) from .indices import IndexDescription as IndexDescription from .indices import IndexSegment as IndexSegment -from .indices import IndexSegmentPlan as IndexSegmentPlan from .lance import PySearchFilter from .optimize import ( Compaction as Compaction, @@ -191,14 +190,6 @@ class LanceColumnStatistics: class _Session: def size_bytes(self) -> int: ... -class IndexSegmentBuilder: - def with_index_type(self, index_type: str) -> Self: ... - def with_segments(self, segments: List[Index]) -> Self: ... - def with_target_segment_bytes(self, bytes: int) -> Self: ... - def plan(self) -> List[IndexSegmentPlan]: ... - def build(self, plan: IndexSegmentPlan) -> IndexSegment: ... - def build_all(self) -> List[IndexSegment]: ... - class LanceBlobFile: def close(self): ... def is_closed(self) -> bool: ... @@ -410,7 +401,6 @@ class _Dataset: batch_readhead: Optional[int] = None, progress_callback: Optional[Callable[[IndexProgress], None]] = None, ): ... - def create_index_segment_builder(self) -> IndexSegmentBuilder: ... def merge_existing_index_segments(self, segments: List[Index]) -> Index: ... def commit_existing_index_segments( self, index_name: str, column: str, segments: List[Union[IndexSegment, Index]] diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index fc5d03b80bd..384e2528e99 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -28,14 +28,6 @@ class IndexSegment: def __repr__(self) -> str: ... -class IndexSegmentPlan: - segment: IndexSegment - segments: list[object] - estimated_bytes: int - requested_index_type: Optional[str] - - def __repr__(self) -> str: ... - def train_ivf_model( dataset, column: str, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 4cf1c4947e3..630d4827520 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2389,12 +2389,6 @@ def build_distributed_vector_index( ) ) - segments = ( - dataset.create_index_segment_builder() - .with_index_type(index_type) - .with_segments(segments) - .build_all() - ) return dataset.commit_existing_index_segments(f"{column}_idx", column, segments) @@ -2766,12 +2760,6 @@ def test_metadata_merge_pq_success(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ( - ds.create_index_segment_builder() - .with_index_type("IVF_PQ") - .with_segments(segments) - .build_all() - ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2810,12 +2798,6 @@ def test_distributed_workflow_merge_and_search(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ( - ds.create_index_segment_builder() - .with_index_type("IVF_PQ") - .with_segments(segments) - .build_all() - ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2851,12 +2833,6 @@ def test_vector_merge_two_shards_success_flat(tmp_path): ivf_centroids=preprocessed["ivf_centroids"], pq_codebook=preprocessed["pq_codebook"], ) - segments = ( - ds.create_index_segment_builder() - .with_index_type("IVF_FLAT") - .with_segments(segments) - .build_all() - ) ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) result = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2909,12 +2885,6 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): ds.create_index_uncommitted(**kwargs1), ds.create_index_uncommitted(**kwargs2), ] - segments = ( - ds.create_index_segment_builder() - .with_index_type(index_type) - .with_segments(segments) - .build_all() - ) ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) @@ -2975,12 +2945,7 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): kwargs2["pq_codebook"] = pre["pq_codebook"] segment2 = ds.create_index_uncommitted(**kwargs2) - segments = ( - ds.create_index_segment_builder() - .with_index_type(index_type) - .with_segments([segment1, segment2]) - .build_all() - ) + segments = [segment1, segment2] ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) @@ -3023,8 +2988,8 @@ def test_commit_existing_index_segments_accepts_index_metadata(tmp_path): assert 0 < len(results) <= 5 -def test_index_segment_builder_builds_vector_segments(tmp_path): - ds = _make_sample_dataset_base(tmp_path, "segment_builder_ds", 2000, 128) +def test_commit_existing_index_segments_accepts_uncommitted_vector_segments(tmp_path): + ds = _make_sample_dataset_base(tmp_path, "segment_commit_ds", 2000, 128) frags = ds.get_fragments() assert len(frags) >= 2 builder = IndicesBuilder(ds, "vector") @@ -3051,16 +3016,6 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): for fragment in frags[:2] ] - segment_builder = ( - ds.create_index_segment_builder() - .with_index_type("IVF_FLAT") - .with_segments(segments) - ) - plans = segment_builder.plan() - assert len(plans) == 2 - assert all(len(plan.segments) == 1 for plan in plans) - - segments = segment_builder.build_all() assert len(segments) == 2 ds = ds.commit_existing_index_segments("vector_idx", "vector", segments) @@ -3123,12 +3078,6 @@ def build_distributed_ivf_pq(ds_copy, shard_order): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ( - ds_copy.create_index_segment_builder() - .with_index_type("IVF_PQ") - .with_segments(segments) - .build_all() - ) return _commit_segments_helper(ds_copy, segments, column="vector") except ValueError as e: raise e diff --git a/python/src/dataset.rs b/python/src/dataset.rs index d31cb870d0b..332d35d68c9 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -96,7 +96,7 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; -use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment, PyIndexSegmentPlan}; +use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment}; use crate::namespace::extract_namespace_arc; use crate::rt; use crate::scanner::ScanStatistics; @@ -463,36 +463,6 @@ impl MergeInsertBuilder { } } -#[pyclass( - name = "IndexSegmentBuilder", - module = "lance", - subclass, - skip_from_py_object -)] -#[derive(Clone)] -pub struct PyIndexSegmentBuilder { - dataset: Arc, - index_type: Option, - segments: Vec, - target_segment_bytes: Option, -} - -impl PyIndexSegmentBuilder { - fn builder(&self) -> ::IndexSegmentBuilder<'_> { - let mut builder = self - .dataset - .create_index_segment_builder() - .with_segments(self.segments.clone()); - if let Some(index_type) = self.index_type { - builder = builder.with_index_type(index_type); - } - if let Some(target_segment_bytes) = self.target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } - builder - } -} - fn index_metadata_to_segment(metadata: IndexMetadata) -> PyResult { let fragment_bitmap = metadata.fragment_bitmap.ok_or_else(|| { PyValueError::new_err(format!( @@ -535,81 +505,6 @@ fn extract_index_segments(segments: &Bound<'_, PyAny>) -> PyResult( - mut slf: PyRefMut<'a, Self>, - index_type: &str, - ) -> PyResult> { - let normalized = index_type.to_uppercase(); - slf.index_type = Some(match normalized.as_str() { - "INVERTED" | "FTS" => IndexType::Inverted, - "VECTOR" => IndexType::Vector, - "IVF_FLAT" => IndexType::IvfFlat, - "IVF_PQ" => IndexType::IvfPq, - "IVF_SQ" => IndexType::IvfSq, - "IVF_RQ" => IndexType::IvfRq, - "IVF_HNSW_FLAT" => IndexType::IvfHnswFlat, - "IVF_HNSW_PQ" => IndexType::IvfHnswPq, - "IVF_HNSW_SQ" => IndexType::IvfHnswSq, - _ => { - return Err(PyValueError::new_err(format!( - "Unsupported index type for segment builder: {index_type}" - ))); - } - }); - Ok(slf) - } - - fn with_segments<'a>( - mut slf: PyRefMut<'a, Self>, - segments: &Bound<'_, PyAny>, - ) -> PyResult> { - let mut indices = Vec::new(); - for item in segments.try_iter()? { - indices.push(item?.extract::>()?.0); - } - slf.segments = indices; - Ok(slf) - } - - fn with_target_segment_bytes<'a>( - mut slf: PyRefMut<'a, Self>, - bytes: u64, - ) -> PyResult> { - slf.target_segment_bytes = Some(bytes); - Ok(slf) - } - - fn plan(&self, py: Python<'_>) -> PyResult>> { - let plans = rt() - .block_on(Some(py), self.builder().plan())? - .infer_error()?; - plans - .into_iter() - .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) - .collect() - } - - fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { - let plan = plan.extract::>()?; - let segment = rt() - .block_on(Some(py), self.builder().build(&plan.inner))? - .infer_error()?; - Py::new(py, PyIndexSegment::from_inner(segment)) - } - - fn build_all(&self, py: Python<'_>) -> PyResult>> { - let segments = rt() - .block_on(Some(py), self.builder().build_all())? - .infer_error()?; - segments - .into_iter() - .map(|segment| Py::new(py, PyIndexSegment::from_inner(segment))) - .collect() - } -} - impl MergeInsertBuilder { fn build_stats<'a>(stats: &MergeStats, py: Python<'a>) -> PyResult> { let dict = PyDict::new(py); @@ -2502,15 +2397,6 @@ impl Dataset { Ok(PyLance(index_metadata)) } - fn create_index_segment_builder(&self) -> PyResult { - Ok(PyIndexSegmentBuilder { - dataset: self.ds.clone(), - index_type: None, - segments: Vec::new(), - target_segment_bytes: None, - }) - } - fn merge_existing_index_segments( &self, segments: Vec>, diff --git a/python/src/indices.rs b/python/src/indices.rs index cf93579b867..fe988206117 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -11,9 +11,9 @@ use arrow_data::ArrayData; use chrono::{DateTime, Utc}; use lance::dataset::Dataset as LanceDataset; use lance::index::DatasetIndexExt; +use lance::index::IndexSegment; use lance::index::vector::ivf::builder::write_vector_storage; use lance::index::vector::pq::build_pq_model_in_fragments; -use lance::index::{IndexSegment, IndexSegmentPlan}; use lance::io::ObjectStore; use lance_index::progress::NoopIndexBuildProgress; use lance_index::vector::ivf::shuffler::{IvfShuffler, shuffle_vectors}; @@ -36,7 +36,7 @@ use pyo3::{ use lance::index::DatasetIndexInternalExt; use crate::fragment::FileFragment; -use crate::utils::{PyJson, PyLance}; +use crate::utils::PyJson; use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; @@ -73,12 +73,6 @@ pub struct PyIndexSegment { pub(crate) inner: IndexSegment, } -impl PyIndexSegment { - pub(crate) fn from_inner(inner: IndexSegment) -> Self { - Self { inner } - } -} - #[pymethods] impl PyIndexSegment { #[getter] @@ -106,56 +100,6 @@ impl PyIndexSegment { } } -#[pyclass( - name = "IndexSegmentPlan", - module = "lance.indices", - skip_from_py_object -)] -#[derive(Debug, Clone)] -pub struct PyIndexSegmentPlan { - pub(crate) inner: IndexSegmentPlan, -} - -impl PyIndexSegmentPlan { - pub(crate) fn from_inner(inner: IndexSegmentPlan) -> Self { - Self { inner } - } -} - -#[pymethods] -impl PyIndexSegmentPlan { - #[getter] - fn segment(&self) -> PyIndexSegment { - PyIndexSegment::from_inner(self.inner.segment().clone()) - } - - #[getter] - fn segments(&self) -> Vec> { - self.inner.segments().iter().cloned().map(PyLance).collect() - } - - #[getter] - fn estimated_bytes(&self) -> u64 { - self.inner.estimated_bytes() - } - - #[getter] - fn requested_index_type(&self) -> Option { - self.inner - .requested_index_type() - .map(|index_type| index_type.to_string()) - } - - fn __repr__(&self) -> String { - format!( - "IndexSegmentPlan(segments={}, estimated_bytes={}, requested_index_type={:?})", - self.inner.segments().len(), - self.estimated_bytes(), - self.requested_index_type() - ) - } -} - #[pyclass(name = "IvfModel", module = "lance.indices", skip_from_py_object)] #[derive(Debug, Clone)] pub struct PyIvfModel { @@ -758,7 +702,6 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { indices.add_class::()?; indices.add_class::()?; indices.add_class::()?; - indices.add_class::()?; indices.add_class::()?; indices.add_class::()?; indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?; diff --git a/rust/lance/benches/fts_search.rs b/rust/lance/benches/fts_search.rs index 6d35b45ef89..f4eb6224cd2 100644 --- a/rust/lance/benches/fts_search.rs +++ b/rust/lance/benches/fts_search.rs @@ -145,15 +145,8 @@ async fn build_segmented_fts_dataset(segment_count: usize) -> BenchDataset { .unwrap(); staged_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(staged_segments) - .build_all() - .await - .unwrap(); dataset - .commit_existing_index_segments(INDEX_NAME, "text", segments) + .commit_existing_index_segments(INDEX_NAME, "text", staged_segments) .await .unwrap(); @@ -228,15 +221,8 @@ async fn build_partition_compare_dataset_with_memory_limit( .unwrap(); staged_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(staged_segments) - .build_all() - .await - .unwrap(); dataset - .commit_existing_index_segments(PARTITION_COMPARE_INDEX_NAME, "text", segments) + .commit_existing_index_segments(PARTITION_COMPARE_INDEX_NAME, "text", staged_segments) .await .unwrap(); diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index d0f5983b3ce..43e00711937 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -5092,13 +5092,6 @@ pub mod test_dataset { .iter() .map(|segment| segment.uuid) .collect::>(); - let segments = self - .dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(segments) - .build_all() - .await?; self.dataset .commit_existing_index_segments("idx", "vec", segments) .await?; diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index c65fe13a23e..93cb1aa07d3 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -85,7 +85,7 @@ use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::optimize::RemappedIndex; use crate::dataset::optimize::remapping::RemapResult; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; -pub use crate::index::api::{DatasetIndexExt, IndexSegment, IndexSegmentPlan, IntoIndexSegment}; +pub use crate::index::api::{DatasetIndexExt, IndexSegment, IntoIndexSegment}; use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index}; use crate::index::mem_wal::open_mem_wal_index; pub use crate::index::prefilter::{FilterLoader, PreFilter}; @@ -164,6 +164,22 @@ pub(crate) async fn build_index_metadata_from_segments( let mut new_indices = Vec::with_capacity(segments.len()); for segment in segments { let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); + if index_details.type_url.ends_with("InvertedIndexDetails") { + let metadata = IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap.clone()), + index_details: Some(index_details.clone()), + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: None, + }; + crate::index::scalar::inverted::finalize_segment_files_if_needed(dataset, &metadata) + .await?; + } let index_dir = dataset.indices_dir().clone().join(uuid.to_string()); let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; new_indices.push(IndexMetadata { @@ -758,8 +774,6 @@ impl IndexDescription for IndexDescriptionImpl { #[async_trait] impl DatasetIndexExt for Dataset { type IndexBuilder<'a> = CreateIndexBuilder<'a>; - type IndexSegmentBuilder<'a> = create::IndexSegmentBuilder<'a>; - /// Create a builder for creating an index on columns. /// /// This returns a builder that can be configured with additional options @@ -806,10 +820,6 @@ impl DatasetIndexExt for Dataset { CreateIndexBuilder::new(self, columns, index_type, params) } - fn create_index_segment_builder<'a>(&'a self) -> create::IndexSegmentBuilder<'a> { - create::IndexSegmentBuilder::new(self) - } - #[instrument(skip_all)] async fn create_index( &mut self, @@ -2658,13 +2668,6 @@ mod tests { .iter() .map(|segment| segment.uuid) .collect::>(); - let segments = dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(segments) - .build_all() - .await - .unwrap(); dataset .commit_existing_index_segments(index_name, column, segments) .await diff --git a/rust/lance/src/index/api.rs b/rust/lance/src/index/api.rs index 4ae652624c1..0119db7ac01 100644 --- a/rust/lance/src/index/api.rs +++ b/rust/lance/src/index/api.rs @@ -116,60 +116,10 @@ impl IntoIndexSegment for IndexMetadata { } } -/// A plan for building one physical segment from one or more existing -/// uncommitted index segments. -#[derive(Debug, Clone, PartialEq)] -pub struct IndexSegmentPlan { - segment: IndexSegment, - segments: Vec, - estimated_bytes: u64, - requested_index_type: Option, -} - -impl IndexSegmentPlan { - /// Create a plan for one built segment. - pub fn new( - segment: IndexSegment, - segments: Vec, - estimated_bytes: u64, - requested_index_type: Option, - ) -> Self { - Self { - segment, - segments, - estimated_bytes, - requested_index_type, - } - } - - /// Return the segment metadata that should be committed after this plan is built. - pub fn segment(&self) -> &IndexSegment { - &self.segment - } - - /// Return the input segment metadata that should be combined into the segment. - pub fn segments(&self) -> &[IndexMetadata] { - &self.segments - } - - /// Return the estimated number of bytes covered by this plan. - pub fn estimated_bytes(&self) -> u64 { - self.estimated_bytes - } - - /// Return the requested logical index type, if one was supplied to the planner. - pub fn requested_index_type(&self) -> Option { - self.requested_index_type - } -} - /// Extends [`crate::Dataset`] with secondary index APIs. #[async_trait] pub trait DatasetIndexExt { type IndexBuilder<'a> - where - Self: 'a; - type IndexSegmentBuilder<'a> where Self: 'a; @@ -184,19 +134,6 @@ pub trait DatasetIndexExt { params: &'a dyn IndexParams, ) -> Self::IndexBuilder<'a>; - /// Create a builder for building physical index segments from uncommitted - /// index outputs. - /// - /// The caller supplies the uncommitted index metadata returned by - /// `execute_uncommitted()` and then declares the concrete index type with - /// `with_index_type(...)` so the builder can plan segment grouping without - /// rediscovering fragment coverage. - /// - /// This is the canonical entry point for segment-based index build. - /// After building the physical segments, publish them as a - /// logical index with [`Self::commit_existing_index_segments`]. - fn create_index_segment_builder<'a>(&'a self) -> Self::IndexSegmentBuilder<'a>; - /// Create indices on columns. /// /// Upon finish, a new dataset version is generated. @@ -320,29 +257,3 @@ pub trait DatasetIndexExt { with_vector: bool, ) -> Result; } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::{IndexSegment, IndexSegmentPlan}; - use lance_index::IndexType; - use uuid::Uuid; - - #[test] - fn test_index_segment_plan_accessors() { - let uuid = Uuid::new_v4(); - let segment = IndexSegment::new(uuid, [1_u32, 3], Arc::new(prost_types::Any::default()), 7); - let plan = IndexSegmentPlan::new(segment.clone(), vec![], 128, Some(IndexType::BTree)); - - assert_eq!(segment.uuid(), uuid); - assert_eq!( - segment.fragment_bitmap().iter().collect::>(), - vec![1, 3] - ); - assert_eq!(segment.index_version(), 7); - assert_eq!(plan.segment().uuid(), uuid); - assert_eq!(plan.estimated_bytes(), 128); - assert_eq!(plan.requested_index_type(), Some(IndexType::BTree)); - } -} diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 842252f9a45..bdbd493ebd0 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -8,8 +8,7 @@ use crate::{ transaction::{Operation, TransactionBuilder}, }, index::{ - DatasetIndexExt, DatasetIndexInternalExt, - api::{IndexSegment, IndexSegmentPlan}, + DatasetIndexExt, DatasetIndexInternalExt, IntoIndexSegment, build_index_metadata_from_segments, scalar::build_scalar_index, vector::{ @@ -19,7 +18,7 @@ use crate::{ vector_index_details, vector_index_details_default, }, }; -use futures::future::{BoxFuture, try_join_all}; +use futures::future::BoxFuture; use lance_core::datatypes::format_field_path; use lance_index::progress::{IndexBuildProgress, NoopIndexBuildProgress}; use lance_index::{IndexParams, IndexType, scalar::CreatedIndex}; @@ -28,11 +27,7 @@ use lance_index::{ scalar::{LANCE_SCALAR_INDEX, ScalarIndexParams, inverted::tokenizer::InvertedIndexParams}, }; use lance_table::format::{IndexMetadata, list_index_files_with_sizes}; -use std::{ - collections::{HashMap, HashSet}, - future::IntoFuture, - sync::Arc, -}; +use std::{collections::HashMap, future::IntoFuture, sync::Arc}; use tracing::instrument; use uuid::Uuid; @@ -486,41 +481,14 @@ impl<'a> CreateIndexBuilder<'a> { new_idx.name )) })?; - let segment_index_type = match self.index_type { - IndexType::Vector - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfFlat - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq => self - .params - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::index("Vector index type must take a VectorIndexParams".to_string()) - })? - .index_type(), - unsupported => { - return Err(Error::internal(format!( - "Segment commit path does not support index type {}", - unsupported - ))); - } - }; - let segments = self - .dataset - .create_index_segment_builder() - .with_index_type(segment_index_type) - .with_segments(vec![new_idx.clone()]) - .build_all() - .await?; + let index_name = new_idx.name.clone(); + let dataset_version = new_idx.dataset_version; + let segments = vec![new_idx.into_index_segment()?]; let new_indices = - build_index_metadata_from_segments(self.dataset, &new_idx.name, field_id, segments) + build_index_metadata_from_segments(self.dataset, &index_name, field_id, segments) .await?; TransactionBuilder::new( - new_idx.dataset_version, + dataset_version, Operation::CreateIndex { new_indices, removed_indices, @@ -591,164 +559,11 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> { } } -/// Build physical index segments from previously-written uncommitted index outputs. -/// -/// Use [`DatasetIndexExt::create_index_segment_builder`] and then either: -/// -/// - call [`Self::with_index_type`] with the concrete segment type first, then -/// - call [`Self::plan`] and orchestrate individual segment builds externally, or -/// - call [`Self::build_all`] to build all segments on the current node. -/// -/// This builder only builds physical segments. Publishing those segments as -/// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`]. -/// Together these two APIs form the canonical segment-based index build workflow. -#[derive(Clone)] -pub struct IndexSegmentBuilder<'a> { - dataset: &'a Dataset, - index_type: Option, - segments: Vec, - target_segment_bytes: Option, -} - -impl<'a> IndexSegmentBuilder<'a> { - pub(crate) fn new(dataset: &'a Dataset) -> Self { - Self { - dataset, - index_type: None, - segments: Vec::new(), - target_segment_bytes: None, - } - } - - /// Declare the concrete index type of the staged segments. - pub fn with_index_type(mut self, index_type: IndexType) -> Self { - self.index_type = Some(index_type); - self - } - - /// Provide the segment metadata returned by `execute_uncommitted()`. - /// - /// These segments must already exist in storage and must not have been - /// published into a logical index yet. - pub fn with_segments(mut self, segments: Vec) -> Self { - self.segments = segments; - self - } - - /// Set the target size, in bytes, for merged physical segments. - /// - /// When set, input segments will be grouped into larger physical segments - /// up to approximately this size. When unset, each input segment becomes - /// one physical segment. - pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self { - self.target_segment_bytes = Some(bytes); - self - } - - /// Plan how input segments should be grouped into physical segments. - pub async fn plan(&self) -> Result> { - if self.segments.is_empty() { - return Err(Error::invalid_input( - "IndexSegmentBuilder requires at least one segment; \ - call with_segments(...) with execute_uncommitted() outputs" - .to_string(), - )); - } - let index_type = self.index_type.ok_or_else(|| { - Error::invalid_input( - "IndexSegmentBuilder requires an explicit index type; call with_index_type(...)" - .to_string(), - ) - })?; - let mut seen_segment_ids = HashSet::with_capacity(self.segments.len()); - for segment in &self.segments { - if !seen_segment_ids.insert(segment.uuid) { - return Err(Error::invalid_input(format!( - "IndexSegmentBuilder received duplicate segment uuid {}", - segment.uuid - ))); - } - } - - match index_type { - IndexType::Inverted => crate::index::scalar::inverted::plan_segments( - &self.segments, - self.target_segment_bytes, - ), - IndexType::Vector => { - crate::index::vector::ivf::plan_segments( - &self.segments, - Some(index_type), - self.target_segment_bytes, - ) - .await - } - IndexType::IvfFlat - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq => { - crate::index::vector::ivf::plan_segments( - &self.segments, - Some(index_type), - self.target_segment_bytes, - ) - .await - } - unsupported => Err(Error::invalid_input(format!( - "IndexSegmentBuilder does not support planning segments for index type {}", - unsupported - ))), - } - } - - /// Build one segment from a previously-generated plan. - pub async fn build(&self, plan: &IndexSegmentPlan) -> Result { - match plan.requested_index_type().ok_or_else(|| { - Error::invalid_input( - "IndexSegmentBuilder requires planned segments to declare an index type" - .to_string(), - ) - })? { - IndexType::Inverted => { - crate::index::scalar::inverted::build_segment(self.dataset, plan).await - } - IndexType::Vector - | IndexType::IvfFlat - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq => { - crate::index::vector::ivf::build_segment( - self.dataset.object_store.as_ref(), - &self.dataset.indices_dir(), - plan, - ) - .await - } - unsupported => Err(Error::invalid_input(format!( - "IndexSegmentBuilder does not support building segments for index type {}", - unsupported - ))), - } - } - - /// Plan and build all segments from the provided inputs. - pub async fn build_all(&self) -> Result> { - let plans = self.plan().await?; - try_join_all(plans.iter().map(|plan| self.build(plan))).await - } -} - #[cfg(test)] mod tests { use super::*; use crate::dataset::{WriteMode, WriteParams}; - use crate::index::DatasetIndexExt; + use crate::index::{DatasetIndexExt, IndexSegment}; use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount}; use arrow::datatypes::{Float32Type, Int32Type}; use arrow_array::cast::AsArray; @@ -1576,17 +1391,11 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(input_segments.clone()) - .build_all() - .await - .unwrap(); + let segments = input_segments.clone(); assert_eq!(segments.len(), fragments.len()); let mut built_segment_ids = segments .iter() - .map(|segment| segment.uuid()) + .map(|segment| segment.uuid) .collect::>(); built_segment_ids.sort(); let mut input_segment_ids = input_segments @@ -1627,7 +1436,7 @@ mod tests { } #[tokio::test] - async fn test_index_segment_builder_vector_commits_multi_segment_logical_index() { + async fn test_commit_existing_index_segments_vector_commits_multi_segment_logical_index() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1672,13 +1481,7 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(input_segments) - .build_all() - .await - .unwrap(); + let segments = input_segments; assert_eq!(segments.len(), 2); dataset @@ -1725,7 +1528,7 @@ mod tests { } #[tokio::test] - async fn test_index_segment_builder_vector_segments_without_index_details() { + async fn test_commit_existing_index_segments_rejects_vector_segments_without_index_details() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1771,18 +1574,18 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(input_segments) - .build_all() + let err = dataset + .commit_existing_index_segments("vector_idx", "vector", input_segments) .await - .unwrap(); - assert_eq!(segments.len(), 2); + .unwrap_err(); + assert!( + err.to_string().contains("missing index details"), + "unexpected error: {err}" + ); } #[tokio::test] - async fn test_index_segment_builder_fts_commits_multi_segment_logical_index() { + async fn test_commit_existing_index_segments_finalizes_fts_segments() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1820,20 +1623,19 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(input_segments.clone()) - .build_all() + let segments = input_segments.clone(); + assert_eq!(segments.len(), input_segments.len()); + + dataset + .commit_existing_index_segments("text_idx", "text", segments) .await .unwrap(); - assert_eq!(segments.len(), input_segments.len()); - for segment in &segments { + for segment in &input_segments { let metadata_path = dataset .indices_dir() .clone() - .join(segment.uuid().to_string()) + .join(segment.uuid.to_string()) .join(lance_index::scalar::inverted::METADATA_FILE); assert!( dataset @@ -1845,11 +1647,6 @@ mod tests { ); } - dataset - .commit_existing_index_segments("text_idx", "text", segments) - .await - .unwrap(); - let indices = dataset.load_indices_by_name("text_idx").await.unwrap(); assert_eq!(indices.len(), input_segments.len()); } @@ -1933,131 +1730,6 @@ mod tests { assert_eq!(results.num_rows(), 20); } - #[tokio::test] - async fn test_index_segment_builder_rejects_duplicate_segment_uuids() { - let tmpdir = TempStrDir::default(); - let dataset_uri = format!("file://{}", tmpdir.as_str()); - - let batches = RecordBatchIterator::new( - vec![Ok(create_text_batch(0, 10))], - create_text_batch(0, 1).schema(), - ); - let mut dataset = Dataset::write( - batches, - &dataset_uri, - Some(WriteParams { - max_rows_per_file: 10, - mode: WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); - - let params = InvertedIndexParams::default(); - let segment = - CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, ¶ms) - .name("text_idx".to_string()) - .fragments(vec![0]) - .execute_uncommitted() - .await - .unwrap(); - - let err = dataset - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(vec![segment.clone(), segment]) - .build_all() - .await - .unwrap_err(); - assert!( - err.to_string().contains("duplicate segment uuid"), - "unexpected error: {err}" - ); - } - - #[tokio::test] - async fn test_index_segment_builder_requires_explicit_index_type() { - let tmpdir = TempStrDir::default(); - let dataset_uri = format!("file://{}", tmpdir.as_str()); - - let batches = RecordBatchIterator::new( - vec![Ok(create_text_batch(0, 10))], - create_text_batch(0, 1).schema(), - ); - let mut dataset = Dataset::write( - batches, - &dataset_uri, - Some(WriteParams { - max_rows_per_file: 10, - mode: WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); - - let params = InvertedIndexParams::default(); - let segment = - CreateIndexBuilder::new(&mut dataset, &["text"], IndexType::Inverted, ¶ms) - .name("text_idx".to_string()) - .fragments(vec![0]) - .execute_uncommitted() - .await - .unwrap(); - - let err = dataset - .create_index_segment_builder() - .with_segments(vec![segment]) - .plan() - .await - .unwrap_err(); - assert!( - err.to_string().contains("requires an explicit index type"), - "unexpected error: {err}" - ); - } - - #[tokio::test] - async fn test_index_segment_builder_requires_requested_index_type() { - let tmpdir = TempStrDir::default(); - let dataset_uri = format!("file://{}", tmpdir.as_str()); - - let batches = RecordBatchIterator::new( - vec![Ok(create_text_batch(0, 10))], - create_text_batch(0, 1).schema(), - ); - let dataset = Dataset::write( - batches, - &dataset_uri, - Some(WriteParams { - max_rows_per_file: 10, - mode: WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); - - let segment = IndexSegment::new( - Uuid::new_v4(), - [0_u32], - Arc::new(prost_types::Any::default()), - 0, - ); - let plan = IndexSegmentPlan::new(segment, Vec::new(), 0, None); - let err = dataset - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .build(&plan) - .await - .unwrap_err(); - assert!( - err.to_string().contains("declare an index type"), - "unexpected error: {err}" - ); - } - #[tokio::test] async fn test_commit_existing_index_supports_local_hnsw_segments() { let tmpdir = TempStrDir::default(); diff --git a/rust/lance/src/index/scalar/inverted.rs b/rust/lance/src/index/scalar/inverted.rs index 8a77aa88353..44cf6ff2e08 100644 --- a/rust/lance/src/index/scalar/inverted.rs +++ b/rust/lance/src/index/scalar/inverted.rs @@ -12,8 +12,8 @@ use lance_core::ROW_ID; use lance_index::metrics::NoOpMetricsCollector; use lance_index::pbold::InvertedIndexDetails; use lance_index::scalar::inverted::InvertedIndex; +use lance_index::scalar::lance_format::LanceIndexStore; use lance_index::scalar::registry::VALUE_COLUMN_NAME; -use lance_index::{IndexType, scalar::lance_format::LanceIndexStore}; use lance_table::format::IndexMetadata; use prost::Message; use roaring::RoaringBitmap; @@ -22,11 +22,7 @@ use uuid::Uuid; use crate::{ Dataset, Error, Result, dataset::index::LanceIndexStoreExt, - index::{ - DatasetIndexExt, - api::{IndexSegment, IndexSegmentPlan}, - scalar::fetch_index_details, - }, + index::{DatasetIndexExt, scalar::fetch_index_details}, }; /// Build an empty update stream for the inverted merge API. @@ -54,7 +50,7 @@ fn empty_inverted_update_stream( ))) } -async fn finalize_segment_files_if_needed( +pub(crate) async fn finalize_segment_files_if_needed( dataset: &Dataset, segment: &IndexMetadata, ) -> Result<()> { @@ -79,88 +75,6 @@ async fn finalize_segment_files_if_needed( .await } -/// Plan physical segments for staged inverted-index outputs. -/// -/// Each staged inverted root remains its own physical segment for now. -pub(crate) fn plan_segments( - segments: &[IndexMetadata], - target_segment_bytes: Option, -) -> Result> { - if let Some(0) = target_segment_bytes { - return Err(Error::invalid_input( - "target_segment_bytes must be greater than zero".to_string(), - )); - } - if target_segment_bytes.is_some() && segments.len() > 1 { - // TODO: Support merging multiple staged inverted roots into one segment. - return Err(Error::invalid_input( - "Inverted segment builder does not yet support merging multiple source segments" - .to_string(), - )); - } - - segments - .iter() - .map(|segment| { - let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { - Error::index(format!( - "Segment '{}' is missing fragment coverage", - segment.uuid - )) - })?; - let index_details = segment.index_details.as_ref().ok_or_else(|| { - Error::index(format!( - "Segment '{}' is missing index details", - segment.uuid - )) - })?; - let built_segment = IndexSegment::new( - segment.uuid, - fragment_bitmap.iter(), - index_details.clone(), - segment.index_version, - ); - let estimated_bytes = segment - .files - .as_ref() - .map(|files| files.iter().map(|file| file.size_bytes).sum()) - .unwrap_or(0); - Ok(IndexSegmentPlan::new( - built_segment, - vec![segment.clone()], - estimated_bytes, - Some(IndexType::Inverted), - )) - }) - .collect() -} - -/// Finalize one staged inverted root into a commit-ready physical segment. -pub(crate) async fn build_segment( - dataset: &Dataset, - segment_plan: &IndexSegmentPlan, -) -> Result { - let built_segment = segment_plan.segment().clone(); - let source_segments = segment_plan.segments(); - if source_segments.len() != 1 { - // TODO: Support building one segment from multiple staged inverted roots. - return Err(Error::invalid_input( - "Inverted segment builder does not yet support merging multiple source segments" - .to_string(), - )); - } - let source_segment = &source_segments[0]; - if source_segment.uuid != built_segment.uuid() { - return Err(Error::invalid_input( - "Inverted segment builder requires the built segment UUID to match the staged source UUID" - .to_string(), - )); - } - - finalize_segment_files_if_needed(dataset, source_segment).await?; - Ok(built_segment) -} - /// Merge one caller-defined group of source FTS segments into a single segment. pub(crate) async fn merge_segments( dataset: &Dataset, diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index fa6f873ce8a..71bc03c3a82 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -17,13 +17,7 @@ use crate::index::DatasetIndexInternalExt; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::{ dataset::Dataset, - index::{ - INDEX_FILE_NAME, - api::{IndexSegment, IndexSegmentPlan}, - pb, - prefilter::PreFilter, - vector::ivf::io::write_pq_partitions, - }, + index::{INDEX_FILE_NAME, pb, prefilter::PreFilter, vector::ivf::io::write_pq_partitions}, }; use crate::{dataset::builder::DatasetBuilder, index::vector::IndexFileVersion}; use arrow::datatypes::UInt8Type; @@ -108,11 +102,7 @@ use prost::Message; use roaring::RoaringBitmap; use serde::Serialize; use serde_json::json; -use std::{ - any::Any, - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::{any::Any, collections::HashMap, sync::Arc}; use tokio::sync::mpsc; use tracing::instrument; use uuid::Uuid; @@ -2070,128 +2060,6 @@ async fn write_ivf_hnsw_file( Ok(()) } -pub(crate) async fn plan_segments( - segments: &[TableIndexMetadata], - requested_index_type: Option, - target_segment_bytes: Option, -) -> Result> { - if let Some(index_type) = requested_index_type - && !matches!( - index_type, - IndexType::IvfFlat - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq - | IndexType::Vector - ) - { - return Err(Error::invalid_input(format!( - "Unsupported distributed vector segment build type: {}", - index_type - ))); - } - - if let Some(0) = target_segment_bytes { - return Err(Error::invalid_input( - "target_segment_bytes must be greater than zero".to_string(), - )); - } - - if segments.is_empty() { - return Err(Error::index("No segment metadata was provided".to_string())); - } - - let mut sorted_segments = segments.to_vec(); - sorted_segments.sort_by_key(|index| index.uuid); - let mut expected_segment_ids = HashSet::with_capacity(sorted_segments.len()); - for segment in &sorted_segments { - if !expected_segment_ids.insert(segment.uuid) { - return Err(Error::index(format!( - "Distributed vector segment '{}' was provided more than once", - segment.uuid - ))); - } - } - - let mut covered_fragments = RoaringBitmap::new(); - for segment in &sorted_segments { - let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { - Error::index(format!( - "Segment '{}' is missing fragment coverage", - segment.uuid - )) - })?; - if covered_fragments.intersection_len(fragment_bitmap) > 0 { - return Err(Error::index( - "Distributed vector shards have overlapping fragment coverage".to_string(), - )); - } - covered_fragments |= fragment_bitmap.clone(); - } - - if target_segment_bytes.is_none() { - return sorted_segments - .into_iter() - .map(|segment| build_segment_plan(vec![segment], requested_index_type)) - .collect(); - } - - let target_segment_bytes = target_segment_bytes.unwrap(); - let mut plans = Vec::new(); - let mut current_group = Vec::new(); - let mut current_bytes = 0_u64; - - for segment in sorted_segments { - let source_bytes = estimate_source_index_bytes(&segment); - if !current_group.is_empty() - && current_bytes.saturating_add(source_bytes) > target_segment_bytes - { - plans.push(build_segment_plan( - std::mem::take(&mut current_group), - requested_index_type, - )?); - current_bytes = 0; - } - current_bytes = current_bytes.saturating_add(source_bytes); - current_group.push(segment); - } - - if !current_group.is_empty() { - plans.push(build_segment_plan(current_group, requested_index_type)?); - } - - Ok(plans) -} - -pub(crate) async fn build_segment( - object_store: &ObjectStore, - indices_dir: &Path, - segment_plan: &IndexSegmentPlan, -) -> Result { - let built_segment = segment_plan.segment().clone(); - let segments = segment_plan.segments(); - - if segments.len() == 1 && segments[0].uuid == built_segment.uuid() { - return Ok(built_segment); - } - - let final_dir = indices_dir.clone().join(built_segment.uuid().to_string()); - merge_segments_to_dir( - object_store, - indices_dir, - &final_dir, - segment_plan.segments(), - segment_plan.requested_index_type(), - lance_index::progress::noop_progress(), - ) - .await?; - - Ok(built_segment) -} - /// Merge one caller-defined group of source segments into a single segment. pub(crate) async fn merge_segments( object_store: &ObjectStore, @@ -2319,56 +2187,6 @@ async fn merge_segments_to_dir( Ok(()) } -fn build_segment_plan( - group: Vec, - requested_index_type: Option, -) -> Result { - debug_assert!(!group.is_empty()); - let first = &group[0]; - let mut fragment_bitmap = RoaringBitmap::new(); - let mut estimated_bytes = 0_u64; - let mut segments = Vec::with_capacity(group.len()); - - for segment in &group { - let source_fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { - Error::index(format!( - "Segment '{}' is missing fragment coverage", - segment.uuid - )) - })?; - fragment_bitmap |= source_fragment_bitmap.clone(); - estimated_bytes = estimated_bytes.saturating_add(estimate_source_index_bytes(segment)); - segments.push(segment.clone()); - } - - let segment_uuid = if group.len() == 1 { - first.uuid - } else { - Uuid::new_v4() - }; - let index_version = match requested_index_type { - Some(index_type) => index_type.version(), - None => infer_source_index_version(&group)?, - }; - - // Legacy source segments may not carry index_details. Fall back to an empty - // placeholder; `needs_vector_details_inference` will pick this up on the - // next manifest load and populate the real details from the index files. - let index_details = match first.index_details.as_ref() { - Some(d) => d.clone(), - None => Arc::new(crate::index::vector::details::vector_index_details_default()), - }; - - let segment = IndexSegment::new(segment_uuid, fragment_bitmap, index_details, index_version); - - Ok(IndexSegmentPlan::new( - segment, - segments, - estimated_bytes, - requested_index_type, - )) -} - fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { debug_assert!(!group.is_empty()); let first = group[0].index_version; @@ -2380,14 +2198,6 @@ fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { Ok(first) } -fn estimate_source_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { - index_metadata - .files - .as_ref() - .map(|files| files.iter().map(|file| file.size_bytes).sum()) - .unwrap_or(0) -} - /// Best-effort reset of one target directory before rewriting it. async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { match object_store.remove_dir_all(final_dir.clone()).await { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 8b81b032037..feed72c9b91 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -2631,16 +2631,9 @@ mod tests { async fn build_distributed_segments( dataset: &mut Dataset, segments: Vec, - index_type: IndexType, + _index_type: IndexType, index_name: &str, - ) -> Vec { - let segments = dataset - .create_index_segment_builder() - .with_index_type(index_type) - .with_segments(segments) - .build_all() - .await - .unwrap(); + ) -> Vec { dataset .commit_existing_index_segments(index_name, "vector", segments.clone()) .await @@ -2863,7 +2856,7 @@ mod tests { let segment_index = ds_split .indices_dir() .clone() - .join(segment.uuid().to_string()) + .join(segment.uuid.to_string()) .join(crate::index::INDEX_FILE_NAME); assert!( ds_split @@ -3050,7 +3043,14 @@ mod tests { assert_eq!(grouped_segments.len(), expected_fragment_coverage.len()); let mut actual_fragment_coverage = grouped_segments .iter() - .map(|segment| segment.fragment_bitmap().iter().collect::>()) + .map(|segment| { + segment + .fragment_bitmap + .as_ref() + .expect("segment should have fragment coverage") + .iter() + .collect::>() + }) .collect::>(); actual_fragment_coverage.sort(); assert_eq!( @@ -3179,14 +3179,6 @@ mod tests { segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_index_type(IndexType::IvfHnswFlat) - .with_segments(segments) - .build_all() - .await - .unwrap(); - dataset .commit_existing_index_segments("vector_idx", "vector", segments) .await @@ -3256,15 +3248,8 @@ mod tests { ) .await .unwrap(); - let merged_segment = dataset - .create_index_segment_builder() - .with_index_type(params.index_type()) - .with_segments(vec![merged_segment]) - .build_all() - .await - .unwrap(); dataset - .commit_existing_index_segments(INDEX_NAME, "vector", merged_segment) + .commit_existing_index_segments(INDEX_NAME, "vector", vec![merged_segment]) .await .unwrap(); diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs index 96129ea09fa..07130e47609 100644 --- a/rust/lance/src/io/exec/fts.rs +++ b/rust/lance/src/io/exec/fts.rs @@ -2381,14 +2381,7 @@ mod tests { .fragments(vec![fragment_id]); metadatas.push(builder.execute_uncommitted().await.unwrap()); } - let segments = ds - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(metadatas.clone()) - .build_all() - .await - .unwrap(); - ds.commit_existing_index_segments("seg_fts", "text", segments) + ds.commit_existing_index_segments("seg_fts", "text", metadatas.clone()) .await .unwrap(); assert_eq!( diff --git a/rust/lance/tests/query/inverted.rs b/rust/lance/tests/query/inverted.rs index da2a7181c34..4392278c0d8 100644 --- a/rust/lance/tests/query/inverted.rs +++ b/rust/lance/tests/query/inverted.rs @@ -211,14 +211,7 @@ async fn test_segmented_inverted_match_query() { .fragments(vec![fragment_id]); metadatas.push(builder.execute_uncommitted().await.unwrap()); } - let segments = ds - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(metadatas.clone()) - .build_all() - .await - .unwrap(); - ds.commit_existing_index_segments("segmented_fts", "text", segments) + ds.commit_existing_index_segments("segmented_fts", "text", metadatas.clone()) .await .unwrap(); assert!(metadatas.len() >= 2); @@ -288,14 +281,7 @@ async fn test_segmented_inverted_fuzzy_match_uses_global_idf() { .fragments(vec![fragment_id]); metadatas.push(builder.execute_uncommitted().await.unwrap()); } - let segments = ds - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(metadatas) - .build_all() - .await - .unwrap(); - ds.commit_existing_index_segments("segmented_fuzzy", "text", segments) + ds.commit_existing_index_segments("segmented_fuzzy", "text", metadatas) .await .unwrap(); @@ -374,14 +360,7 @@ async fn test_segmented_inverted_phrase_query() { .fragments(vec![fragment_id]); metadatas.push(builder.execute_uncommitted().await.unwrap()); } - let segments = ds - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(metadatas) - .build_all() - .await - .unwrap(); - ds.commit_existing_index_segments("segmented_phrase_fts", "text", segments) + ds.commit_existing_index_segments("segmented_phrase_fts", "text", metadatas) .await .unwrap(); @@ -444,14 +423,7 @@ async fn test_segmented_inverted_match_query_with_unindexed_fragments() { .fragments(vec![fragment_id]); metadatas.push(builder.execute_uncommitted().await.unwrap()); } - let segments = ds - .create_index_segment_builder() - .with_index_type(IndexType::Inverted) - .with_segments(metadatas) - .build_all() - .await - .unwrap(); - ds.commit_existing_index_segments("segmented_mixed_fts", "text", segments) + ds.commit_existing_index_segments("segmented_mixed_fts", "text", metadatas) .await .unwrap();