Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs/src/guide/distributed_indexing.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,16 @@ indices/<physical_segment_uuid_1>/

These physical segments are then committed together as one logical index. In the
common no-merge case, the input segments are already the physical
segments and `build_all()` returns them unchanged.
segments and can be committed directly.

## Roles

There are two parties involved in distributed indexing:

- **Workers** build segments
- **The caller** launches workers, chooses how those segments should be turned
into physical segments, provides any additional inputs requested by the
segment build APIs, and
commits the final result
into final segments, optionally merges caller-defined groups, and commits the
final result

Lance does not provide a distributed scheduler. The caller is responsible for
launching workers and driving the overall workflow.
Expand Down
109 changes: 0 additions & 109 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,61 +1119,6 @@ fn inner_merge_index_metadata(
Ok(())
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
java_segments: JObject,
index_type: jint,
target_segment_bytes_jobj: JObject,
) -> JObject<'local> {
ok_or_throw!(
env,
inner_build_index_segments(
&mut env,
java_dataset,
java_segments,
index_type,
target_segment_bytes_jobj
)
)
}

fn inner_build_index_segments<'local>(
env: &mut JNIEnv<'local>,
java_dataset: JObject,
java_segments: JObject,
index_type: jint,
target_segment_bytes_jobj: JObject,
) -> Result<JObject<'local>> {
let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?;
let index_type = IndexType::try_from(index_type)?;
let target_segment_bytes = env
.get_long_opt(&target_segment_bytes_jobj)?
.map(|v| v as u64);
let template = segment_template(&segments)?;

let built_segments = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
let mut builder = dataset_guard
.inner
.create_index_segment_builder()
.with_index_type(index_type)
.with_segments(segments);
if let Some(target_segment_bytes) = target_segment_bytes {
builder = builder.with_target_segment_bytes(target_segment_bytes);
}
RT.block_on(builder.build_all())?
};

let built_metadata = built_segments
.into_iter()
.map(|segment| index_segment_to_metadata(&template, segment))
.collect::<Vec<_>>();
export_vec(env, &built_metadata)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeMergeExistingIndexSegments<'local>(
mut env: JNIEnv<'local>,
Expand Down Expand Up @@ -1250,44 +1195,6 @@ fn inner_commit_existing_index_segments<'local>(
export_vec(env, &committed)
}

struct SegmentTemplate {
name: String,
fields: Vec<i32>,
dataset_version: u64,
}

fn segment_template(segments: &[IndexMetadata]) -> Result<SegmentTemplate> {
let first = segments
.first()
.ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?;
for segment in &segments[1..] {
if segment.name != first.name {
return Err(Error::input_error(format!(
"All segments must share the same index name, got '{}' and '{}'",
first.name, segment.name
)));
}
if segment.fields != first.fields {
return Err(Error::input_error(format!(
"All segments must target the same field ids, got {:?} and {:?}",
first.fields, segment.fields
)));
}
if segment.dataset_version != first.dataset_version {
return Err(Error::input_error(format!(
"All segments must share the same dataset version, got {} and {}",
first.dataset_version, segment.dataset_version
)));
}
}

Ok(SegmentTemplate {
name: first.name.clone(),
fields: first.fields.clone(),
dataset_version: first.dataset_version,
})
}

fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result<IndexSegment> {
let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| {
Error::input_error(format!(
Expand All @@ -1310,22 +1217,6 @@ fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result<IndexSegment> {
))
}

fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata {
let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts();
IndexMetadata {
uuid,
fields: template.fields.clone(),
name: template.name.clone(),
dataset_version: template.dataset_version,
fragment_bitmap: Some(fragment_bitmap),
index_details: Some(index_details),
index_version,
created_at: Some(Utc::now()),
base_id: None,
files: None,
}
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices(
mut env: JNIEnv,
Expand Down
38 changes: 0 additions & 38 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -1152,44 +1152,6 @@ public void mergeIndexMetadata(
private native void innerMergeIndexMetadata(
String indexUUID, int indexType, Optional<Integer> batchReadHead);

/**
* Build physical vector index segments from previously-created fragment-level index outputs.
*
* @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when
* fragmentIds are provided
* @param indexType concrete index type for the staged segments
* @param targetSegmentBytes optional size target for merged physical segments
* @return built physical segment metadata
*/
public List<Index> buildIndexSegments(
List<Index> segments, IndexType indexType, Optional<Long> targetSegmentBytes) {
Preconditions.checkNotNull(segments, "segments cannot be null");
Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty");
Preconditions.checkNotNull(indexType, "indexType cannot be null");
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
return nativeBuildIndexSegments(segments, indexType.getValue(), targetSegmentBytes);
}
}

/**
* Build physical vector index segments from previously-created fragment-level index outputs.
*
* @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when
* fragmentIds are provided
* @param targetSegmentBytes optional size target for merged physical segments
* @return built physical segment metadata
*/
@Deprecated
public List<Index> buildIndexSegments(List<Index> segments, Optional<Long> targetSegmentBytes) {
throw new IllegalArgumentException(
"buildIndexSegments now requires an explicit index type; call "
+ "buildIndexSegments(segments, indexType, targetSegmentBytes)");
}

private native List<Index> nativeBuildIndexSegments(
List<Index> segments, int indexType, Optional<Long> targetSegmentBytes);

/** Merge one caller-defined group of existing uncommitted vector index segments. */
public Index mergeExistingIndexSegments(List<Index> segments) {
Preconditions.checkNotNull(segments, "segments cannot be null");
Expand Down
28 changes: 9 additions & 19 deletions java/src/test/java/org/lance/index/VectorIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -95,14 +94,11 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E
dataset.listIndexes().contains(TestVectorDataset.indexName),
"Partially created IVF_FLAT index should not present before commit");

List<Index> builtSegments =
dataset.buildIndexSegments(
List.of(firstSegment, secondSegment), IndexType.IVF_FLAT, Optional.empty());
assertEquals(2, builtSegments.size());

List<Index> committed =
dataset.commitExistingIndexSegments(
TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments);
TestVectorDataset.indexName,
TestVectorDataset.vectorColumnName,
List.of(firstSegment, secondSegment));
assertEquals(2, committed.size());
assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName));
}
Expand Down Expand Up @@ -188,14 +184,11 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc
dataset.listIndexes().contains(TestVectorDataset.indexName),
"Partially created IVF_PQ index should not present before commit");

List<Index> builtSegments =
dataset.buildIndexSegments(
List.of(firstSegment, secondSegment), IndexType.IVF_PQ, Optional.empty());
assertEquals(2, builtSegments.size());

List<Index> committed =
dataset.commitExistingIndexSegments(
TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments);
TestVectorDataset.indexName,
TestVectorDataset.vectorColumnName,
List.of(firstSegment, secondSegment));
assertEquals(2, committed.size());
assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName));
}
Expand Down Expand Up @@ -265,14 +258,11 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc
dataset.listIndexes().contains(TestVectorDataset.indexName),
"Partially created IVF_SQ index should not present before commit");

List<Index> builtSegments =
dataset.buildIndexSegments(
List.of(firstSegment, secondSegment), IndexType.IVF_SQ, Optional.empty());
assertEquals(2, builtSegments.size());

