Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use lance_file::version::LanceFileVersion;
use lance_index::IndexCriteria as RustIndexCriteria;
use lance_index::optimize::OptimizeOptions;
use lance_index::progress::noop_progress;
use lance_index::scalar::btree::BTreeParameters;
use lance_index::{IndexParams, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsProvider};
Expand Down Expand Up @@ -980,7 +979,7 @@ fn inner_create_index<'local>(
index_type: index_type_str,
params: params_opt.clone(),
};
skip_commit = skip_commit || should_skip_commit(index_type, &params_opt)?;
skip_commit = skip_commit || (index_type == IndexType::BTree && batch_reader.is_some());
Ok(Box::new(scalar_params))
}
IndexType::FragmentReuse | IndexType::MemWal => {
Expand Down Expand Up @@ -1060,20 +1059,6 @@ fn inner_drop_index(env: &mut JNIEnv, java_dataset: JObject, name: JString) -> R
Ok(())
}

fn should_skip_commit(index_type: IndexType, params_opt: &Option<String>) -> Result<bool> {
match index_type {
IndexType::BTree => {
// Should defer the commit if we are building range-based BTree index
if let Some(params) = params_opt {
let btree_parameters = serde_json::from_str::<BTreeParameters>(params)?;
return Ok(btree_parameters.range_id.is_some());
}
Ok(false)
}
_ => Ok(false),
}
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_innerMergeIndexMetadata<'local>(
mut env: JNIEnv<'local>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ public Builder zoneSize(long zoneSize) {
* @param rangeId non-negative range identifier
* @return this builder
* @throws IllegalArgumentException
* @deprecated {@code rangeId} is no longer needed and is now ignored at build time (a warning
* is logged if set). A pre-sorted data stream is still supported — just pass the reader
* without a {@code rangeId}. Each build produces one canonical segment; for distributed
* builds, build one segment per worker and commit them with {@code
* commitExistingIndexSegments(...)}, optionally consolidating with {@code
* mergeExistingIndexSegments(...)}. This parameter will be removed in a future release.
*/
@Deprecated
public Builder rangeId(int rangeId) {
if (rangeId < 0) {
throw new IllegalArgumentException("rangeId must be non-negative");
Expand Down
5 changes: 1 addition & 4 deletions java/src/test/java/org/lance/DatasetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2196,10 +2196,7 @@ void testDropIndex(@TempDir Path tempDir) {
dataset.listIndexes().contains("test_index"),
"Partially created index should not present");

// 3. merge metadata, which will still not be committed
dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty());

// 4. commit the index
// 3. commit the index
int fieldId =
dataset.getLanceSchema().fields().stream()
.filter(f -> f.getName().equals("name"))
Expand Down
252 changes: 101 additions & 151 deletions java/src/test/java/org/lance/index/ScalarIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@
*/
package org.lance.index;

import org.lance.CommitBuilder;
import org.lance.Dataset;
import org.lance.Fragment;
import org.lance.TestUtils;
import org.lance.Transaction;
import org.lance.WriteParams;
import org.lance.index.scalar.ScalarIndexParams;
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
import org.lance.operation.CreateIndex;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
Expand Down Expand Up @@ -51,7 +48,6 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -117,177 +113,95 @@ public void testCreateBTreeIndexDistributively(@TempDir Path tempDir) throws Exc
testDataset.write(1, 10).close();
try (Dataset dataset = testDataset.write(2, 10)) {
List<Fragment> fragments = dataset.getFragments();
assertEquals(2, dataset.getFragments().size());
assertEquals(2, fragments.size());

ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}");
IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build();
UUID uuid = UUID.randomUUID();

// 2. partially create index
dataset.createIndex(
IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams)
.withIndexName("test_index")
.withIndexUUID(uuid.toString())
.withFragmentIds(Collections.singletonList(fragments.get(0).getId()))
.build());
dataset.createIndex(
IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams)
.withIndexName("test_index")
.withIndexUUID(uuid.toString())
.withFragmentIds(Collections.singletonList(fragments.get(1).getId()))
.build());
String indexName = "test_index";

List<Index> segments = new ArrayList<>();
for (Fragment fragment : fragments) {
segments.add(
dataset.createIndex(
IndexOptions.builder(
Collections.singletonList("name"), IndexType.BTREE, indexParams)
.withIndexName(indexName)
.withFragmentIds(Collections.singletonList(fragment.getId()))
.build()));
}

// then no index should have been created
assertFalse(
dataset.listIndexes().contains("test_index"),
dataset.listIndexes().contains(indexName),
"Partially created index should not present");

