From 698e3d483d65f11eff5a07c4cd2afaefb46c0831 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Sun, 31 May 2026 14:34:15 +0800 Subject: [PATCH 1/3] need more test --- java/lance-jni/src/blocking_dataset.rs | 21 +- .../lance/index/scalar/BTreeIndexParams.java | 7 + java/src/test/java/org/lance/DatasetTest.java | 5 +- .../java/org/lance/index/ScalarIndexTest.java | 238 +------------- python/python/tests/test_scalar_index.py | 303 ------------------ rust/lance-index/src/scalar/btree.rs | 89 ++--- rust/lance/src/dataset.rs | 17 +- rust/lance/src/index/create.rs | 198 +++++++----- 8 files changed, 179 insertions(+), 699 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d338ff0fc4b..df43c205583 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -49,7 +49,6 @@ use lance_file::version::LanceFileVersion; use lance_index::IndexCriteria as RustIndexCriteria; use lance_index::optimize::OptimizeOptions; use lance_index::progress::noop_progress; -use lance_index::scalar::btree::BTreeParameters; use lance_index::{IndexParams, IndexType}; use lance_io::object_store::ObjectStoreRegistry; use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider}; @@ -960,8 +959,9 @@ fn inner_create_index<'local>( None }; - // we should skip committing index when building distributed indices. - let mut skip_commit = fragment_ids.is_some(); + // we should skip committing index when building distributed indices + // (one segment per worker; commit happens later via commit_existing_index_segments). 
+ let skip_commit = fragment_ids.is_some(); // Handle scalar vs vector indices differently and get params before borrowing dataset let params_result: Result> = match index_type { @@ -980,7 +980,6 @@ fn inner_create_index<'local>( index_type: index_type_str, params: params_opt.clone(), }; - skip_commit = skip_commit || should_skip_commit(index_type, ¶ms_opt)?; Ok(Box::new(scalar_params)) } IndexType::FragmentReuse | IndexType::MemWal => { @@ -1060,20 +1059,6 @@ fn inner_drop_index(env: &mut JNIEnv, java_dataset: JObject, name: JString) -> R Ok(()) } -fn should_skip_commit(index_type: IndexType, params_opt: &Option) -> Result { - match index_type { - IndexType::BTree => { - // Should defer the commit if we are building range-based BTree index - if let Some(params) = params_opt { - let btree_parameters = serde_json::from_str::(params)?; - return Ok(btree_parameters.range_id.is_some()); - } - Ok(false) - } - _ => Ok(false), - } -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_innerMergeIndexMetadata<'local>( mut env: JNIEnv<'local>, diff --git a/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java b/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java index d72d5936d97..a7be46d2a85 100755 --- a/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java +++ b/java/src/main/java/org/lance/index/scalar/BTreeIndexParams.java @@ -59,7 +59,14 @@ public Builder zoneSize(long zoneSize) { * @param rangeId non-negative range identifier * @return this builder * @throws IllegalArgumentException + * @deprecated {@code rangeId} is no longer needed and is now ignored at build time (a warning + * is logged if set). A pre-sorted data stream is still supported — just pass the reader + * without a {@code rangeId}. 
Each build produces one canonical segment; for distributed + * builds, build one segment per worker and commit them with {@code + * commitExistingIndexSegments(...)}, optionally consolidating with {@code + * mergeExistingIndexSegments(...)}. This parameter will be removed in a future release. */ + @Deprecated public Builder rangeId(int rangeId) { if (rangeId < 0) { throw new IllegalArgumentException("rangeId must be non-negative"); diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index 315b010da1e..66d6d5d6687 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -2196,10 +2196,7 @@ void testDropIndex(@TempDir Path tempDir) { dataset.listIndexes().contains("test_index"), "Partially created index should not present"); - // 3. merge metadata, which will still not be committed - dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty()); - - // 4. commit the index + // 3. 
commit the index int fieldId = dataset.getLanceSchema().fields().stream() .filter(f -> f.getName().equals("name")) diff --git a/java/src/test/java/org/lance/index/ScalarIndexTest.java b/java/src/test/java/org/lance/index/ScalarIndexTest.java index 70ef43c853c..db3c93a7820 100644 --- a/java/src/test/java/org/lance/index/ScalarIndexTest.java +++ b/java/src/test/java/org/lance/index/ScalarIndexTest.java @@ -13,27 +13,14 @@ */ package org.lance.index; -import org.lance.CommitBuilder; import org.lance.Dataset; import org.lance.Fragment; import org.lance.TestUtils; -import org.lance.Transaction; import org.lance.WriteParams; import org.lance.index.scalar.ScalarIndexParams; -import org.lance.ipc.LanceScanner; -import org.lance.ipc.ScanOptions; -import org.lance.operation.CreateIndex; -import org.apache.arrow.c.ArrowArrayStream; -import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.UInt8Vector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.ipc.ArrowReader; -import org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; @@ -41,17 +28,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -107,7 +89,11 @@ public void 
testCreateBTreeIndex(@TempDir Path tempDir) throws Exception { } @Test - public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exception { + public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws Exception { + // The historical fragment-based distributed BTree flow (per-fragment build into a + // shared uuid + mergeIndexMetadata) has been retired in favor of the segmented flow. + // A fragment-scoped build still succeeds, but mergeIndexMetadata(BTREE) must now fail + // fast with a migration error. String datasetPath = tempDir.resolve("build_index_distributedly").toString(); try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { TestUtils.SimpleTestDataset testDataset = @@ -117,220 +103,30 @@ public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exc testDataset.write(1, 10).close(); try (Dataset dataset = testDataset.write(2, 10)) { List fragments = dataset.getFragments(); - assertEquals(2, dataset.getFragments().size()); + assertEquals(2, fragments.size()); ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}"); IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); UUID uuid = UUID.randomUUID(); - // 2. partially create index + // 2. a distributed worker can still build one fragment-scoped segment (uncommitted). 
dataset.createIndex( IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) .withIndexName("test_index") .withIndexUUID(uuid.toString()) .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) .build()); - dataset.createIndex( - IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) - .withIndexName("test_index") - .withIndexUUID(uuid.toString()) - .withFragmentIds(Collections.singletonList(fragments.get(1).getId())) - .build()); - - // then no index should have been created - assertFalse( - dataset.listIndexes().contains("test_index"), - "Partially created index should not present"); - - // 3. merge metadata, which will still not be committed - dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty()); - - // 4. commit the index - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals("name")) - .findAny() - .orElseThrow(() -> new RuntimeException("Cannot find 'name' field for TestDataset")) - .getId(); - - long datasetVersion = dataset.version(); - - Index index = - Index.builder() - .uuid(uuid) - .name("test_index") - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments(fragments.stream().map(Fragment::getId).collect(Collectors.toList())) - .build(); - - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(datasetVersion) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains("test_index")); - } - } - } - } - } - - @Test - public void testRangedBTreeIndex(@TempDir Path tempDir) throws Exception { - String datasetPath = 
tempDir.resolve("ranged_btree_map").toString(); - UUID indexUUID = UUID.randomUUID(); - try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { - TestUtils.SimpleTestDataset testDataset = - new TestUtils.SimpleTestDataset(allocator, datasetPath); - testDataset.createEmptyDataset().close(); - // 1. write some data - try (Dataset dataset = testDataset.write(1, 200)) { - - // 2. scan data out - List data = new ArrayList<>(); - try (LanceScanner scanner = - dataset.newScan( - new ScanOptions.Builder() - .withRowId(true) - .columns(Collections.singletonList("id")) - .build()); - ArrowReader arrowReader = scanner.scanBatches(); ) { - while (arrowReader.loadNextBatch()) { - VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); - UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); - IntVector idVec = (IntVector) root.getVector("id"); - for (int i = 0; i < root.getRowCount(); i++) { - data.add(new long[] {idVec.get(i), rowIdVec.get(i)}); - } - } - } - - // 3. sort data globally (This will be done by computing engines in production) - data.sort((d1, d2) -> (int) (d1[0] - d2[0])); - int mid = data.size() / 2; - - // 4. divide sorted data into ranges and build index for each range - createBtreeIndexForRange(dataset, data.subList(0, mid), 1, allocator, indexUUID); - createBtreeIndexForRange(dataset, data.subList(mid, data.size()), 2, allocator, indexUUID); - // 5. merge index. - dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.BTREE, Optional.empty()); - - // 6. 
commit index - long datasetVersion = dataset.version(); - int fieldId = - dataset.getLanceSchema().fields().stream() - .filter(f -> f.getName().equals("id")) - .findAny() - .orElseThrow(() -> new RuntimeException("Cannot find 'id' field for TestDataset")) - .getId(); - Index index = - Index.builder() - .uuid(indexUUID) - .name("test_index") - .fields(Collections.singletonList(fieldId)) - .datasetVersion(datasetVersion) - .indexVersion(0) - .fragments( - dataset.getFragments().stream() - .map(Fragment::getId) - .collect(Collectors.toList())) - .build(); - - CreateIndex createIndexOp = - CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build(); - - try (Transaction createIndexTx = - new Transaction.Builder() - .readVersion(datasetVersion) - .operation(createIndexOp) - .build()) { - try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) { - // new dataset should contain that index - assertEquals(datasetVersion + 1, newDataset.version()); - assertTrue(newDataset.listIndexes().contains("test_index")); - - // 7. compare results - // force use index should get the right value - ScanOptions scanOptions = - new ScanOptions.Builder().withRowId(true).filter("id in (10, 20, 30)").build(); - try (LanceScanner scanner = newDataset.newScan(scanOptions); - ArrowReader arrowReader = scanner.scanBatches(); ) { - List ids = new ArrayList<>(); - while (arrowReader.loadNextBatch()) { - VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); - IntVector idVec = (IntVector) root.getVector("id"); - for (int i = 0; i < idVec.getValueCount(); i++) { - ids.add(idVec.get(i)); - } - } - Collections.sort(ids); - Assertions.assertIterableEquals(Arrays.asList(10, 20, 30), ids); - } - } - } - } - } - } - - private void createBtreeIndexForRange( - Dataset dataset, - List preprocessedData, - int rangeId, - BufferAllocator allocator, - UUID indexUUID) { - // Note that the indexing column is called 'value' in btree. 
- Schema schema = - new Schema( - Arrays.asList( - Field.nullable("value", new ArrowType.Int(32, true)), - Field.nullable("_rowid", new ArrowType.Int(64, false))), - null); - try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { - root.allocateNew(); - IntVector idVec = (IntVector) root.getVector("value"); - UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); - for (int i = 0; i < preprocessedData.size(); i++) { - long[] dataPair = preprocessedData.get(i); - idVec.set(i, (int) dataPair[0]); - rowIdVec.setSafe(i, dataPair[1]); - } - root.setRowCount(preprocessedData.size()); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { - writer.start(); - writer.writeBatch(); - writer.end(); - } catch (IOException e) { - throw new RuntimeException("Cannot write schema root", e); - } - - byte[] arrowData = out.toByteArray(); - ByteArrayInputStream in = new ByteArrayInputStream(arrowData); - - try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator); - ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { - Data.exportArrayStream(allocator, reader, stream); - - ScalarIndexParams scalarParams = - ScalarIndexParams.create("btree", String.format("{\"range_id\": %s}", rangeId)); - IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); - dataset.createIndex( - IndexOptions.builder(Collections.singletonList("id"), IndexType.BTREE, indexParams) - .withIndexUUID(indexUUID.toString()) - .withPreprocessedData(stream) - .build()); - } catch (Exception e) { - throw new RuntimeException("Cannot read arrow stream.", e); + // 3. but the shard-then-merge commit step is no longer supported. 
+ Exception ex = + Assertions.assertThrows( + Exception.class, + () -> + dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty())); + assertTrue( + ex.getMessage() != null + && ex.getMessage().contains("no longer supports merge_index_metadata"), + "expected BTree merge_index_metadata soft-break error, got: " + ex.getMessage()); } } } diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 7b4dede319b..b8af34d448e 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -95,97 +95,6 @@ def data_table(indexed_dataset: lance.LanceDataset): return indexed_dataset.scanner().to_table() -@pytest.fixture -def btree_comparison_datasets(tmp_path): - """Setup datasets for B-tree comparison tests""" - # Test configuration - num_fragments = 3 - rows_per_fragment = 10000 - total_rows = num_fragments * rows_per_fragment - - # Create dataset for fragment-level indexing - fragment_ds = generate_multi_fragment_dataset( - tmp_path / "fragment", - num_fragments=num_fragments, - rows_per_fragment=rows_per_fragment, - ) - - # Create dataset for complete indexing (same data structure) - complete_ds = generate_multi_fragment_dataset( - tmp_path / "complete", - num_fragments=num_fragments, - rows_per_fragment=rows_per_fragment, - ) - - import uuid - - # Build fragment-level B-tree index - fragment_index_id = str(uuid.uuid4()) - fragment_index_name = "fragment_btree_precise_test" - - fragments = fragment_ds.get_fragments() - fragment_ids = [fragment.fragment_id for fragment in fragments] - - # Create fragment-level indices - for fragment in fragments: - fragment_id = fragment.fragment_id - - fragment_ds.create_scalar_index( - column="id", - index_type="BTREE", - name=fragment_index_name, - replace=False, - index_uuid=fragment_index_id, - fragment_ids=[fragment_id], - ) - - # Merge fragment indices - fragment_ds.merge_index_metadata(fragment_index_id, index_type="BTREE") - - # 
Create Index object for fragment-based index - from lance.dataset import Index - - field_id = fragment_ds.schema.get_field_index("id") - - fragment_index = Index( - uuid=fragment_index_id, - name=fragment_index_name, - fields=[field_id], - dataset_version=fragment_ds.version, - fragment_ids=set(fragment_ids), - index_version=0, - ) - - # Commit fragment-based index - create_fragment_index_op = lance.LanceOperation.CreateIndex( - new_indices=[fragment_index], - removed_indices=[], - ) - - fragment_ds_committed = lance.LanceDataset.commit( - fragment_ds.uri, - create_fragment_index_op, - read_version=fragment_ds.version, - ) - - # Build complete B-tree index - complete_index_name = f"complete_btree_{uuid.uuid4().hex[:8]}" - complete_ds.create_scalar_index( - column="id", - index_type="BTREE", - name=complete_index_name, - ) - # Reload the dataset to get the indexed version - complete_ds = lance.dataset(complete_ds.uri) - - return { - "fragment_ds": fragment_ds_committed, - "complete_ds": complete_ds, - "rows_per_fragment": rows_per_fragment, - "total_rows": total_rows, - } - - def test_load_indices(indexed_dataset: lance.LanceDataset): indices = indexed_dataset.describe_indices() vec_idx = next(idx for idx in indices if "VectorIndex" in idx.type_url) @@ -3759,141 +3668,6 @@ def test_backward_compatibility_changed_index_protos(tmp_path): assert results.column("x").to_pylist() == [100] -def test_distribute_btree_index_build(tmp_path): - """ - Test distributed B-tree index build similar to test_distribute_fts_index_build. - This test creates B-tree indices on individual fragments and then - commits them as a single index. 
- """ - # Generate test dataset with multiple fragments - ds = generate_multi_fragment_dataset( - tmp_path, num_fragments=4, rows_per_fragment=10000 - ) - - import uuid - - index_id = str(uuid.uuid4()) - index_name = "btree_multiple_fragment_idx" - - fragments = ds.get_fragments() - fragment_ids = [fragment.fragment_id for fragment in fragments] - - for fragment in ds.get_fragments(): - fragment_id = fragment.fragment_id - - # Create B-tree scalar index for each fragment - # Use the same index_name for all fragments (like in FTS test) - ds.create_scalar_index( - column="id", # Use integer column for B-tree - index_type="BTREE", - name=index_name, - replace=False, - index_uuid=index_id, - fragment_ids=[fragment_id], - ) - - # test that the dataset should be searchable - # when the index not committed yet - # Test that the index works for searching - # Test exact equality queries - test_id = 100 # Should be in first fragment - results = ds.scanner( - filter=f"id = {test_id}", - columns=["id", "text"], - ).to_table() - - assert results.num_rows == 1, f"No results found for id = {test_id}" - - # Merge the B-tree index metadata - ds.merge_index_metadata(index_id, index_type="BTREE") - - # Create an Index object using the new dataclass format - from lance.dataset import Index - - # Get the schema field for the indexed column - field_id = ds.schema.get_field_index("id") - - index = Index( - uuid=index_id, - name=index_name, - fields=[field_id], # Use field index instead of field object - dataset_version=ds.version, - fragment_ids=set(fragment_ids), - index_version=0, - ) - - # Create the index operation - create_index_op = lance.LanceOperation.CreateIndex( - new_indices=[index], - removed_indices=[], - ) - - # Commit the index - ds_committed = lance.LanceDataset.commit( - ds.uri, - create_index_op, - read_version=ds.version, - ) - - # Verify the index was created and is functional - stats = ds_committed.stats.index_stats(index_name) - assert stats["name"] == index_name - 
assert stats["index_type"] == "BTree" - - # Test that the index works for searching - # Test exact equality queries - test_id = 100 # Should be in first fragment - results = ds_committed.scanner( - filter=f"id = {test_id}", - columns=["id", "text"], - ).to_table() - - assert results.num_rows == 1, f"No results found for id = {test_id}" - - # Test range queries across fragments - results_range = ds_committed.scanner( - filter="id >= 200 AND id < 800", - columns=["id", "text"], - ).to_table() - - assert results_range.num_rows > 0, "No results found for range query" - - # Compare with complete index results to ensure consistency - # Create a reference dataset with complete index - reference_ds = generate_multi_fragment_dataset( - tmp_path / "reference", num_fragments=4, rows_per_fragment=10000 - ) - - # Create complete B-tree index for comparison - reference_ds.create_scalar_index( - column="id", - index_type="BTREE", - name="reference_btree_idx", - ) - - # Compare exact query results - reference_results = reference_ds.scanner( - filter=f"id = {test_id}", - columns=["id", "text"], - ).to_table() - - assert results.num_rows == reference_results.num_rows, ( - f"Distributed index returned {results.num_rows} results, " - f"but complete index returned {reference_results.num_rows} results" - ) - - # Compare range query results - reference_range_results = reference_ds.scanner( - filter="id >= 200 AND id < 800", - columns=["id", "text"], - ).to_table() - - assert results_range.num_rows == reference_range_results.num_rows, ( - f"Distributed index range query returned {results_range.num_rows} results, " - f"but complete index returned {reference_range_results.num_rows} results" - ) - - def _assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids): ds.merge_index_metadata(index_id, index_type="BITMAP") @@ -4041,83 +3815,6 @@ def test_btree_fragment_ids_parameter_validation(tmp_path): print(f"Expected error for invalid fragment ID: {e}") 
-@pytest.mark.parametrize( - "test_name,filter_expr", - [ - # Test 1: Boundary values at fragment edges - ("First value", "id = 0"), - ("Fragment 0 last value", "id = 9999"), - ("Fragment 1 first value", "id = 10000"), - ("Fragment 1 last value", "id = 19999"), - ("Fragment 2 first value", "id = 20000"), - ("Last value", "id = 29999"), - # Test 2: Values in the middle of fragments - ("Fragment 0 middle", "id = 5000"), - ("Fragment 1 middle", "id = 15000"), - ("Fragment 2 middle", "id = 25000"), - # Test 3: Range queries within single fragments - ("Range within fragment 0", "id >= 10 AND id < 20"), - ("Range within fragment 1", "id >= 10010 AND id < 10020"), - ("Range within fragment 2", "id >= 20010 AND id < 20020"), - # Test 4: Range queries spanning multiple fragments - ("Cross fragment 0-1", "id >= 9995 AND id < 10005"), - ("Cross fragment 1-2", "id >= 19995 AND id < 20005"), - ("Cross all fragments", "id >= 5000 AND id < 25000"), - # Test 5: Edge cases - ("Non-existent small value", "id = -1"), - ("Non-existent large value", "id = 30100"), - ("Large range", "id >= 0 AND id < 30000"), - # Test 6: Comparison operators - ("Less than boundary", "id < 10000"), - ("Greater than boundary", "id > 19999"), - ("Less than or equal", "id <= 10050"), - ("Greater than or equal", "id >= 10050"), - ], -) -def test_btree_query_comparison_parametrized( - btree_comparison_datasets, test_name, filter_expr -): - """ - Parametrized B-tree index query comparison test - - Convert the original loop test to parametrized test, - each test case runs independently - """ - fragment_ds = btree_comparison_datasets["fragment_ds"] - complete_ds = btree_comparison_datasets["complete_ds"] - - # Query fragment-based index - fragment_results = fragment_ds.scanner( - filter=filter_expr, - columns=["id", "text"], - ).to_table() - - # Query complete index - complete_results = complete_ds.scanner( - filter=filter_expr, - columns=["id", "text"], - ).to_table() - - # Compare row counts - assert 
fragment_results.num_rows == complete_results.num_rows, ( - f"Test '{test_name}' failed: Fragment index " - f"returned {fragment_results.num_rows} rows, " - f"but complete index returned {complete_results.num_rows}" - f" rows for filter: {filter_expr}" - ) - - # Compare actual results if there are any - if fragment_results.num_rows > 0: - # Sort both results by id for comparison - fragment_ids = sorted(fragment_results.column("id").to_pylist()) - complete_ids = sorted(complete_results.column("id").to_pylist()) - - assert fragment_ids == complete_ids, ( - f"Test '{test_name}' failed: Fragment index " - f"and complete index returned different results for filter: {filter_expr}" - ) - - def test_fts_flat_fallback_matches_wand(tmp_path): # Repro: when filter matches < 10%, FTS fell back to flat search and missed results. # Two-term query increases reproduction likelihood. diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 6de490b9572..cb57b0c195f 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -58,10 +58,9 @@ use lance_datafusion::{ chunker::chunk_concat_stream, exec::{LanceExecutionOptions, OneShotExec, execute_plan}, }; -use lance_io::object_store::ObjectStore; use lance_select::NullableRowAddrSet; use log::{debug, warn}; -use object_store::{Error as ObjectStoreError, path::Path}; +use object_store::Error as ObjectStoreError; use rangemap::RangeInclusiveMap; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize, Serializer}; @@ -2141,66 +2140,6 @@ pub async fn train_btree_index( Ok(()) } -pub async fn merge_index_files( - object_store: &ObjectStore, - index_dir: &Path, - store: Arc, - batch_readhead: Option, - progress: Arc, -) -> Result<()> { - // List all partition page / lookup files in the index directory - let (part_page_files, part_lookup_files) = - list_page_lookup_files(object_store, index_dir).await?; - merge_metadata_files( - store.as_ref(), - 
&part_page_files, - &part_lookup_files, - batch_readhead, - progress, - ) - .await -} - -/// List and filter files from the index directory -/// Returns (page_files, lookup_files) -async fn list_page_lookup_files( - object_store: &ObjectStore, - index_dir: &Path, -) -> Result<(Vec, Vec)> { - let mut part_page_files = Vec::new(); - let mut part_lookup_files = Vec::new(); - - let mut list_stream = object_store.list(Some(index_dir.clone())); - - while let Some(item) = list_stream.next().await { - match item { - Ok(meta) => { - let file_name = meta.location.filename().unwrap_or_default(); - // Filter files matching the pattern part_*_page_data.lance - if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") { - part_page_files.push(file_name.to_string()); - } - // Filter files matching the pattern part_*_page_lookup.lance - if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") { - part_lookup_files.push(file_name.to_string()); - } - } - Err(_) => continue, - } - } - - if part_page_files.is_empty() || part_lookup_files.is_empty() { - return Err(Error::internal(format!( - "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})", - index_dir, - part_page_files.len(), - part_lookup_files.len() - ))); - } - - Ok((part_page_files, part_lookup_files)) -} - fn find_single_partition_files( files: &[lance_table::format::IndexFile], ) -> Result> { @@ -2803,6 +2742,12 @@ pub struct BTreeParameters { /// The number of rows to include in each zone pub zone_size: Option, + /// DEPRECATED: range-based distributed BTree building has been retired. + /// Setting this to `Some(..)` is now ignored at build time with a logged warning (see `BTreeIndexPlugin::train_index`). + /// Build one segment per worker and commit them with `commit_existing_index_segments(...)`, + /// optionally consolidating with `merge_existing_index_segments(...)`. 
The field is retained + /// (rather than removed) so the plugin can detect stale `range_id` inputs and warn about them + /// instead of serde silently dropping an unknown field. /// The ordinal ID of a data partition for building a large, distributed BTree index. /// /// When building an index from multiple, pre-partitioned data chunks (for example, @@ -2903,13 +2848,27 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, request: Box, - fragment_ids: Option>, + _fragment_ids: Option>, _progress: Arc, ) -> Result { let request = request .as_any() .downcast_ref::() .unwrap(); + if request.parameters.range_id.is_some() { + // `range_id` is deprecated and now ignored. A pre-sorted data stream is + // still supported (pass it as the training data), but `range_id` no longer + // needs to be set: each build now produces one canonical segment, and + // distribution is handled by the segmented-index APIs. The field will be + // removed in a future release. + warn!( + "BTree `range_id` is deprecated and now ignored; a pre-sorted data \ + stream is still supported, but `range_id` no longer needs to be passed. \ + Use the segmented-index APIs instead (build per-fragment segments, then \ + commit_existing_index_segments(...) / merge_existing_index_segments(...)). \ + The `range_id` field will be removed in a future release." 
+ ); + } train_btree_index( data, index_store, @@ -2917,8 +2876,8 @@ impl ScalarIndexPlugin for BTreeIndexPlugin { .parameters .zone_size .unwrap_or(DEFAULT_BTREE_BATCH_SIZE), - fragment_ids, - request.parameters.range_id, + None, + None, ) .await?; Ok(CreatedIndex { diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 30e43c5bb32..1aeb08ac2ad 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3035,7 +3035,7 @@ impl Dataset { &self, index_uuid: &str, index_type: IndexType, - batch_readhead: Option, + _batch_readhead: Option, progress: Arc, ) -> Result<()> { let store = LanceIndexStore::from_dataset_for_new(self, index_uuid)?; @@ -3052,15 +3052,12 @@ impl Dataset { .await } IndexType::BTree => { - // Call merge_index_files function for btree index - lance_index::scalar::btree::merge_index_files( - self.object_store.as_ref(), - &index_dir, - Arc::new(store), - batch_readhead, - progress, - ) - .await + Err(Error::invalid_input( + "BTree distributed indexing no longer supports merge_index_metadata; \ + build segments, optionally merge groups with merge_existing_index_segments(...), \ + and commit with commit_existing_index_segments(...)" + .to_string(), + )) } IndexType::Bitmap => { lance_index::scalar::bitmap::merge_index_files( diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 842252f9a45..759ab956a4e 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -1272,104 +1272,146 @@ mod tests { } #[tokio::test] - async fn test_merge_index_metadata_btree_reports_progress() { + async fn test_merge_index_metadata_btree_soft_break() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); - let reader = gen_batch() .col("id", lance_datagen::array::step::()) .into_reader_rows( - lance_datagen::RowCount::from(256), - lance_datagen::BatchCount::from(4), + lance_datagen::RowCount::from(8), + lance_datagen::BatchCount::from(1), ); - let 
mut dataset = Dataset::write( - reader, - &dataset_uri, - Some(WriteParams { - max_rows_per_file: 64, - mode: WriteMode::Overwrite, - ..Default::default() - }), - ) - .await - .unwrap(); - - let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); - let fragments = dataset.get_fragments(); - let fragment_ids: Vec = fragments.iter().map(|f| f.id() as u32).collect(); - let shared_uuid = Uuid::new_v4().to_string(); - let build_progress = Arc::new(RecordingProgress::default()); - - for &fragment_id in &fragment_ids { - CreateIndexBuilder::new(&mut dataset, &["id"], IndexType::BTree, ¶ms) - .name("distributed_btree".to_string()) - .fragments(vec![fragment_id]) - .index_uuid(shared_uuid.clone()) - .progress(build_progress.clone()) - .execute_uncommitted() - .await - .unwrap(); - } + let dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); - let merge_progress = Arc::new(RecordingProgress::default()); - dataset + let err = dataset .merge_index_metadata( - &shared_uuid, + &Uuid::new_v4().to_string(), IndexType::BTree, - Some(1), - merge_progress.clone(), + None, + Arc::new(NoopIndexBuildProgress), ) .await - .unwrap(); - - let build_tags = build_progress - .recorded_events() - .iter() - .map(|(kind, stage, _)| format!("{kind}:{stage}")) - .collect::>(); + .unwrap_err(); assert!( - build_tags.iter().any(|e| e == "start:load_data"), - "expected load_data progress during public distributed build" + err.to_string() + .contains("no longer supports merge_index_metadata"), + "expected BTree merge_index_metadata soft-break error, got: {err}" ); + } - let merge_tags = merge_progress - .recorded_events() - .iter() - .map(|(kind, stage, _)| format!("{kind}:{stage}")) - .collect::>(); - let pages_start = merge_tags - .iter() - .position(|e| e == "start:merge_pages") - .expect("missing merge_pages start"); - let pages_complete = merge_tags - .iter() - .position(|e| e == "complete:merge_pages") - .expect("missing merge_pages complete"); 
- let write_start = merge_tags - .iter() - .position(|e| e == "start:write_lookup_file") - .expect("missing write_lookup_file start"); - let write_complete = merge_tags - .iter() - .position(|e| e == "complete:write_lookup_file") - .expect("missing write_lookup_file complete"); - assert!(pages_start < pages_complete); - assert!(pages_complete < write_start); - assert!(write_start < write_complete); - assert!( - merge_tags.iter().any(|e| e == "progress:merge_pages"), - "expected merge_pages progress during public merge" + /// Assert a committed segment directory holds exactly one canonical BTree + /// payload — one `page_data.lance` + one `page_lookup.lance` — and no `part_*` + /// shard files. Locks the "every segment has exactly one lookup" invariant. + async fn assert_canonical_btree_segment(dataset: &Dataset, uuid: &Uuid) { + let index_dir = dataset.indices_dir().join(uuid.to_string()); + let files = list_index_files_with_sizes(&dataset.object_store, &index_dir) + .await + .unwrap(); + let names: Vec<&str> = files.iter().map(|f| f.path.as_str()).collect(); + assert_eq!( + names.iter().filter(|n| **n == "page_lookup.lance").count(), + 1, + "segment must have exactly one canonical page_lookup.lance, got {names:?}" ); - assert!( - merge_tags.iter().any(|e| e == "progress:write_lookup_file"), - "expected write_lookup_file progress during public merge" + assert_eq!( + names.iter().filter(|n| **n == "page_data.lance").count(), + 1, + "segment must have exactly one canonical page_data.lance, got {names:?}" ); assert!( - !merge_tags.iter().any(|e| e == "start:merge_lookups"), - "fragment-based distributed BTREE merge should not use merge_lookups" + !names.iter().any(|n| n.starts_with("part_")), + "segment must have no part_* shard files, got {names:?}" ); } + #[tokio::test] + async fn test_range_based_btree_index_create() { + use crate::dataset::scanner::ColumnOrdering; + use datafusion::common::ScalarValue; + use futures::TryStreamExt; + use 
lance_index::scalar::{SargableQuery, SearchResult}; + use std::ops::Bound; + + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + // Write the dataset with deliberately unsorted ids so the sort step is real. + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int32, + false, + )])); + let ids: Vec = (0..256).rev().collect(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(ids))]).unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let mut dataset = Dataset::write(reader, &dataset_uri, None).await.unwrap(); + + // The worker scans the dataset's `(id, _rowid)` rows and sorts them by value, + // producing the BTree training stream `(value, _rowid)` externally — the "scan, + // sort, then hand a pre-sorted reader to the builder" path; no `range_id`. + let sorted_batches: Vec = { + let mut scan = dataset.scan(); + scan.order_by(Some(vec![ColumnOrdering::asc_nulls_first( + "id".to_string(), + )])) + .unwrap(); + scan.with_row_id(); + scan.project_with_transform(&[("value", "id")]).unwrap(); + scan.try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap() + }; + let train_schema = sorted_batches[0].schema(); + let sorted_reader = + RecordBatchIterator::new(sorted_batches.into_iter().map(Ok), train_schema); + + // Build one self-contained segment directly from the sorted reader. + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let segment = CreateIndexBuilder::new(&mut dataset, &["id"], IndexType::BTree, ¶ms) + .name("id_btree".to_string()) + .preprocessed_data(Box::new(sorted_reader)) + .execute_uncommitted() + .await + .unwrap(); + let segment_uuid = segment.uuid; + + // Commit the segment via the segmented-index API — no merge step. 
+ dataset + .commit_existing_index_segments("id_btree", "id", vec![segment]) + .await + .unwrap(); + + let committed = dataset.load_indices_by_name("id_btree").await.unwrap(); + assert_eq!(committed.len(), 1); + assert_eq!(committed[0].uuid, segment_uuid); + + // Exactly one canonical data + one lookup, no shards. + assert_canonical_btree_segment(&dataset, &segment_uuid).await; + + // The committed index answers a range query correctly. + let logical = crate::index::scalar_logical::open_named_scalar_index( + &dataset, + "id", + "id_btree", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + let query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(0))), + Bound::Excluded(ScalarValue::Int32(Some(64))), + ); + let matched = match logical.search(&query, &NoOpMetricsCollector).await.unwrap() { + SearchResult::Exact(row_addrs) => row_addrs.true_rows().row_addrs().unwrap().count(), + other => panic!("expected exact result, got {other:?}"), + }; + assert_eq!(matched, 64); + } + #[tokio::test] async fn test_distributed_build_bitmap() { use datafusion::common::ScalarValue; From 0ef69a65799e463002c5601ed2a3acb9cdcc1dea Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Sun, 31 May 2026 16:20:16 +0800 Subject: [PATCH 2/3] refactor(index)!: move distributed BTree build to segmented index framework --- java/lance-jni/src/blocking_dataset.rs | 6 +- .../java/org/lance/index/ScalarIndexTest.java | 168 ++++++++++++++- python/python/lance/dataset.py | 12 +- python/python/tests/test_scalar_index.py | 195 ++++++++++++++++++ rust/lance-index/src/scalar/btree.rs | 40 ++-- rust/lance/src/index/create.rs | 104 +++++++++- 6 files changed, 479 insertions(+), 46 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index df43c205583..305c862e1a7 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -959,9 +959,8 @@ fn inner_create_index<'local>( None }; - // we should skip 
committing index when building distributed indices - // (one segment per worker; commit happens later via commit_existing_index_segments). - let skip_commit = fragment_ids.is_some(); + // we should skip committing index when building distributed indices. + let mut skip_commit = fragment_ids.is_some(); // Handle scalar vs vector indices differently and get params before borrowing dataset let params_result: Result> = match index_type { @@ -980,6 +979,7 @@ fn inner_create_index<'local>( index_type: index_type_str, params: params_opt.clone(), }; + skip_commit = skip_commit || (index_type == IndexType::BTree && batch_reader.is_some()); Ok(Box::new(scalar_params)) } IndexType::FragmentReuse | IndexType::MemWal => { diff --git a/java/src/test/java/org/lance/index/ScalarIndexTest.java b/java/src/test/java/org/lance/index/ScalarIndexTest.java index db3c93a7820..0884b70e4b5 100644 --- a/java/src/test/java/org/lance/index/ScalarIndexTest.java +++ b/java/src/test/java/org/lance/index/ScalarIndexTest.java @@ -18,9 +18,19 @@ import org.lance.TestUtils; import org.lance.WriteParams; import org.lance.index.scalar.ScalarIndexParams; +import org.lance.ipc.LanceScanner; +import org.lance.ipc.ScanOptions; +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; @@ -28,7 +38,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; 
+import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -89,11 +103,7 @@ public void testCreateBTreeIndex(@TempDir Path tempDir) throws Exception { } @Test - public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws Exception { - // The historical fragment-based distributed BTree flow (per-fragment build into a - // shared uuid + mergeIndexMetadata) has been retired in favor of the segmented flow. - // A fragment-scoped build still succeeds, but mergeIndexMetadata(BTREE) must now fail - // fast with a migration error. + public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("build_index_distributedly").toString(); try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { TestUtils.SimpleTestDataset testDataset = @@ -105,11 +115,156 @@ public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws E List fragments = dataset.getFragments(); assertEquals(2, fragments.size()); + ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}"); + IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); + String indexName = "test_index"; + + List segments = new ArrayList<>(); + for (Fragment fragment : fragments) { + segments.add( + dataset.createIndex( + IndexOptions.builder( + Collections.singletonList("name"), IndexType.BTREE, indexParams) + .withIndexName(indexName) + .withFragmentIds(Collections.singletonList(fragment.getId())) + .build())); + } + + assertFalse( + dataset.listIndexes().contains(indexName), + "Partially created index should not present"); + + List committed = dataset.commitExistingIndexSegments(indexName, "name", segments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(indexName)); + + assertEquals(2, 
dataset.countIndexedRows(indexName, "name = 'Person 5'", Optional.empty())); + assertEquals( + 10, + dataset.countIndexedRows( + indexName, "name >= 'Person 3' AND name < 'Person 8'", Optional.empty())); + } + } + } + + @Test + public void testRangedBTreeIndex(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("ranged_btree_map").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + testDataset.write(1, 100).close(); + try (Dataset dataset = testDataset.write(2, 100)) { + List fragments = dataset.getFragments(); + assertEquals(2, fragments.size()); + + List segments = new ArrayList<>(); + for (Fragment fragment : fragments) { + List data = new ArrayList<>(); + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .fragmentIds(Collections.singletonList(fragment.getId())) + .withRowId(true) + .columns(Collections.singletonList("id")) + .build()); + ArrowReader arrowReader = scanner.scanBatches(); ) { + while (arrowReader.loadNextBatch()) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); + IntVector idVec = (IntVector) root.getVector("id"); + for (int i = 0; i < root.getRowCount(); i++) { + data.add(new long[] {idVec.get(i), rowIdVec.get(i)}); + } + } + } + + data.sort((d1, d2) -> Long.compare(d1[0], d2[0])); + segments.add(createBtreeIndexFromPreprocessedData(dataset, data, fragment, allocator)); + } + + String indexName = "test_index"; + List committed = dataset.commitExistingIndexSegments(indexName, "id", segments); + assertEquals(2, committed.size()); + assertTrue(dataset.listIndexes().contains(indexName)); + + assertEquals( + 6, dataset.countIndexedRows(indexName, "id in (10, 20, 30)", Optional.empty())); + assertEquals( + 20, 
dataset.countIndexedRows(indexName, "id >= 50 AND id < 60", Optional.empty())); + } + } + } + + private Index createBtreeIndexFromPreprocessedData( + Dataset dataset, + List preprocessedData, + Fragment fragment, + BufferAllocator allocator) { + Schema schema = + new Schema( + Arrays.asList( + Field.nullable("value", new ArrowType.Int(32, true)), + Field.nullable("_rowid", new ArrowType.Int(64, false))), + null); + try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + IntVector idVec = (IntVector) root.getVector("value"); + UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid"); + for (int i = 0; i < preprocessedData.size(); i++) { + long[] dataPair = preprocessedData.get(i); + idVec.setSafe(i, (int) dataPair[0]); + rowIdVec.setSafe(i, dataPair[1]); + } + root.setRowCount(preprocessedData.size()); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) { + writer.start(); + writer.writeBatch(); + writer.end(); + } catch (IOException e) { + throw new RuntimeException("Cannot write schema root", e); + } + + byte[] arrowData = out.toByteArray(); + ByteArrayInputStream in = new ByteArrayInputStream(arrowData); + + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator); + ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + Data.exportArrayStream(allocator, reader, stream); + + ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 64}"); + IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); + return dataset.createIndex( + IndexOptions.builder(Collections.singletonList("id"), IndexType.BTREE, indexParams) + .withIndexName("test_index") + .withFragmentIds(Collections.singletonList(fragment.getId())) + .withPreprocessedData(stream) + .build()); + } catch (Exception e) { + throw new RuntimeException("Cannot read arrow stream.", e); + } + } + } + 
+ @Test + public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("btree_merge_metadata_soft_break").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + testDataset.write(1, 10).close(); + try (Dataset dataset = testDataset.write(2, 10)) { + List fragments = dataset.getFragments(); + assertEquals(2, fragments.size()); + ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}"); IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build(); UUID uuid = UUID.randomUUID(); - // 2. a distributed worker can still build one fragment-scoped segment (uncommitted). dataset.createIndex( IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams) .withIndexName("test_index") @@ -117,7 +272,6 @@ public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws E .withFragmentIds(Collections.singletonList(fragments.get(0).getId())) .build()); - // 3. but the shard-then-merge commit step is no longer supported. Exception ex = Assertions.assertThrows( Exception.class, diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index eeb2dacff6a..b15471cfc57 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2975,7 +2975,7 @@ def create_scalar_index( index_uuid: Optional[str] = None, progress_callback: Optional[Callable[[IndexProgress], None]] = None, **kwargs, - ): + ) -> Index: """Create a scalar index on a column. Scalar indices, like vector indices, can be used to speed up scans. A scalar @@ -3163,6 +3163,12 @@ def create_scalar_index( that use scalar indices will either have a ``ScalarIndexQuery`` relation or a ``MaterializeIndex`` operator. 
+ Returns + ------- + Index + Metadata for the index. When ``fragment_ids`` is provided the index is + built but not committed, and this is the resulting segment metadata to + pass to :meth:`commit_existing_index_segments`. """ if isinstance(column, str): column = [column] @@ -3264,7 +3270,9 @@ def create_scalar_index( if progress_callback is not None: kwargs["progress_callback"] = progress_callback - self._ds.create_index([column], index_type, name, replace, train, None, kwargs) + return self._ds.create_index( + [column], index_type, name, replace, train, None, kwargs + ) def _create_index_impl( self, diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index b8af34d448e..52662b16663 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -95,6 +95,58 @@ def data_table(indexed_dataset: lance.LanceDataset): return indexed_dataset.scanner().to_table() +def _commit_segmented_btree_index(dataset, column, index_name): + segments = [ + dataset.create_scalar_index( + column=column, + index_type="BTREE", + name=index_name, + fragment_ids=[fragment.fragment_id], + ) + for fragment in dataset.get_fragments() + ] + return dataset.commit_existing_index_segments(index_name, column, segments) + + +@pytest.fixture +def btree_comparison_datasets(tmp_path): + """Setup datasets for B-tree comparison tests""" + num_fragments = 3 + rows_per_fragment = 10000 + total_rows = num_fragments * rows_per_fragment + + fragment_ds = generate_multi_fragment_dataset( + tmp_path / "fragment", + num_fragments=num_fragments, + rows_per_fragment=rows_per_fragment, + ) + + complete_ds = generate_multi_fragment_dataset( + tmp_path / "complete", + num_fragments=num_fragments, + rows_per_fragment=rows_per_fragment, + ) + + fragment_ds_committed = _commit_segmented_btree_index( + fragment_ds, "id", "fragment_btree_precise_test" + ) + + complete_index_name = f"complete_btree_{uuid.uuid4().hex[:8]}" + 
complete_ds.create_scalar_index( + column="id", + index_type="BTREE", + name=complete_index_name, + ) + complete_ds = lance.dataset(complete_ds.uri) + + return { + "fragment_ds": fragment_ds_committed, + "complete_ds": complete_ds, + "rows_per_fragment": rows_per_fragment, + "total_rows": total_rows, + } + + def test_load_indices(indexed_dataset: lance.LanceDataset): indices = indexed_dataset.describe_indices() vec_idx = next(idx for idx in indices if "VectorIndex" in idx.type_url) @@ -3668,6 +3720,69 @@ def test_backward_compatibility_changed_index_protos(tmp_path): assert results.column("x").to_pylist() == [100] +def test_distribute_btree_index_build(tmp_path): + """ + Test distributed B-tree index build with segmented index commit. + This test creates B-tree segments on individual fragments and then + commits them as a single logical index. + """ + ds = generate_multi_fragment_dataset( + tmp_path, num_fragments=4, rows_per_fragment=10000 + ) + + index_name = "btree_multiple_fragment_idx" + ds_committed = _commit_segmented_btree_index(ds, "id", index_name) + + stats = ds_committed.stats.index_stats(index_name) + assert stats["name"] == index_name + assert stats["index_type"] == "BTree" + + test_id = 100 + results = ds_committed.scanner( + filter=f"id = {test_id}", + columns=["id", "text"], + ).to_table() + + assert results.num_rows == 1, f"No results found for id = {test_id}" + + results_range = ds_committed.scanner( + filter="id >= 200 AND id < 800", + columns=["id", "text"], + ).to_table() + + assert results_range.num_rows > 0, "No results found for range query" + + reference_ds = generate_multi_fragment_dataset( + tmp_path / "reference", num_fragments=4, rows_per_fragment=10000 + ) + + reference_ds.create_scalar_index( + column="id", + index_type="BTREE", + name="reference_btree_idx", + ) + + reference_results = reference_ds.scanner( + filter=f"id = {test_id}", + columns=["id", "text"], + ).to_table() + + assert results.num_rows == reference_results.num_rows, 
( + f"Distributed index returned {results.num_rows} results, " + f"but complete index returned {reference_results.num_rows} results" + ) + + reference_range_results = reference_ds.scanner( + filter="id >= 200 AND id < 800", + columns=["id", "text"], + ).to_table() + + assert results_range.num_rows == reference_range_results.num_rows, ( + f"Distributed index range query returned {results_range.num_rows} results, " + f"but complete index returned {reference_range_results.num_rows} results" + ) + + def _assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids): ds.merge_index_metadata(index_id, index_type="BITMAP") @@ -3784,6 +3899,15 @@ def test_distributed_bitmap_index_build_single_fragment_shards(tmp_path): _assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids) +def test_merge_index_metadata_btree_soft_break(tmp_path): + ds = generate_multi_fragment_dataset( + tmp_path, num_fragments=2, rows_per_fragment=100 + ) + + with pytest.raises(ValueError, match="no longer supports merge_index_metadata"): + ds.merge_index_metadata(str(uuid.uuid4()), index_type="BTREE") + + def test_btree_fragment_ids_parameter_validation(tmp_path): """ Test validation of fragment_ids parameter for B-tree indices. 
@@ -3815,6 +3939,77 @@ def test_btree_fragment_ids_parameter_validation(tmp_path): print(f"Expected error for invalid fragment ID: {e}") +@pytest.mark.parametrize( + "test_name,filter_expr", + [ + # Test 1: Boundary values at fragment edges + ("First value", "id = 0"), + ("Fragment 0 last value", "id = 9999"), + ("Fragment 1 first value", "id = 10000"), + ("Fragment 1 last value", "id = 19999"), + ("Fragment 2 first value", "id = 20000"), + ("Last value", "id = 29999"), + # Test 2: Values in the middle of fragments + ("Fragment 0 middle", "id = 5000"), + ("Fragment 1 middle", "id = 15000"), + ("Fragment 2 middle", "id = 25000"), + # Test 3: Range queries within single fragments + ("Range within fragment 0", "id >= 10 AND id < 20"), + ("Range within fragment 1", "id >= 10010 AND id < 10020"), + ("Range within fragment 2", "id >= 20010 AND id < 20020"), + # Test 4: Range queries spanning multiple fragments + ("Cross fragment 0-1", "id >= 9995 AND id < 10005"), + ("Cross fragment 1-2", "id >= 19995 AND id < 20005"), + ("Cross all fragments", "id >= 5000 AND id < 25000"), + # Test 5: Edge cases + ("Non-existent small value", "id = -1"), + ("Non-existent large value", "id = 30100"), + ("Large range", "id >= 0 AND id < 30000"), + # Test 6: Comparison operators + ("Less than boundary", "id < 10000"), + ("Greater than boundary", "id > 19999"), + ("Less than or equal", "id <= 10050"), + ("Greater than or equal", "id >= 10050"), + ], +) +def test_btree_query_comparison_parametrized( + btree_comparison_datasets, test_name, filter_expr +): + """ + Parametrized B-tree index query comparison test. + + Compares segmented fragment-built BTree results with a complete BTree index. 
+ """ + fragment_ds = btree_comparison_datasets["fragment_ds"] + complete_ds = btree_comparison_datasets["complete_ds"] + + fragment_results = fragment_ds.scanner( + filter=filter_expr, + columns=["id", "text"], + ).to_table() + + complete_results = complete_ds.scanner( + filter=filter_expr, + columns=["id", "text"], + ).to_table() + + assert fragment_results.num_rows == complete_results.num_rows, ( + f"Test '{test_name}' failed: Fragment index " + f"returned {fragment_results.num_rows} rows, " + f"but complete index returned {complete_results.num_rows}" + f" rows for filter: {filter_expr}" + ) + + if fragment_results.num_rows > 0: + fragment_ids = sorted(fragment_results.column("id").to_pylist()) + complete_ids = sorted(complete_results.column("id").to_pylist()) + + assert fragment_ids == complete_ids, ( + f"Test '{test_name}' failed: Fragment index " + f"and complete index returned different results for filter: {filter_expr}" + ) + + def test_fts_flat_fallback_matches_wand(tmp_path): # Repro: when filter matches < 10%, FTS fell back to flat search and missed results. # Two-term query increases reproduction likelihood. diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index cb57b0c195f..ba6d3dd142d 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -2742,35 +2742,19 @@ pub struct BTreeParameters { /// The number of rows to include in each zone pub zone_size: Option, - /// DEPRECATED / UNSUPPORTED: range-based distributed BTree building has been retired. - /// Setting this to `Some(..)` is now rejected at build time (see `BTreeIndexPlugin::train_index`). - /// Build one segment per worker and commit them with `commit_existing_index_segments(...)`, - /// optionally consolidating with `merge_existing_index_segments(...)`. 
The field is retained - /// (rather than removed) so the plugin can detect and reject stale `range_id` inputs loudly - /// instead of serde silently dropping an unknown field. - /// The ordinal ID of a data partition for building a large, distributed BTree index. + /// DEPRECATED: range-based distributed BTree building has been retired. + /// Setting this to `Some(..)` now emits a warning and is ignored at build time + /// (see `BTreeIndexPlugin::train_index`). Build one segment per worker and + /// commit them with `commit_existing_index_segments(...)`, optionally + /// consolidating with `merge_existing_index_segments(...)`. The field is + /// retained (rather than removed) so the plugin can detect stale `range_id` + /// inputs and warn loudly instead of serde silently dropping an unknown field. /// - /// When building an index from multiple, pre-partitioned data chunks (for example, - /// in a distributed environment), this ID specifies which partition this particular - /// build operation corresponds to. - /// - /// # Data Distribution Requirements - /// - /// If this parameter is `Some(id)`, the caller **must** guarantee that the input data - /// is strictly global sorted. The input data, when considered as a whole across all - /// partitions ordered by `range_id`, must be sorted. - /// - /// Concretely, this means: - /// - /// All values in the data provided for `range_id: N` must be **less than or equal to** - /// all values in the data for `range_id: N+1`. - /// - /// Lance relies on this precondition to ensure the final, merged index is valid and - /// correctly ordered. - /// - /// # `None` Case - /// - /// If `range_id` is `None`, a single, monolithic index is built over the provided dataset. + /// Historically, this was the ordinal ID of a globally sorted range + /// partition. Lance used it to write `part_*` BTree files that were later + /// merged by `merge_index_metadata`. That flow has been retired. 
A + /// pre-sorted training stream is still accepted, but this field no longer + /// affects file names, commit behavior, or query semantics. pub range_id: Option, } diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 759ab956a4e..70efea17c57 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -755,18 +755,21 @@ mod tests { use arrow_array::{FixedSizeListArray, RecordBatchIterator}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use datafusion::common::ScalarValue; use lance_arrow::FixedSizeListArrayExt; - use lance_core::utils::tempfile::TempStrDir; + use lance_core::utils::{address::RowAddress, tempfile::TempStrDir}; use lance_datagen::{self, gen_batch}; use lance_index::optimize::OptimizeOptions; use lance_index::progress::IndexBuildProgress; - use lance_index::scalar::{FullTextSearchQuery, inverted::tokenizer::InvertedIndexParams}; + use lance_index::scalar::{ + FullTextSearchQuery, SargableQuery, SearchResult, inverted::tokenizer::InvertedIndexParams, + }; use lance_index::vector::hnsw::builder::HnswBuildParams; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::kmeans::{KMeansParams, train_kmeans}; use lance_linalg::distance::{DistanceType, MetricType}; use serde_json::json; - use std::sync::Arc; + use std::{collections::BTreeSet, ops::Bound, sync::Arc}; use uuid::Uuid; lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>); @@ -1324,13 +1327,102 @@ mod tests { ); } + #[tokio::test] + async fn test_segmented_btree_multi_fragment_commit_and_search() { + let test_dir = TempStrDir::default(); + let dataset = gen_batch() + .col("value", lance_datagen::array::step::()) + .into_dataset( + test_dir.as_str(), + FragmentCount::from(4), + FragmentRowCount::from(16), + ) + .await + .unwrap(); + let mut dataset = dataset; + let fragments = dataset.get_fragments(); + 
assert_eq!(fragments.len(), 4); + + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::BTree); + let mut segments = Vec::new(); + for fragment in &fragments { + segments.push( + CreateIndexBuilder::new(&mut dataset, &["value"], IndexType::BTree, ¶ms) + .name("value_btree_segments".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(), + ); + } + let segment_uuids = segments + .iter() + .map(|segment| segment.uuid) + .collect::>(); + + dataset + .commit_existing_index_segments("value_btree_segments", "value", segments) + .await + .unwrap(); + + let committed = dataset + .load_indices_by_name("value_btree_segments") + .await + .unwrap(); + assert_eq!(committed.len(), fragments.len()); + for segment_uuid in &segment_uuids { + assert_canonical_btree_segment(&dataset, segment_uuid).await; + } + + let logical = crate::index::scalar_logical::open_named_scalar_index( + &dataset, + "value", + "value_btree_segments", + &NoOpMetricsCollector, + ) + .await + .unwrap(); + + let point_query = SargableQuery::Equals(ScalarValue::Int32(Some(33))); + let point_matches = match logical + .search(&point_query, &NoOpMetricsCollector) + .await + .unwrap() + { + SearchResult::Exact(row_addrs) => row_addrs.true_rows().row_addrs().unwrap().count(), + other => panic!("expected exact point result, got {other:?}"), + }; + assert_eq!(point_matches, 1); + + let range_query = SargableQuery::Range( + Bound::Included(ScalarValue::Int32(Some(14))), + Bound::Excluded(ScalarValue::Int32(Some(35))), + ); + let range_row_addrs = match logical + .search(&range_query, &NoOpMetricsCollector) + .await + .unwrap() + { + SearchResult::Exact(row_addrs) => row_addrs, + other => panic!("expected exact range result, got {other:?}"), + }; + let searched_fragments = range_row_addrs + .true_rows() + .row_addrs() + .unwrap() + .map(|row_addr| RowAddress::from(u64::from(row_addr)).fragment_id()) + .collect::>(); + 
assert_eq!(searched_fragments.len(), 21); + assert_eq!( + searched_fragments.into_iter().collect::>(), + BTreeSet::from([0, 1, 2]) + ); + } + #[tokio::test] async fn test_range_based_btree_index_create() { use crate::dataset::scanner::ColumnOrdering; - use datafusion::common::ScalarValue; use futures::TryStreamExt; - use lance_index::scalar::{SargableQuery, SearchResult}; - use std::ops::Bound; let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); From bf1c9b7ff62e94e912741c9525cb73cfba436cf9 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 Date: Sun, 31 May 2026 21:26:16 +0800 Subject: [PATCH 3/3] refactor(index)!: move distributed BTree build to segmented index framework --- python/python/lance/dataset.py | 71 +++++++++++++++++------- python/python/tests/test_scalar_index.py | 29 +++++----- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index b15471cfc57..1e1247ad1e0 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2975,7 +2975,7 @@ def create_scalar_index( index_uuid: Optional[str] = None, progress_callback: Optional[Callable[[IndexProgress], None]] = None, **kwargs, - ) -> Index: + ): """Create a scalar index on a column. Scalar indices, like vector indices, can be used to speed up scans. A scalar @@ -3065,11 +3065,11 @@ def create_scalar_index( structure. If False, an empty index will be created that can be populated later. fragment_ids : List[int], optional - If provided, the index will be created only on the specified fragments. - This enables distributed/fragment-level indexing. When provided, the - method returns metadata for one segment but does not commit - the index to the dataset. The segment can be planned, merged, and - committed later using the segment builder and commit APIs. + If provided, the index will be created only on the specified fragments + using the legacy distributed scalar-index path. 
BTREE indices do not + support this path; build distributed BTREE segments with + :meth:`create_index_uncommitted` (``index_type="BTREE"``) and publish + them with :meth:`commit_existing_index_segments` instead. This parameter is passed via kwargs internally. index_uuid : str, optional A UUID to use for the segment written by this call. @@ -3163,12 +3163,6 @@ def create_scalar_index( that use scalar indices will either have a ``ScalarIndexQuery`` relation or a ``MaterializeIndex`` operator. - Returns - ------- - Index - Metadata for the index. When ``fragment_ids`` is provided the index is - built but not committed, and this is the resulting segment metadata to - pass to :meth:`commit_existing_index_segments`. """ if isinstance(column, str): column = [column] @@ -3256,12 +3250,23 @@ def create_scalar_index( f"Scalar index column {column} cannot currently be a duration" ) elif isinstance(index_type, IndexConfig): + if fragment_ids is not None and index_type.index_type.upper() == "BTREE": + raise ValueError( + "BTree distributed indexing uses create_index_uncommitted(..., " + 'index_type="BTREE", fragment_ids=...)' + ) config = json.dumps(index_type.parameters) kwargs["config"] = indices.IndexConfig(index_type.index_type, config) index_type = "scalar" else: raise Exception("index_type must be str or IndexConfig") + if fragment_ids is not None and index_type == "BTREE": + raise ValueError( + "BTree distributed indexing uses create_index_uncommitted(..., " + 'index_type="BTREE", fragment_ids=...)' + ) + # Add fragment_ids and index_uuid to kwargs if provided if fragment_ids is not None: kwargs["fragment_ids"] = fragment_ids @@ -3270,9 +3275,7 @@ def create_scalar_index( if progress_callback is not None: kwargs["progress_callback"] = progress_callback - return self._ds.create_index( - [column], index_type, name, replace, train, None, kwargs - ) + self._ds.create_index([column], index_type, name, replace, train, None, kwargs) def _create_index_impl( self, @@ -3923,10 
+3926,10 @@ def create_index_uncommitted( """ Create one segment without publishing it and return its metadata. - This is the public distributed-build API for vector index - construction. Unlike :meth:`create_index`, this method does not publish - the index into the dataset manifest. Instead, it writes one segment - under ``_indices/<segment_uuid>/`` and returns the resulting + This is the public distributed-build API for vector and BTREE scalar + index construction. Unlike :meth:`create_index`, this method does not + publish the index into the dataset manifest. Instead, it writes one + segment under ``_indices/<segment_uuid>/`` and returns the resulting :class:`Index` metadata. Callers should: @@ -3940,6 +3943,10 @@ def create_index_uncommitted( 5. build one or more physical segments and commit them with :meth:`commit_existing_index_segments` + BTREE segments do not yet support the segment builder (steps 3-4); collect + the returned segments and pass them straight to + :meth:`commit_existing_index_segments`. + Parameters are the same as :meth:`create_index`, with one additional requirement: @@ -3950,6 +3957,32 @@ def create_index_uncommitted( Index Metadata for the segment that was written by this call.
""" + if isinstance(index_type, str) and index_type.upper() == "BTREE": + if fragment_ids is None: + raise ValueError( + "create_index_uncommitted requires fragment_ids " + "for distributed index build" + ) + if not isinstance(column, str): + raise NotImplementedError( + "Scalar indices currently only support a single column" + ) + + kwargs = dict(kwargs) + kwargs["fragment_ids"] = fragment_ids + if index_uuid is not None: + kwargs["index_uuid"] = index_uuid + + return self._ds.create_index( + [column], + "BTREE", + name, + replace, + train, + storage_options, + kwargs, + ) + return self._create_index_impl( column, index_type, diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 52662b16663..11dca24c499 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -97,7 +97,7 @@ def data_table(indexed_dataset: lance.LanceDataset): def _commit_segmented_btree_index(dataset, column, index_name): segments = [ - dataset.create_scalar_index( + dataset.create_index_uncommitted( column=column, index_type="BTREE", name=index_name, @@ -3916,27 +3916,26 @@ def test_btree_fragment_ids_parameter_validation(tmp_path): tmp_path, num_fragments=2, rows_per_fragment=10000 ) - # Test with valid fragment IDs fragments = ds.get_fragments() valid_fragment_id = fragments[0].fragment_id - # This should work without errors - ds.create_scalar_index( - column="id", - index_type="BTREE", - fragment_ids=[valid_fragment_id], - ) - - # Test with invalid fragment ID (should handle gracefully) - try: + # create_scalar_index no longer accepts fragment_ids for BTREE; distributed + # builds must go through the segmented create_index_uncommitted path. 
+ with pytest.raises(ValueError, match="create_index_uncommitted"): ds.create_scalar_index( column="id", index_type="BTREE", - fragment_ids=[999999], # Non-existent fragment ID + fragment_ids=[valid_fragment_id], ) - except Exception as e: - # It's acceptable for this to fail with an appropriate error - print(f"Expected error for invalid fragment ID: {e}") + + # Building one uncommitted segment for a valid fragment should work and + # return the segment metadata without committing it. + segment = ds.create_index_uncommitted( + column="id", + index_type="BTREE", + fragment_ids=[valid_fragment_id], + ) + assert segment.fragment_ids == {valid_fragment_id} @pytest.mark.parametrize(