List<Index> committed =
dataset.commitExistingIndexSegments(
TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments);
TestVectorDataset.indexName,
TestVectorDataset.vectorColumnName,
List.of(firstSegment, secondSegment));
assertEquals(2, committed.size());
assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName));
}
Expand Down
33 changes: 11 additions & 22 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3075,8 +3075,9 @@ def create_scalar_index(
If provided, the index will be created only on the specified fragments.
This enables distributed/fragment-level indexing. When provided, the
method returns metadata for one segment but does not commit
the index to the dataset. The segment can be planned, merged, and
committed later using the segment builder and commit APIs.
the index to the dataset. The segment can be optionally merged with
other segments and committed later with
``commit_existing_index_segments(...)``.
This parameter is passed via kwargs internally.
index_uuid : str, optional
A UUID to use for the segment written by this call.
Expand Down Expand Up @@ -3751,8 +3752,8 @@ def create_index(
This enables distributed/fragment-level indexing. When provided, the
method creates one segment but does not commit the index
to the dataset. The returned metadata can be passed to
``create_index_segment_builder().with_index_type(...).with_segments(...)``
and then committed with ``commit_existing_index_segments(...)``.
``merge_existing_index_segments(...)`` if grouping is needed and then
committed with ``commit_existing_index_segments(...)``.
index_uuid : str, optional
A UUID to use for the segment written by this call.
If not provided, a new UUID will be generated.
Expand Down Expand Up @@ -3935,10 +3936,9 @@ def create_index_uncommitted(
1. run :meth:`create_index_uncommitted` on each worker with that worker's
assigned ``fragment_ids``
2. collect the returned :class:`Index` objects
3. call :meth:`IndexSegmentBuilder.with_index_type` with the concrete
distributed index type
4. pass them to :meth:`IndexSegmentBuilder.with_segments`
5. build one or more physical segments and commit them with
3. optionally merge caller-defined groups with
:meth:`merge_existing_index_segments`
4. commit the final segment list with
:meth:`commit_existing_index_segments`

Parameters are the same as :meth:`create_index`, with one additional
Expand Down Expand Up @@ -4019,9 +4019,9 @@ def merge_index_metadata(
Merge distributed scalar index metadata.

Vector distributed indexing no longer uses this API. For vector indices,
build segments with :meth:`create_index_uncommitted`, plan or
merge them with :meth:`create_index_segment_builder`, and publish them
with :meth:`commit_existing_index_segments`.
build segments with :meth:`create_index_uncommitted`, optionally merge
them with :meth:`merge_existing_index_segments`, and publish them with
:meth:`commit_existing_index_segments`.

This method does NOT commit changes.

Expand Down Expand Up @@ -4058,17 +4058,6 @@ def merge_index_metadata(
self._ds.merge_index_metadata(index_uuid, t, batch_readhead, progress_callback)
return None

def create_index_segment_builder(self):
"""
Create a builder for turning existing segments into physical segments.

Provide the segment metadata returned by
:meth:`create_index_uncommitted` through
:meth:`IndexSegmentBuilder.with_segments`, and declare the segment type
with :meth:`IndexSegmentBuilder.with_index_type`.
"""
return self._ds.create_index_segment_builder()

def merge_existing_index_segments(self, segments: List[Index]) -> Index:
"""
Merge one caller-defined group of existing uncommitted segments.
Expand Down
2 changes: 0 additions & 2 deletions python/python/lance/indices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

IndexSegmentDescription = _lance.indices.IndexSegmentDescription
IndexSegment = _lance.indices.IndexSegment
IndexSegmentPlan = _lance.indices.IndexSegmentPlan

__all__ = [
"IndicesBuilder",
Expand All @@ -19,7 +18,6 @@
"IvfModel",
"IndexFileVersion",
"IndexSegment",
"IndexSegmentPlan",
"IndexSegmentDescription",
]

Expand Down
10 changes: 0 additions & 10 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ from .fragment import (
)
from .indices import IndexDescription as IndexDescription
from .indices import IndexSegment as IndexSegment
from .indices import IndexSegmentPlan as IndexSegmentPlan
from .lance import PySearchFilter
from .optimize import (
Compaction as Compaction,
Expand Down Expand Up @@ -191,14 +190,6 @@ class LanceColumnStatistics:
class _Session:
def size_bytes(self) -> int: ...

class IndexSegmentBuilder:
def with_index_type(self, index_type: str) -> Self: ...
def with_segments(self, segments: List[Index]) -> Self: ...
def with_target_segment_bytes(self, bytes: int) -> Self: ...
def plan(self) -> List[IndexSegmentPlan]: ...
def build(self, plan: IndexSegmentPlan) -> IndexSegment: ...
def build_all(self) -> List[IndexSegment]: ...

class LanceBlobFile:
def close(self): ...
def is_closed(self) -> bool: ...
Expand Down Expand Up @@ -410,7 +401,6 @@ class _Dataset:
batch_readhead: Optional[int] = None,
progress_callback: Optional[Callable[[IndexProgress], None]] = None,
): ...
def create_index_segment_builder(self) -> IndexSegmentBuilder: ...
def merge_existing_index_segments(self, segments: List[Index]) -> Index: ...
def commit_existing_index_segments(
self, index_name: str, column: str, segments: List[Union[IndexSegment, Index]]
Expand Down
8 changes: 0 additions & 8 deletions python/python/lance/lance/indices/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ class IndexSegment:

def __repr__(self) -> str: ...

class IndexSegmentPlan:
segment: IndexSegment
segments: list[object]
estimated_bytes: int
requested_index_type: Optional[str]

def __repr__(self) -> str: ...

def train_ivf_model(
dataset,
column: str,
Expand Down
Loading
Loading