// 3. merge metadata, which will still not be committed
dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty());
List<Index> committed = dataset.commitExistingIndexSegments(indexName, "name", segments);
assertEquals(2, committed.size());
assertTrue(dataset.listIndexes().contains(indexName));

// 4. commit the index
int fieldId =
dataset.getLanceSchema().fields().stream()
.filter(f -> f.getName().equals("name"))
.findAny()
.orElseThrow(() -> new RuntimeException("Cannot find 'name' field for TestDataset"))
.getId();

long datasetVersion = dataset.version();

Index index =
Index.builder()
.uuid(uuid)
.name("test_index")
.fields(Collections.singletonList(fieldId))
.datasetVersion(datasetVersion)
.indexVersion(0)
.fragments(fragments.stream().map(Fragment::getId).collect(Collectors.toList()))
.build();

CreateIndex createIndexOp =
CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build();

try (Transaction createIndexTx =
new Transaction.Builder()
.readVersion(datasetVersion)
.operation(createIndexOp)
.build()) {
try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) {
// new dataset should contain that index
assertEquals(datasetVersion + 1, newDataset.version());
assertTrue(newDataset.listIndexes().contains("test_index"));
}
}
assertEquals(2, dataset.countIndexedRows(indexName, "name = 'Person 5'", Optional.empty()));
assertEquals(
10,
dataset.countIndexedRows(
indexName, "name >= 'Person 3' AND name < 'Person 8'", Optional.empty()));
}
}
}

@Test
public void testRangedBTreeIndex(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("ranged_btree_map").toString();
UUID indexUUID = UUID.randomUUID();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
// 1. write some data
try (Dataset dataset = testDataset.write(1, 200)) {

// 2. scan data out
List<long[]> data = new ArrayList<>();
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.withRowId(true)
.columns(Collections.singletonList("id"))
.build());
ArrowReader arrowReader = scanner.scanBatches(); ) {
while (arrowReader.loadNextBatch()) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid");
IntVector idVec = (IntVector) root.getVector("id");
for (int i = 0; i < root.getRowCount(); i++) {
data.add(new long[] {idVec.get(i), rowIdVec.get(i)});
}
}
}

// 3. sort data globally (This will be done by computing engines in production)
data.sort((d1, d2) -> (int) (d1[0] - d2[0]));
int mid = data.size() / 2;

// 4. divide sorted data into ranges and build index for each range
createBtreeIndexForRange(dataset, data.subList(0, mid), 1, allocator, indexUUID);
createBtreeIndexForRange(dataset, data.subList(mid, data.size()), 2, allocator, indexUUID);

// 5. merge index.
dataset.mergeIndexMetadata(indexUUID.toString(), IndexType.BTREE, Optional.empty());

// 6. commit index
long datasetVersion = dataset.version();
int fieldId =
dataset.getLanceSchema().fields().stream()
.filter(f -> f.getName().equals("id"))
.findAny()
.orElseThrow(() -> new RuntimeException("Cannot find 'id' field for TestDataset"))
.getId();
Index index =
Index.builder()
.uuid(indexUUID)
.name("test_index")
.fields(Collections.singletonList(fieldId))
.datasetVersion(datasetVersion)
.indexVersion(0)
.fragments(
dataset.getFragments().stream()
.map(Fragment::getId)
.collect(Collectors.toList()))
.build();

CreateIndex createIndexOp =
CreateIndex.builder().withNewIndices(Collections.singletonList(index)).build();

