diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d338ff0fc4b..305c862e1a7 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -49,7 +49,6 @@ use lance_file::version::LanceFileVersion; use lance_index::IndexCriteria as RustIndexCriteria; use lance_index::optimize::OptimizeOptions; use lance_index::progress::noop_progress; -use lance_index::scalar::btree::BTreeParameters; use lance_index::{IndexParams, IndexType}; use lance_io::object_store::ObjectStoreRegistry; use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; @@ -980,7 +979,7 @@ fn inner_create_index<'local>( index_type: index_type_str, params: params_opt.clone(), }; - skip_commit = skip_commit || should_skip_commit(index_type, ¶ms_opt)?; + skip_commit = skip_commit || (index_type == IndexType::BTree && batch_reader.is_some()); Ok(Box::new(scalar_params)) } IndexType::FragmentReuse | IndexType::MemWal => { @@ -1060,20 +1059,6 @@ fn inner_drop_index(env: &mut JNIEnv, java_dataset: JObject, name: JString) -> R Ok(()) } -fn should_skip_commit(index_type: IndexType, params_opt: &Option) -> Result { - match index_type { - IndexType::BTree => { - // Should defer the commit if we are building range-based BTree index - if let Some(params) = params_opt { - let btree_parameters = serde_json::from_str::(params)?; - return Ok(btree_parameters.range_id.is_some()); - } - Ok(false) - } - _ => Ok(false), - } -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_innerMergeIndexMetadata<'local>( mut env: JNIEnv<'local>, diff --git a/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java b/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java index d72d5936d97..a7be46d2a85 100755 --- a/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java +++ b/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java @@ -59,7 +59,14 @@ public Builder zoneSize(long zoneSize) { * @param rangeId non-negative range identifier * @return this builder * @throws IllegalArgumentException + * @deprecated {@code rangeId} is no longer needed and is now ignored at build time (a warning + * is logged if set). A pre-sorted data stream is still supported — just pass the reader + * without a {@code rangeId}. Each build produces one canonical segment; for distributed + * builds, build one segment per worker and commit them with {@code + * commitExistingIndexSegments(...)}, optionally consolidating with {@code + * mergeExistingIndexSegments(...)}. This parameter will be removed in a future release. */ + @Deprecated public Builder rangeId(int rangeId) { if (rangeId < 0) { throw new IllegalArgumentException("rangeId must be non-negative"); diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index 315b010da1e..66d6d5d6687 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -2196,10 +2196,7 @@ void testDropIndex(@TempDir Path tempDir) { dataset.listIndexes().contains("test_index"), "Partially created index should not present"); - // 3. merge metadata, which will still not be committed - dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty()); - - // 4. commit the index + // 3. commit the index int fieldId = dataset.getLanceSchema().fields().stream() .filter(f -> f.getName().equals("name")) diff --git a/java/src/test/java/org/lance/index/ScalarIndexTest.java b/java/src/test/java/org/lance/index/ScalarIndexTest.java index 70ef43c853c..0884b70e4b5 100644 --- a/java/src/test/java/org/lance/index/ScalarIndexTest.java +++ b/java/src/test/java/org/lance/index/ScalarIndexTest.java @@ -13,16 +13,13 @@ */ package org.lance.index; -import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestUtils; -import org.lance.Transaction; import org.lance.WriteParams; import org.lance.index.scalar.ScalarIndexParams; import org.lance.ipc.LanceScanner; import org.lance.ipc.ScanOptions; -import org.lance.operation.CreateIndex; import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.Data; @@ -51,7 +48,6 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -117,68 +113,36 @@ public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exc testDataset.write(1, 10).close(); try (Dataset dataset = testDataset.write(2, 10)) { List fragments = dataset.getFragments(); - assertEquals(2, dataset.getFragments().size()); + assertEquals(2, fragments.size()); ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}"); IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); - UUID uuid = UUID.randomUUID(); - - // 2. partially create index - dataset.createIndex( - IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) - .withIndexName("test_index") - .withIndexUUID(uuid.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) - .build()); - dataset.createIndex( - IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) - .withIndexName("test_index") - .withIndexUUID(uuid.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); + String indexName = "test_index"; + + List segments = new ArrayList<>(); + for (Fragment fragment : fragments) { + segments.add( + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList("name"), IndexType.BTREE, indexParams) + .withIndexName(indexName) + .withFragmentIds(Collections.singletonList(fragment.getId())) + .build())); + } - // then no index should have been created assertFalse( - dataset.listIndexes().contains("test_index"), + dataset.listIndexes().contains(indexName), "Partially created index should not present"); - // 3. merge metadata, which will still not be committed - dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty()); + List committed = dataset.commitExistingIndexSegments(indexName, "name", segments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(indexName)); - // 4. commit the index - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals("name")) - .findAny() - .orElseThrow(() -> new RuntimeException("Cannot find 'name' field for TestDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(uuid) - .name("test_index") - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments(fragments.stream().map(Fragment::getId).collect(Collectors.toList())) - .build(); - - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(datasetVersion) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains("test_index")); - } - } + assertEquals(2, dataset.countIndexedRows(indexName, "name = 'Person 5'", Optional.empty())); + assertEquals( + 10, + dataset.countIndexedRows( + indexName, "name >= 'Person 3' AND name < 'Person 8'", Optional.empty())); } } } @@ -186,108 +150,58 @@ public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exc @Test public void testRangedBTreeIndex(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("ranged_btree_map").toString(); - UUID indexUUID = UUID.randomUUID(); try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath); testDataset.createEmptyDataset().close(); - // 1. write some data - try (Dataset dataset = testDataset.write(1, 200)) { - - // 2. scan data out - List data = new ArrayList<>(); - try (LanceScanner scanner = - dataset.newScan( - new ScanOptions.Builder() - .withRowId(true) - .columns(Collections.singletonList("id")) - .build()); - ArrowReader arrowReader = scanner.scanBatches(); ) { - while (arrowReader.loadNextBatch()) { - VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); - UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); - IntVector idVec = (IntVector) root.getVector("id"); - for (int i = 0; i < root.getRowCount(); i++) { - data.add(new long[] {idVec.get(i), rowIdVec.get(i)}); - } - } - } - - // 3. sort data globally (This will be done by computing engines in production) - data.sort((d1, d2) -> (int) (d1[0] - d2[0])); - int mid = data.size() / 2; - - // 4. divide sorted data into ranges and build index for each range - createBtreeIndexForRange(dataset, data.subList(0, mid), 1, allocator, indexUUID); - createBtreeIndexForRange(dataset, data.subList(mid, data.size()), 2, allocator, indexUUID); - - // 5. merge index. - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.BTREE, Optional.empty()); - - // 6. commit index - long datasetVersion = dataset.version(); - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals("id")) - .findAny() - .orElseThrow(() -> new RuntimeException("Cannot find 'id' field for TestDataset")) - .getId(); - Index index = - Index.builder() - .uuid(indexUUID) - .name("test_index") - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - dataset.getFragments().stream() - .map(Fragment::getId) - .collect(Collectors.toList())) - .build(); - - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(datasetVersion) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains("test_index")); - - // 7. compare results - // force use index should get the right value - ScanOptions scanOptions = - new ScanOptions.Builder().withRowId(true).filter("id in (10, 20, 30)").build(); - try (LanceScanner scanner = newDataset.newScan(scanOptions); - ArrowReader arrowReader = scanner.scanBatches(); ) { - List ids = new ArrayList<>(); - while (arrowReader.loadNextBatch()) { - VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); - IntVector idVec = (IntVector) root.getVector("id"); - for (int i = 0; i < idVec.getValueCount(); i++) { - ids.add(idVec.get(i)); - } + testDataset.write(1, 100).close(); + try (Dataset dataset = testDataset.write(2, 100)) { + List fragments = dataset.getFragments(); + assertEquals(2, fragments.size()); + + List segments = new ArrayList<>(); + for (Fragment fragment : fragments) { + List data = new ArrayList<>(); + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .fragmentIds(Collections.singletonList(fragment.getId())) + .withRowId(true) + .columns(Collections.singletonList("id")) + .build()); + ArrowReader arrowReader = scanner.scanBatches(); ) { + while (arrowReader.loadNextBatch()) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); + IntVector idVec = (IntVector) root.getVector("id"); + for (int i = 0; i < root.getRowCount(); i++) { + data.add(new long[] {idVec.get(i), rowIdVec.get(i)}); } - Collections.sort(ids); - Assertions.assertIterableEquals(Arrays.asList(10, 20, 30), ids); } } + + data.sort((d1, d2) -> Long.compare(d1[0], d2[0])); + segments.add(createBtreeIndexFromPreprocessedData(dataset, data, fragment, allocator)); } + + String indexName = "test_index"; + List committed = dataset.commitExistingIndexSegments(indexName, "id", segments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(indexName)); + + assertEquals( + 6, dataset.countIndexedRows(indexName, "id in (10, 20, 30)", Optional.empty())); + assertEquals( + 20, dataset.countIndexedRows(indexName, "id >= 50 AND id < 60", Optional.empty())); } } } - private void createBtreeIndexForRange( + private Index createBtreeIndexFromPreprocessedData( Dataset dataset, List preprocessedData, - int rangeId, - BufferAllocator allocator, - UUID indexUUID) { - // Note that the indexing column is called 'value' in btree. + Fragment fragment, + BufferAllocator allocator) { Schema schema = new Schema( Arrays.asList( @@ -300,7 +214,7 @@ private void createBtreeIndexForRange( UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); for (int i = 0; i < preprocessedData.size(); i++) { long[] dataPair = preprocessedData.get(i); - idVec.set(i, (int) dataPair[0]); + idVec.setSafe(i, (int) dataPair[0]); rowIdVec.setSafe(i, dataPair[1]); } root.setRowCount(preprocessedData.size()); @@ -321,12 +235,12 @@ private void createBtreeIndexForRange( ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { Data.exportArrayStream(allocator, reader, stream); - ScalarIndexParams scalarParams = - ScalarIndexParams.create("btree", String.format("{\"range_id\": %s}", rangeId)); + ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 64}"); IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); - dataset.createIndex( + return dataset.createIndex( IndexOptions.builder(Collections.singletonList("id"), IndexType.BTREE, indexParams) - .withIndexUUID(indexUUID.toString()) + .withIndexName("test_index") + .withFragmentIds(Collections.singletonList(fragment.getId())) .withPreprocessedData(stream) .build()); } catch (Exception e) { @@ -335,6 +249,42 @@ private void createBtreeIndexForRange( } } + @Test + public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("btree_merge_metadata_soft_break").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + testDataset.write(1, 10).close(); + try (Dataset dataset = testDataset.write(2, 10)) { + List fragments = dataset.getFragments(); + assertEquals(2, fragments.size()); + + ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}"); + IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); + UUID uuid = UUID.randomUUID(); + + dataset.createIndex( + IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) + .withIndexName("test_index") + .withIndexUUID(uuid.toString()) + .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) + .build()); + + Exception ex = + Assertions.assertThrows( + Exception.class, + () -> + dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty())); + assertTrue( + ex.getMessage() != null + && ex.getMessage().contains("no longer supports merge_index_metadata"), + "expected BTree merge_index_metadata soft-break error, got: " + ex.getMessage()); + } + } + } + @Test public void testCreateZonemapIndex(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("zonemap_test").toString(); diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index eeb2dacff6a..07be193decb 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2952,6 +2952,107 @@ def cleanup_old_versions( delete_rate_limit, ) + def _prepare_scalar_index_request( + self, + column: Union[str, List[str]], + index_type: Union[str, IndexConfig], + kwargs: dict, + ) -> tuple[str, str, str]: + """Validate and normalize a scalar-index request for the native + ``create_index`` call. + """ + if isinstance(column, str): + column = [column] + + if len(column) > 1: + raise NotImplementedError( + "Scalar indices currently only support a single column" + ) + + column = column[0] + lance_field = self._ds.lance_schema.field_case_insensitive(column) + if lance_field is None: + raise KeyError(f"{column} not found in schema") + + if isinstance(index_type, str): + index_type = index_type.upper() + if index_type not in [ + "BTREE", + "BITMAP", + "NGRAM", + "ZONEMAP", + "LABEL_LIST", + "INVERTED", + "FTS", + "BLOOMFILTER", + "RTREE", + ]: + raise NotImplementedError( + ( + 'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", ' + '"INVERTED", "BLOOMFILTER" or "RTREE" are supported for ' + f"scalar columns. Received {index_type}", + ) + ) + + field = lance_field.to_arrow() + + field_type = field.type + field_meta = field.metadata + if hasattr(field_type, "storage_type"): + field_type = field_type.storage_type + + if index_type in ["BTREE", "BITMAP", "ZONEMAP"]: + if ( + not pa.types.is_integer(field_type) + and not pa.types.is_floating(field_type) + and not pa.types.is_boolean(field_type) + and not pa.types.is_string(field_type) + and not pa.types.is_temporal(field_type) + and not pa.types.is_fixed_size_binary(field_type) + ): + raise TypeError( + f"BTREE/BITMAP index column {column} must be int", + ", float, bool, str, fixed-size-binary, or temporal ", + ) + elif index_type == "LABEL_LIST": + if not pa.types.is_list(field_type): + raise TypeError(f"LABEL_LIST index column {column} must be a list") + elif index_type == "NGRAM": + if not pa.types.is_string(field_type) and not pa.types.is_large_string( + field_type + ): + raise TypeError(f"NGRAM index column {column} must be a string") + elif index_type in ["INVERTED", "FTS"]: + value_type = field_type + if pa.types.is_list(field_type) or pa.types.is_large_list(field_type): + value_type = field_type.value_type + if ( + not pa.types.is_string(value_type) + and not pa.types.is_large_string(value_type) + and not ( + pa.types.is_large_binary(value_type) + and field_meta[b"ARROW:extension:name"] == b"lance.json" + ) + ): + raise TypeError( + f"INVERTED index column {column} must be string, large string" + f" or list of strings, or json, but got {value_type}" + ) + + if pa.types.is_duration(field_type): + raise TypeError( + f"Scalar index column {column} cannot currently be a duration" + ) + return column, index_type, index_type + elif isinstance(index_type, IndexConfig): + logical_index_type = index_type.index_type.upper() + config = json.dumps(index_type.parameters) + kwargs["config"] = indices.IndexConfig(index_type.index_type, config) + return column, "scalar", logical_index_type + else: + raise Exception("index_type must be str or IndexConfig") + def create_scalar_index( self, column: str, @@ -3065,11 +3166,11 @@ def create_scalar_index( structure. If False, an empty index will be created that can be populated later. fragment_ids : List[int], optional - If provided, the index will be created only on the specified fragments. - This enables distributed/fragment-level indexing. When provided, the - method returns metadata for one segment but does not commit - the index to the dataset. The segment can be planned, merged, and - committed later using the segment builder and commit APIs. + If provided, the index will be created only on the specified fragments + using the legacy distributed scalar-index path. BTREE indices do not + support this path; build distributed BTREE segments with + :meth:`create_index_uncommitted` (``index_type="BTREE"``) and publish + them with :meth:`commit_existing_index_segments` instead. This parameter is passed via kwargs internally. index_uuid : str, optional A UUID to use for the segment written by this call. @@ -3164,98 +3265,16 @@ def create_scalar_index( ``MaterializeIndex`` operator. """ - if isinstance(column, str): - column = [column] + column, index_type, logical_index_type = self._prepare_scalar_index_request( + column, index_type, kwargs + ) - if len(column) > 1: - raise NotImplementedError( - "Scalar indices currently only support a single column" + if fragment_ids is not None and logical_index_type == "BTREE": + raise ValueError( + "BTree distributed indexing uses create_index_uncommitted(..., " + 'index_type="BTREE", fragment_ids=...)' ) - column = column[0] - lance_field = self._ds.lance_schema.field_case_insensitive(column) - if lance_field is None: - raise KeyError(f"{column} not found in schema") - - # TODO: Add documentation of IndexConfig approach for creating - # indexes that need parameterization - if isinstance(index_type, str): - index_type = index_type.upper() - if index_type not in [ - "BTREE", - "BITMAP", - "NGRAM", - "ZONEMAP", - "LABEL_LIST", - "INVERTED", - "FTS", - "BLOOMFILTER", - "RTREE", - ]: - raise NotImplementedError( - ( - 'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", ' - '"INVERTED", "BLOOMFILTER" or "RTREE" are supported for ' - f"scalar columns. Received {index_type}", - ) - ) - - field = lance_field.to_arrow() - - field_type = field.type - field_meta = field.metadata - if hasattr(field_type, "storage_type"): - field_type = field_type.storage_type - - if index_type in ["BTREE", "BITMAP", "ZONEMAP"]: - if ( - not pa.types.is_integer(field_type) - and not pa.types.is_floating(field_type) - and not pa.types.is_boolean(field_type) - and not pa.types.is_string(field_type) - and not pa.types.is_temporal(field_type) - and not pa.types.is_fixed_size_binary(field_type) - ): - raise TypeError( - f"BTREE/BITMAP index column {column} must be int", - ", float, bool, str, fixed-size-binary, or temporal ", - ) - elif index_type == "LABEL_LIST": - if not pa.types.is_list(field_type): - raise TypeError(f"LABEL_LIST index column {column} must be a list") - elif index_type == "NGRAM": - if not pa.types.is_string(field_type) and not pa.types.is_large_string( - field_type - ): - raise TypeError(f"NGRAM index column {column} must be a string") - elif index_type in ["INVERTED", "FTS"]: - value_type = field_type - if pa.types.is_list(field_type) or pa.types.is_large_list(field_type): - value_type = field_type.value_type - if ( - not pa.types.is_string(value_type) - and not pa.types.is_large_string(value_type) - and not ( - pa.types.is_large_binary(value_type) - and field_meta[b"ARROW:extension:name"] == b"lance.json" - ) - ): - raise TypeError( - f"INVERTED index column {column} must be string, large string" - f" or list of strings, or json, but got {value_type}" - ) - - if pa.types.is_duration(field_type): - raise TypeError( - f"Scalar index column {column} cannot currently be a duration" - ) - elif isinstance(index_type, IndexConfig): - config = json.dumps(index_type.parameters) - kwargs["config"] = indices.IndexConfig(index_type.index_type, config) - index_type = "scalar" - else: - raise Exception("index_type must be str or IndexConfig") - # Add fragment_ids and index_uuid to kwargs if provided if fragment_ids is not None: kwargs["fragment_ids"] = fragment_ids @@ -3915,10 +3934,10 @@ def create_index_uncommitted( """ Create one segment without publishing it and return its metadata. - This is the public distributed-build API for vector index - construction. Unlike :meth:`create_index`, this method does not publish - the index into the dataset manifest. Instead, it writes one segment - under ``_indices//`` and returns the resulting + This is the public distributed-build API for vector and BTREE scalar + index construction. Unlike :meth:`create_index`, this method does not + publish the index into the dataset manifest. Instead, it writes one + segment under ``_indices//`` and returns the resulting :class:`Index` metadata. Callers should: @@ -3932,6 +3951,10 @@ def create_index_uncommitted( 5. build one or more physical segments and commit them with :meth:`commit_existing_index_segments` + BTREE segments do not yet support the segment builder (steps 3-4); collect + the returned segments and pass them straight to + :meth:`commit_existing_index_segments`. + Parameters are the same as :meth:`create_index`, with one additional requirement: @@ -3942,6 +3965,35 @@ def create_index_uncommitted( Index Metadata for the segment that was written by this call. """ + is_btree_request = ( + isinstance(index_type, str) and index_type.upper() == "BTREE" + ) or ( + isinstance(index_type, IndexConfig) + and index_type.index_type.upper() == "BTREE" + ) + if is_btree_request: + if fragment_ids is None: + raise ValueError( + "create_index_uncommitted requires fragment_ids " + "for distributed index build" + ) + + kwargs = dict(kwargs) + column, rust_index_type, _ = self._prepare_scalar_index_request( + column, index_type, kwargs + ) + kwargs["fragment_ids"] = fragment_ids + + return self._ds.create_index( + [column], + rust_index_type, + name, + replace, + train, + storage_options, + kwargs, + ) + return self._create_index_impl( column, index_type, diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 7b4dede319b..11dca24c499 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -95,87 +95,48 @@ def data_table(indexed_dataset: lance.LanceDataset): return indexed_dataset.scanner().to_table() +def _commit_segmented_btree_index(dataset, column, index_name): + segments = [ + dataset.create_index_uncommitted( + column=column, + index_type="BTREE", + name=index_name, + fragment_ids=[fragment.fragment_id], + ) + for fragment in dataset.get_fragments() + ] + return dataset.commit_existing_index_segments(index_name, column, segments) + + @pytest.fixture def btree_comparison_datasets(tmp_path): """Setup datasets for B-tree comparison tests""" - # Test configuration num_fragments = 3 rows_per_fragment = 10000 total_rows = num_fragments * rows_per_fragment - # Create dataset for fragment-level indexing fragment_ds = generate_multi_fragment_dataset( tmp_path / "fragment", num_fragments=num_fragments, rows_per_fragment=rows_per_fragment, ) - # Create dataset for complete indexing (same data structure) complete_ds = generate_multi_fragment_dataset( tmp_path / "complete", num_fragments=num_fragments, rows_per_fragment=rows_per_fragment, ) - import uuid - - # Build fragment-level B-tree index - fragment_index_id = str(uuid.uuid4()) - fragment_index_name = "fragment_btree_precise_test" - - fragments = fragment_ds.get_fragments() - fragment_ids = [fragment.fragment_id for fragment in fragments] - - # Create fragment-level indices - for fragment in fragments: - fragment_id = fragment.fragment_id - - fragment_ds.create_scalar_index( - column="id", - index_type="BTREE", - name=fragment_index_name, - replace=False, - index_uuid=fragment_index_id, - fragment_ids=[fragment_id], - ) - - # Merge fragment indices - fragment_ds.merge_index_metadata(fragment_index_id, index_type="BTREE") - - # Create Index object for fragment-based index - from lance.dataset import Index - - field_id = fragment_ds.schema.get_field_index("id") - - fragment_index = Index( - uuid=fragment_index_id, - name=fragment_index_name, - fields=[field_id], - dataset_version=fragment_ds.version, - fragment_ids=set(fragment_ids), - index_version=0, + fragment_ds_committed = _commit_segmented_btree_index( + fragment_ds, "id", "fragment_btree_precise_test" ) - # Commit fragment-based index - create_fragment_index_op = lance.LanceOperation.CreateIndex( - new_indices=[fragment_index], - removed_indices=[], - ) - - fragment_ds_committed = lance.LanceDataset.commit( - fragment_ds.uri, - create_fragment_index_op, - read_version=fragment_ds.version, - ) - - # Build complete B-tree index complete_index_name = f"complete_btree_{uuid.uuid4().hex[:8]}" complete_ds.create_scalar_index( column="id", index_type="BTREE", name=complete_index_name, ) - # Reload the dataset to get the indexed version complete_ds = lance.dataset(complete_ds.uri) return { @@ -3761,88 +3722,22 @@ def test_backward_compatibility_changed_index_protos(tmp_path): def test_distribute_btree_index_build(tmp_path): """ - Test distributed B-tree index build similar to test_distribute_fts_index_build. - This test creates B-tree indices on individual fragments and then - commits them as a single index. + Test distributed B-tree index build with segmented index commit. + This test creates B-tree segments on individual fragments and then + commits them as a single logical index. """ - # Generate test dataset with multiple fragments ds = generate_multi_fragment_dataset( tmp_path, num_fragments=4, rows_per_fragment=10000 ) - import uuid - - index_id = str(uuid.uuid4()) index_name = "btree_multiple_fragment_idx" + ds_committed = _commit_segmented_btree_index(ds, "id", index_name) - fragments = ds.get_fragments() - fragment_ids = [fragment.fragment_id for fragment in fragments] - - for fragment in ds.get_fragments(): - fragment_id = fragment.fragment_id - - # Create B-tree scalar index for each fragment - # Use the same index_name for all fragments (like in FTS test) - ds.create_scalar_index( - column="id", # Use integer column for B-tree - index_type="BTREE", - name=index_name, - replace=False, - index_uuid=index_id, - fragment_ids=[fragment_id], - ) - - # test that the dataset should be searchable - # when the index not committed yet - # Test that the index works for searching - # Test exact equality queries - test_id = 100 # Should be in first fragment - results = ds.scanner( - filter=f"id = {test_id}", - columns=["id", "text"], - ).to_table() - - assert results.num_rows == 1, f"No results found for id = {test_id}" - - # Merge the B-tree index metadata - ds.merge_index_metadata(index_id, index_type="BTREE") - - # Create an Index object using the new dataclass format - from lance.dataset import Index - - # Get the schema field for the indexed column - field_id = ds.schema.get_field_index("id") - - index = Index( - uuid=index_id, - name=index_name, - fields=[field_id], # Use field index instead of field object - dataset_version=ds.version, - fragment_ids=set(fragment_ids), - index_version=0, - ) - - # Create the index operation - create_index_op = lance.LanceOperation.CreateIndex( - new_indices=[index], - removed_indices=[], - ) - - # Commit the index - ds_committed = lance.LanceDataset.commit( - ds.uri, - create_index_op, - read_version=ds.version, - ) - - # Verify the index was created and is functional stats = ds_committed.stats.index_stats(index_name) assert stats["name"] == index_name assert stats["index_type"] == "BTree" - # Test that the index works for searching - # Test exact equality queries - test_id = 100 # Should be in first fragment + test_id = 100 results = ds_committed.scanner( filter=f"id = {test_id}", columns=["id", "text"], @@ -3850,7 +3745,6 @@ def test_distribute_btree_index_build(tmp_path): assert results.num_rows == 1, f"No results found for id = {test_id}" - # Test range queries across fragments results_range = ds_committed.scanner( filter="id >= 200 AND id < 800", columns=["id", "text"], @@ -3858,20 +3752,16 @@ def test_distribute_btree_index_build(tmp_path): assert results_range.num_rows > 0, "No results found for range query" - # Compare with complete index results to ensure consistency - # Create a reference dataset with complete index reference_ds = generate_multi_fragment_dataset( tmp_path / "reference", num_fragments=4, rows_per_fragment=10000 ) - # Create complete B-tree index for comparison reference_ds.create_scalar_index( column="id", index_type="BTREE", name="reference_btree_idx", ) - # Compare exact query results reference_results = reference_ds.scanner( filter=f"id = {test_id}", columns=["id", "text"], @@ -3882,7 +3772,6 @@ def test_distribute_btree_index_build(tmp_path): f"but complete index returned {reference_results.num_rows} results" ) - # Compare range query results reference_range_results = reference_ds.scanner( filter="id >= 200 AND id < 800", columns=["id", "text"], @@ -4010,6 +3899,15 @@ def test_distributed_bitmap_index_build_single_fragment_shards(tmp_path): _assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids) +def test_merge_index_metadata_btree_soft_break(tmp_path): + ds = generate_multi_fragment_dataset( + tmp_path, num_fragments=2, rows_per_fragment=100 + ) + + with pytest.raises(ValueError, match="no longer supports merge_index_metadata"): + ds.merge_index_metadata(str(uuid.uuid4()), index_type="BTREE") + + def test_btree_fragment_ids_parameter_validation(tmp_path): """ Test validation of fragment_ids parameter for B-tree indices. @@ -4018,27 +3916,26 @@ def test_btree_fragment_ids_parameter_validation(tmp_path): tmp_path, num_fragments=2, rows_per_fragment=10000 ) - # Test with valid fragment IDs fragments = ds.get_fragments() valid_fragment_id = fragments[0].fragment_id - # This should work without errors - ds.create_scalar_index( - column="id", - index_type="BTREE", - fragment_ids=[valid_fragment_id], - ) - - # Test with invalid fragment ID (should handle gracefully) - try: + # create_scalar_index no longer accepts fragment_ids for BTREE; distributed + # builds must go through the segmented create_index_uncommitted path. + with pytest.raises(ValueError, match="create_index_uncommitted"): ds.create_scalar_index( column="id", index_type="BTREE", - fragment_ids=[999999], # Non-existent fragment ID + fragment_ids=[valid_fragment_id], ) - except Exception as e: - # It's acceptable for this to fail with an appropriate error - print(f"Expected error for invalid fragment ID: {e}") + + # Building one uncommitted segment for a valid fragment should work and + # return the segment metadata without committing it. + segment = ds.create_index_uncommitted( + column="id", + index_type="BTREE", + fragment_ids=[valid_fragment_id], + ) + assert segment.fragment_ids == {valid_fragment_id} @pytest.mark.parametrize( @@ -4078,27 +3975,23 @@ def test_btree_query_comparison_parametrized( btree_comparison_datasets, test_name, filter_expr ): """ - Parametrized B-tree index query comparison test + Parametrized B-tree index query comparison test. - Convert the original loop test to parametrized test, - each test case runs independently + Compares segmented fragment-built BTree results with a complete BTree index. """ fragment_ds = btree_comparison_datasets["fragment_ds"] complete_ds = btree_comparison_datasets["complete_ds"] - # Query fragment-based index fragment_results = fragment_ds.scanner( filter=filter_expr, columns=["id", "text"], ).to_table() - # Query complete index complete_results = complete_ds.scanner( filter=filter_expr, columns=["id", "text"], ).to_table() - # Compare row counts assert fragment_results.num_rows == complete_results.num_rows, ( f"Test '{test_name}' failed: Fragment index " f"returned {fragment_results.num_rows} rows, " @@ -4106,9 +3999,7 @@ def test_btree_query_comparison_parametrized( f" rows for filter: {filter_expr}" ) - # Compare actual results if there are any if fragment_results.num_rows > 0: - # Sort both results by id for comparison fragment_ids = sorted(fragment_results.column("id").to_pylist()) complete_ids = sorted(complete_results.column("id").to_pylist()) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6de490b9572..ba6d3dd142d 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -58,10 +58,9 @@ use lance_datafusion::{ chunker::chunk_concat_stream, exec::{LanceExecutionOptions, OneShotExec, execute_plan}, }; -use lance_io::object_store::ObjectStore; use lance_select::NullableRowAddrSet; use log::{debug, warn}; -use object_store::{Error as ObjectStoreError, path::Path}; +use object_store::Error as ObjectStoreError; use rangemap::RangeInclusiveMap; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize, Serializer}; @@ -2141,66 +2140,6 @@ pub async fn train_btree_index( Ok(()) } -pub async fn merge_index_files( - object_store: &ObjectStore, - index_dir: &Path, - store: Arc, - batch_readhead: Option, - progress: Arc, -) -> Result<()> { - // List all partition page / lookup files in the index directory - let (part_page_files, part_lookup_files) = - list_page_lookup_files(object_store, index_dir).await?; - merge_metadata_files( - store.as_ref(), - &part_page_files, - &part_lookup_files, - batch_readhead, - progress, - ) - .await -} - -/// List and filter files from the index directory -/// Returns (page_files, lookup_files) -async fn list_page_lookup_files( - object_store: &ObjectStore, - index_dir: &Path, -) -> Result<(Vec, Vec)> { - let mut part_page_files = Vec::new(); - let mut part_lookup_files = Vec::new(); - - let mut list_stream = object_store.list(Some(index_dir.clone())); - - while let Some(item) = list_stream.next().await { - match item { - Ok(meta) => { - let file_name = meta.location.filename().unwrap_or_default(); - // Filter files matching the pattern part_*_page_data.lance - if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") { - part_page_files.push(file_name.to_string()); - } - // Filter files matching the pattern part_*_page_lookup.lance - if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") { - part_lookup_files.push(file_name.to_string()); - } - } - Err(_) => continue, - } - } - - if part_page_files.is_empty() || part_lookup_files.is_empty() { - return Err(Error::internal(format!( - "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})", - index_dir, - part_page_files.len(), - part_lookup_files.len() - ))); - } - - Ok((part_page_files, part_lookup_files)) -} - fn find_single_partition_files( files: &[lance_table::format::IndexFile], ) -> Result> { @@ -2803,29 +2742,19 @@ pub struct BTreeParameters { /// The number of rows to include in each zone pub zone_size: Option, - /// The ordinal ID of a data partition for building a large, distributed BTree index. - /// - /// When building an index from multiple, pre-partitioned data chunks (for example, - /// in a distributed environment), this ID specifies which partition this particular - /// build operation corresponds to. - /// - /// # Data Distribution Requirements - /// - /// If this parameter is `Some(id)`, the caller **must** guarantee that the input data - /// is strictly global sorted. The input data, when considered as a whole across all - /// partitions ordered by `range_id`, must be sorted. - /// - /// Concretely, this means: + /// DEPRECATED: range-based distributed BTree building has been retired. + /// Setting this to `Some(..)` now emits a warning and is ignored at build time + /// (see `BTreeIndexPlugin::train_index`). Build one segment per worker and + /// commit them with `commit_existing_index_segments(...)`, optionally + /// consolidating with `merge_existing_index_segments(...)`. The field is + /// retained (rather than removed) so the plugin can detect stale `range_id` + /// inputs and warn loudly instead of serde silently dropping an unknown field. /// - /// All values in the data provided for `range_id: N` must be **less than or equal to** - /// all values in the data for `range_id: N+1`. - /// - /// Lance relies on this precondition to ensure the final, merged index is valid and - /// correctly ordered. - /// - /// # `None` Case - /// - /// If `range_id` is `None`, a single, monolithic index is built over the provided dataset. + /// Historically, this was the ordinal ID of a globally sorted range + /// partition. Lance used it to write `part_*` BTree files that were later + /// merged by `merge_index_metadata`. That flow has been retired. A + /// pre-sorted training stream is still accepted, but this field no longer + /// affects file names, commit behavior, or query semantics. pub range_id: Option, } @@ -2903,13 +2832,27 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, request: Box, - fragment_ids: Option>, + _fragment_ids: Option>, _progress: Arc, ) -> Result { let request = request .as_any() .downcast_ref::() .unwrap(); + if request.parameters.range_id.is_some() { + // `range_id` is deprecated and now ignored. A pre-sorted data stream is + // still supported (pass it as the training data), but `range_id` no longer + // needs to be set: each build now produces one canonical segment, and + // distribution is handled by the segmented-index APIs. The field will be + // removed in a future release. + warn!( + "BTree `range_id` is deprecated and now ignored; a pre-sorted data \ + stream is still supported, but `range_id` no longer needs to be passed. \ + Use the segmented-index APIs instead (build per-fragment segments, then \ + commit_existing_index_segments(...) / merge_existing_index_segments(...)). \ + The `range_id` field will be removed in a future release." + ); + } train_btree_index( data, index_store, @@ -2917,8 +2860,8 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { .parameters .zone_size .unwrap_or(DEFAULT_BTREE_BATCH_SIZE), - fragment_ids, - request.parameters.range_id, + None, + None, ) .await?; Ok(CreatedIndex { diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 30e43c5bb32..1aeb08ac2ad 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3035,7 +3035,7 @@ impl Dataset { &self, index_uuid: &str, index_type: IndexType, - batch_readhead: Option, + _batch_readhead: Option, progress: Arc, ) -> Result<()> { let store = LanceIndexStore::from_dataset_for_new(self, index_uuid)?; @@ -3052,15 +3052,12 @@ impl Dataset { .await } IndexType::BTree => { - // Call merge_index_files function for btree index - lance_index::scalar::btree::merge_index_files( - self.object_store.as_ref(), - &index_dir, - Arc::new(store), - batch_readhead, - progress, - ) - .await + Err(Error::invalid_input( + "BTree distributed indexing no longer supports merge_index_metadata; \ + build segments, optionally merge groups with merge_existing_index_segments(...), \ + and commit with commit_existing_index_segments(...)" + .to_string(), + )) } IndexType::Bitmap => { lance_index::scalar::bitmap::merge_index_files( diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 842252f9a45..70efea17c57 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -755,18 +755,21 @@ mod tests { use arrow_array::{FixedSizeListArray, RecordBatchIterator}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use datafusion::common::ScalarValue; use lance_arrow::FixedSizeListArrayExt; - use lance_core::utils::tempfile::TempStrDir; + use lance_core::utils::{address::RowAddress, tempfile::TempStrDir}; use lance_datagen::{self, gen_batch}; use lance_index::optimize::OptimizeOptions; use lance_index::progress::IndexBuildProgress; - use lance_index::scalar::{FullTextSearchQuery, inverted::tokenizer::InvertedIndexParams}; + use lance_index::scalar::{ + FullTextSearchQuery, SargableQuery, SearchResult, inverted::tokenizer::InvertedIndexParams, + }; use lance_index::vector::hnsw::builder::HnswBuildParams; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; use lance_linalg::distance::{DistanceType, MetricType}; use serde_json::json; - use std::sync::Arc; + use std::{collections::BTreeSet, ops::Bound, sync::Arc}; use uuid::Uuid; lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>); @@ -1272,102 +1275,233 @@ mod tests { } #[tokio::test] - async fn test_merge_index_metadata_btree_reports_progress() { + async fn test_merge_index_metadata_btree_soft_break() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); - let reader = gen_batch() .col("id", lance_datagen::array::step::()) .into_reader_rows( - lance_datagen::RowCount::from(256), - lance_datagen::BatchCount::from(4), + lance_datagen::RowCount::from(8), + lance_datagen::BatchCount::from(1), ); - let mut dataset = Dataset::write( - reader, - &dataset_uri, - Some(WriteParams { - max_rows_per_file: 64, - mode: WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); - - let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); - let fragments = dataset.get_fragments(); - let fragment_ids: Vec = fragments.iter().map(|f| f.id() as u32).collect(); - let shared_uuid = Uuid::new_v4().to_string(); - let build_progress = Arc::new(RecordingProgress::default()); - - for &fragment_id in &fragment_ids { - CreateIndexBuilder::new(&mut dataset, &["id"], IndexType::BTree, ¶ms) - .name("distributed_btree".to_string()) - .fragments(vec![fragment_id]) - .index_uuid(shared_uuid.clone()) - .progress(build_progress.clone()) - .execute_uncommitted() - .await - .unwrap(); - } + let dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); - let merge_progress = Arc::new(RecordingProgress::default()); - dataset + let err = dataset .merge_index_metadata( - &shared_uuid, + &Uuid::new_v4().to_string(), IndexType::BTree, - Some(1), - merge_progress.clone(), + None, + Arc::new(NoopIndexBuildProgress), ) .await - .unwrap(); + .unwrap_err(); + assert!( + err.to_string() + .contains("no longer supports merge_index_metadata"), + "expected BTree merge_index_metadata soft-break error, got: {err}" + ); + } - let build_tags = build_progress - .recorded_events() - .iter() - .map(|(kind, stage, _)| format!("{kind}:{stage}")) - .collect::>(); + /// Assert a committed segment directory holds exactly one canonical BTree + /// payload — one `page_data.lance` + one `page_lookup.lance` — and no `part_*` + /// shard files. Locks the "every segment has exactly one lookup" invariant. + async fn assert_canonical_btree_segment(dataset: &Dataset, uuid: &Uuid) { + let index_dir = dataset.indices_dir().join(uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir) + .await + .unwrap(); + let names: Vec<&str> = files.iter().map(|f| f.path.as_str()).collect(); + assert_eq!( + names.iter().filter(|n| **n == "page_lookup.lance").count(), + 1, + "segment must have exactly one canonical page_lookup.lance, got {names:?}" + ); + assert_eq!( + names.iter().filter(|n| **n == "page_data.lance").count(), + 1, + "segment must have exactly one canonical page_data.lance, got {names:?}" + ); assert!( - build_tags.iter().any(|e| e == "start:load_data"), - "expected load_data progress during public distributed build" + !names.iter().any(|n| n.starts_with("part_")), + "segment must have no part_* shard files, got {names:?}" ); + } - let merge_tags = merge_progress - .recorded_events() + #[tokio::test] + async fn test_segmented_btree_multi_fragment_commit_and_search() { + let test_dir = TempStrDir::default(); + let dataset = gen_batch() + .col("value", lance_datagen::array::step::()) + .into_dataset( + test_dir.as_str(), + FragmentCount::from(4), + FragmentRowCount::from(16), + ) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 4); + + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let mut segments = Vec::new(); + for fragment in &fragments { + segments.push( + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::BTree, ¶ms) + .name("value_btree_segments".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + let segment_uuids = segments .iter() - .map(|(kind, stage, _)| format!("{kind}:{stage}")) + .map(|segment| segment.uuid) .collect::>(); - let pages_start = merge_tags - .iter() - .position(|e| e == "start:merge_pages") - .expect("missing merge_pages start"); - let pages_complete = merge_tags - .iter() - .position(|e| e == "complete:merge_pages") - .expect("missing merge_pages complete"); - let write_start = merge_tags - .iter() - .position(|e| e == "start:write_lookup_file") - .expect("missing write_lookup_file start"); - let write_complete = merge_tags - .iter() - .position(|e| e == "complete:write_lookup_file") - .expect("missing write_lookup_file complete"); - assert!(pages_start < pages_complete); - assert!(pages_complete < write_start); - assert!(write_start < write_complete); - assert!( - merge_tags.iter().any(|e| e == "progress:merge_pages"), - "expected merge_pages progress during public merge" + + dataset + .commit_existing_index_segments("value_btree_segments", "value", segments) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("value_btree_segments") + .await + .unwrap(); + assert_eq!(committed.len(), fragments.len()); + for segment_uuid in &segment_uuids { + assert_canonical_btree_segment(&dataset, segment_uuid).await; + } + + let logical = crate::index::scalar_logical::open_named_scalar_index( + &dataset, + "value", + "value_btree_segments", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + let point_query = SargableQuery::Equals(ScalarValue::Int32(Some(33))); + let point_matches = match logical + .search(&point_query, &NoOpMetricsCollector) + .await + .unwrap() + { + SearchResult::Exact(row_addrs) => row_addrs.true_rows().row_addrs().unwrap().count(), + other => panic!("expected exact point result, got {other:?}"), + }; + assert_eq!(point_matches, 1); + + let range_query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(14))), + Bound::Excluded(ScalarValue::Int32(Some(35))), ); - assert!( - merge_tags.iter().any(|e| e == "progress:write_lookup_file"), - "expected write_lookup_file progress during public merge" + let range_row_addrs = match logical + .search(&range_query, &NoOpMetricsCollector) + .await + .unwrap() + { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact range result, got {other:?}"), + }; + let searched_fragments = range_row_addrs + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + assert_eq!(searched_fragments.len(), 21); + assert_eq!( + searched_fragments.into_iter().collect::>(), + BTreeSet::from([0, 1, 2]) ); - assert!( - !merge_tags.iter().any(|e| e == "start:merge_lookups"), - "fragment-based distributed BTREE merge should not use merge_lookups" + } + + #[tokio::test] + async fn test_range_based_btree_index_create() { + use crate::dataset::scanner::ColumnOrdering; + use futures::TryStreamExt; + + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + // Write the dataset with deliberately unsorted ids so the sort step is real. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let ids: Vec = (0..256).rev().collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(ids))]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + // The worker scans the dataset's `(id, _rowid)` rows and sorts them by value, + // producing the BTree training stream `(value, _rowid)` externally — the "scan, + // sort, then hand a pre-sorted reader to the builder" path; no `range_id`. + let sorted_batches: Vec = { + let mut scan = dataset.scan(); + scan.order_by(Some(vec![ColumnOrdering::asc_nulls_first( + "id".to_string(), + )])) + .unwrap(); + scan.with_row_id(); + scan.project_with_transform(&[("value", "id")]).unwrap(); + scan.try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap() + }; + let train_schema = sorted_batches[0].schema(); + let sorted_reader = + RecordBatchIterator::new(sorted_batches.into_iter().map(Ok), train_schema); + + // Build one self-contained segment directly from the sorted reader. + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let segment = CreateIndexBuilder::new(&mut dataset, &["id"], IndexType::BTree, ¶ms) + .name("id_btree".to_string()) + .preprocessed_data(Box::new(sorted_reader)) + .execute_uncommitted() + .await + .unwrap(); + let segment_uuid = segment.uuid; + + // Commit the segment via the segmented-index API — no merge step. + dataset + .commit_existing_index_segments("id_btree", "id", vec![segment]) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("id_btree").await.unwrap(); + assert_eq!(committed.len(), 1); + assert_eq!(committed[0].uuid, segment_uuid); + + // Exactly one canonical data + one lookup, no shards. + assert_canonical_btree_segment(&dataset, &segment_uuid).await; + + // The committed index answers a range query correctly. + let logical = crate::index::scalar_logical::open_named_scalar_index( + &dataset, + "id", + "id_btree", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(0))), + Bound::Excluded(ScalarValue::Int32(Some(64))), ); + let matched = match logical.search(&query, &NoOpMetricsCollector).await.unwrap() { + SearchResult::Exact(row_addrs) => row_addrs.true_rows().row_addrs().unwrap().count(), + other => panic!("expected exact result, got {other:?}"), + }; + assert_eq!(matched, 64); } #[tokio::test]