try (Transaction createIndexTx =
new Transaction.Builder()
.readVersion(datasetVersion)
.operation(createIndexOp)
.build()) {
try (Dataset newDataset = new CommitBuilder(dataset).execute(createIndexTx)) {
// new dataset should contain that index
assertEquals(datasetVersion + 1, newDataset.version());
assertTrue(newDataset.listIndexes().contains("test_index"));

// 7. compare results
// force use index should get the right value
ScanOptions scanOptions =
new ScanOptions.Builder().withRowId(true).filter("id in (10, 20, 30)").build();
try (LanceScanner scanner = newDataset.newScan(scanOptions);
ArrowReader arrowReader = scanner.scanBatches(); ) {
List<Integer> ids = new ArrayList<>();
while (arrowReader.loadNextBatch()) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
IntVector idVec = (IntVector) root.getVector("id");
for (int i = 0; i < idVec.getValueCount(); i++) {
ids.add(idVec.get(i));
}
testDataset.write(1, 100).close();
try (Dataset dataset = testDataset.write(2, 100)) {
List<Fragment> fragments = dataset.getFragments();
assertEquals(2, fragments.size());

List<Index> segments = new ArrayList<>();
for (Fragment fragment : fragments) {
List<long[]> data = new ArrayList<>();
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.fragmentIds(Collections.singletonList(fragment.getId()))
.withRowId(true)
.columns(Collections.singletonList("id"))
.build());
ArrowReader arrowReader = scanner.scanBatches(); ) {
while (arrowReader.loadNextBatch()) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid");
IntVector idVec = (IntVector) root.getVector("id");
for (int i = 0; i < root.getRowCount(); i++) {
data.add(new long[] {idVec.get(i), rowIdVec.get(i)});
}
Collections.sort(ids);
Assertions.assertIterableEquals(Arrays.asList(10, 20, 30), ids);
}
}

data.sort((d1, d2) -> Long.compare(d1[0], d2[0]));
segments.add(createBtreeIndexFromPreprocessedData(dataset, data, fragment, allocator));
}

String indexName = "test_index";
List<Index> committed = dataset.commitExistingIndexSegments(indexName, "id", segments);
assertEquals(2, committed.size());
assertTrue(dataset.listIndexes().contains(indexName));

assertEquals(
6, dataset.countIndexedRows(indexName, "id in (10, 20, 30)", Optional.empty()));
assertEquals(
20, dataset.countIndexedRows(indexName, "id >= 50 AND id < 60", Optional.empty()));
}
}
}

private void createBtreeIndexForRange(
private Index createBtreeIndexFromPreprocessedData(
Dataset dataset,
List<long[]> preprocessedData,
int rangeId,
BufferAllocator allocator,
UUID indexUUID) {
// Note that the indexing column is called 'value' in btree.
Fragment fragment,
BufferAllocator allocator) {
Schema schema =
new Schema(
Arrays.asList(
Expand All @@ -300,7 +214,7 @@ private void createBtreeIndexForRange(
UInt8Vector rowIdVec = (UInt8Vector) root.getVector("_rowid");
for (int i = 0; i < preprocessedData.size(); i++) {
long[] dataPair = preprocessedData.get(i);
idVec.set(i, (int) dataPair[0]);
idVec.setSafe(i, (int) dataPair[0]);
rowIdVec.setSafe(i, dataPair[1]);
}
root.setRowCount(preprocessedData.size());
Expand All @@ -321,12 +235,12 @@ private void createBtreeIndexForRange(
ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader, stream);

ScalarIndexParams scalarParams =
ScalarIndexParams.create("btree", String.format("{\"range_id\": %s}", rangeId));
ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 64}");
IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build();
dataset.createIndex(
return dataset.createIndex(
IndexOptions.builder(Collections.singletonList("id"), IndexType.BTREE, indexParams)
.withIndexUUID(indexUUID.toString())
.withIndexName("test_index")
.withFragmentIds(Collections.singletonList(fragment.getId()))
.withPreprocessedData(stream)
.build());
} catch (Exception e) {
Expand All @@ -335,6 +249,42 @@ private void createBtreeIndexForRange(
}
}

@Test
public void testBtreeMergeIndexMetadataSoftBreak(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("btree_merge_metadata_soft_break").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
testDataset.write(1, 10).close();
try (Dataset dataset = testDataset.write(2, 10)) {
List<Fragment> fragments = dataset.getFragments();
assertEquals(2, fragments.size());

ScalarIndexParams scalarParams = ScalarIndexParams.create("btree", "{\"zone_size\": 2048}");
IndexParams indexParams = IndexParams.builder().setScalarIndexParams(scalarParams).build();
UUID uuid = UUID.randomUUID();

dataset.createIndex(
IndexOptions.builder(Collections.singletonList("name"), IndexType.BTREE, indexParams)
.withIndexName("test_index")
.withIndexUUID(uuid.toString())
.withFragmentIds(Collections.singletonList(fragments.get(0).getId()))
.build());

Exception ex =
Assertions.assertThrows(
Exception.class,
() ->
dataset.mergeIndexMetadata(uuid.toString(), IndexType.BTREE, Optional.empty()));
assertTrue(
ex.getMessage() != null
&& ex.getMessage().contains("no longer supports merge_index_metadata"),
"expected BTree merge_index_metadata soft-break error, got: " + ex.getMessage());
}
}
}

@Test
public void testCreateZonemapIndex(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("zonemap_test").toString();
Expand Down
Loading
Loading