Python SDK

The SwarnDB Python SDK provides synchronous and asynchronous clients for interacting with a SwarnDB server over gRPC. It covers collections, vectors, search, the virtual graph (SwarnDB's automatic similarity graph), the first-class typed graph and hybrid queries, LLM extraction, and vector math operations.

Source: github.com/SarthiAI/swarndb

Requirements: Python 3.9+, grpcio>=1.80.0, protobuf>=4.25.0, numpy>=1.24.0


1. Installation

Install the SDK from PyPI:

pip install swarndb

For async support (adds grpcio async extras):

pip install swarndb[async]

2. Quick Start

A complete working example, from connection to search:

from swarndb import SwarnDBClient

with SwarnDBClient(host="localhost", port=50051) as client:
    # Create a collection
    client.collections.create("articles", dimension=128, distance_metric="cosine")

    # Insert vectors with metadata
    for i in range(10):
        client.vectors.insert(
            "articles",
            vector=[0.1 * (i + 1)] * 128,
            metadata={"category": "science", "year": 2024},
        )

    # Search for nearest neighbors
    results = client.search.query("articles", vector=[0.5] * 128, k=5)
    for r in results.results:
        print(f"ID: {r.id}, Score: {r.score:.4f}, Metadata: {r.metadata}")

3. Connecting

Basic Connection

from swarndb import SwarnDBClient

client = SwarnDBClient(host="localhost", port=50051)

With Authentication

client = SwarnDBClient(
    host="localhost",
    port=50051,
    api_key="your-api-key",
)

Context Manager (recommended)

The context manager automatically closes the gRPC channel on exit:

with SwarnDBClient(host="localhost", port=50051) as client:
    collections = client.collections.list()
    print(collections)

Connection Options

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "localhost" | Server hostname or IP address |
| port | int | 50051 | gRPC port number |
| api_key | str or None | None | API key for authentication |
| secure | bool | False | Use TLS/SSL encrypted channel |
| max_retries | int | 3 | Max retry attempts for transient gRPC errors |
| retry_delay | float | 0.5 | Base delay in seconds between retries (exponential backoff) |
| timeout | float | 30.0 | Default per-call timeout in seconds |
| options | list[tuple] or None | None | Additional gRPC channel options |
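
The retry settings can be read as a schedule of waits. The sketch below assumes the delay doubles on each attempt and that no jitter is applied; the table only says "exponential backoff", so both the multiplier and the helper name `backoff_delays` are assumptions, not part of the SDK.

```python
def backoff_delays(max_retries: int = 3, retry_delay: float = 0.5, factor: float = 2.0) -> list:
    """Return the wait (in seconds) before each retry attempt.

    Assumption: delay = retry_delay * factor ** attempt, with no jitter.
    """
    return [retry_delay * factor ** attempt for attempt in range(max_retries)]


print(backoff_delays())  # [0.5, 1.0, 2.0]
```

Under these assumptions the defaults add at most 3.5 seconds of waiting on top of the per-call timeout.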

4. Collections

Access via client.collections.

Create a Collection

info = client.collections.create(
    "products",
    dimension=1536,
    distance_metric="cosine",    # "cosine", "euclidean", "dot_product"
    default_threshold=0.7,
)
print(info.name, info.dimension)

Signature:

create(name, dimension, *, distance_metric="cosine", default_threshold=0.0, max_vectors=0) -> CollectionInfo
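
As a reminder of what the three distance_metric names usually mean, here is a plain-Python sketch of the textbook formulas. How the server turns these values into the score it returns is not covered here, and the function names are illustrative only.

```python
import math


def dot_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    # Dot product of the two vectors after normalizing each to unit length.
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))


def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


a, b = [3.0, 0.0], [0.0, 4.0]
print(dot_product(a, b))         # 0.0
print(cosine_similarity(a, b))   # 0.0
print(euclidean_distance(a, b))  # 5.0
```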

List All Collections

for col in client.collections.list():
    print(f"{col.name}: {col.vector_count} vectors, {col.dimension}d")

Get Collection Info

info = client.collections.get("products")
print(f"Metric: {info.distance_metric}, Vectors: {info.vector_count}")

Check If a Collection Exists

if client.collections.exists("products"):
    print("Collection exists")

Delete a Collection

client.collections.delete("products")

Optimize a Collection

After bulk inserting with defer_graph=True or index_mode="deferred", call optimize to rebuild indexes and the virtual graph:

result = client.collections.optimize("products")
print(f"Status: {result.status}, Vectors processed: {result.vectors_processed}")
print(f"Duration: {result.duration_ms}ms")

Get Collection Status

status = client.collections.get_status("products")
# Returns: "ready", "pending_optimization", or "optimizing"

5. Vectors

Access via client.vectors.

Insert a Vector

# Auto-assigned ID (pass id=0 or omit)
vec_id = client.vectors.insert(
    "products",
    vector=[0.1, 0.2, 0.3, ...],  # must match collection dimension
    metadata={"name": "Widget", "price": 29.99, "tags": ["sale", "new"]},
)
print(f"Inserted with ID: {vec_id}")

# Explicit ID
vec_id = client.vectors.insert(
    "products",
    vector=[0.4, 0.5, 0.6, ...],
    id=42,
    metadata={"name": "Gadget", "price": 49.99},
)

Signature:

insert(collection, vector, *, metadata=None, id=0) -> int

Get a Vector

record = client.vectors.get("products", id=42)
if record is None:
    print("Vector 42 not found")
else:
    print(f"ID: {record.id}")
    print(f"Vector: {record.vector[:5]}...")  # first 5 values
    print(f"Metadata: {record.metadata}")

client.vectors.get returns None when no vector with the given id exists. Transport, auth, and other failures still raise SwarnDBError.

Update a Vector

You can update the vector values, the metadata, or both:

# Update metadata only
client.vectors.update("products", id=42, metadata={"price": 39.99})

# Update vector values only
client.vectors.update("products", id=42, vector=[0.7, 0.8, 0.9, ...])

# Update both
client.vectors.update(
    "products", id=42,
    vector=[0.7, 0.8, 0.9, ...],
    metadata={"price": 39.99, "on_sale": True},
)

Delete a Vector

client.vectors.delete("products", id=42)

Bulk Insert

For high-throughput ingestion with performance tuning options:

import numpy as np

# Generate 10,000 random vectors
vectors = np.random.rand(10000, 1536).tolist()
metadata_list = [{"batch": "2024-Q1", "index": i} for i in range(10000)]

result = client.vectors.bulk_insert(
    "products",
    vectors=vectors,
    metadata_list=metadata_list,
    batch_size=1000,
    show_progress=True,          # requires tqdm
    defer_graph=True,            # skip graph during insert
    wal_flush_every=0,           # disable WAL for max speed
    index_mode="deferred",       # build index after all inserts
    parallel_build=True,         # parallel HNSW construction on optimize
)
print(f"Inserted: {result.inserted_count}, Errors: {len(result.errors)}")

# After bulk insert, rebuild indexes and graph
opt = client.collections.optimize("products")
print(f"Optimized in {opt.duration_ms}ms")

Signature:

bulk_insert(
    collection, vectors, *,
    metadata_list=None, ids=None, batch_size=1000,
    show_progress=False, batch_lock_size=None,
    defer_graph=False, wal_flush_every=None,
    ef_construction=None, index_mode=None,
    skip_metadata_index=False, parallel_build=False,
) -> BulkInsertResult

Bulk Insert Options:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| metadata_list | list[dict] | None | Per-vector metadata (must match vectors length) |
| ids | list[int] | None | Per-vector IDs (0 for auto-assign) |
| batch_size | int | 1000 | Vectors per streaming batch |
| show_progress | bool | False | Display tqdm progress bar |
| batch_lock_size | int | None | Vectors per lock acquisition (1 to 10000) |
| defer_graph | bool | False | Skip graph computation during insert |
| wal_flush_every | int | None | WAL flush interval (0 = disable) |
| ef_construction | int | None | Override HNSW ef_construction for this batch |
| index_mode | str | None | "immediate" or "deferred" |
| skip_metadata_index | bool | False | Skip metadata indexing during insert |
| parallel_build | bool | False | Parallel HNSW build (requires index_mode="deferred") |
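
To make batch_size and the metadata_list length rule concrete, here is a small sketch of how a list of vectors could be cut into streaming batches. `iter_batches` is a hypothetical helper written for illustration; the SDK does its own batching inside bulk_insert.

```python
def iter_batches(vectors, metadata_list=None, batch_size=1000):
    """Yield (vectors, metadata) slices of at most batch_size rows each."""
    if metadata_list is not None and len(metadata_list) != len(vectors):
        raise ValueError("metadata_list must match vectors length")
    for start in range(0, len(vectors), batch_size):
        end = start + batch_size
        meta = metadata_list[start:end] if metadata_list is not None else None
        yield vectors[start:end], meta


batches = list(iter_batches([[0.0]] * 2500, batch_size=1000))
print([len(v) for v, _ in batches])  # [1000, 1000, 500]
```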

Bulk Insert From a File

For very large loads where staging vectors as a file is acceptable, the server can ingest directly from a .npy or flat .f32 file on its own filesystem. The server reads the file via memory mapping, so the working memory for the load is bounded by the index being built rather than by the input file size.

import numpy as np

vectors = np.random.rand(1_000_000, 1536).astype(np.float32)
np.save("/data/ingest/embeddings.npy", vectors)

result = client.vectors.bulk_insert_from_path(
    collection="docs",
    path="/data/ingest/embeddings.npy",
    dim=1536,
    expected_count=1_000_000,
    total_count_hint=1_000_000,
    index_mode="immediate",
)

print(f"Inserted {result.inserted_count} vectors")
print(f"IDs from {result.assigned_ids[0]} to {result.assigned_ids[-1]}")

Signature:

bulk_insert_from_path(
    collection, path, *,
    dim=0, expected_count=0, total_count_hint=0,
    id_start=1, ids_path="",
    skip_metadata_index=False, index_mode="immediate",
    ef_construction=0, chunk_size=0,
) -> BulkInsertResult

Bulk Insert From Path Options:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| dim | int | 0 | Vector dimensionality. 0 defers to the collection's configured dimension. |
| expected_count | int | 0 | Expected number of vectors. 0 lets the server infer from file size. |
| total_count_hint | int | 0 | Hint for the total vector count across multiple bulk inserts; used for arena capacity planning. |
| id_start | int | 1 | Starting ID for auto-assigned IDs. |
| ids_path | str | "" | Optional path to a sidecar file containing explicit IDs (one per vector). Empty string uses auto-assigned IDs starting at id_start. |
| skip_metadata_index | bool | False | Skip metadata indexing during insert for faster ingestion. Rebuild later via client.collections.optimize. |
| index_mode | str | "immediate" | "immediate" builds the index during insert. "deferred" builds it after, via client.collections.optimize. |
| ef_construction | int | 0 | HNSW ef_construction override for this batch. 0 uses the collection default. |
| chunk_size | int | 0 | 0 loads the file in a single pass. A positive value processes the load in chunks of that many rows, snapshotting and releasing scratch memory between chunks; trades wall-clock for a lower peak resident memory footprint. |

Path and format requirements:

  • The path must be absolute.
  • The path must point to a file the server process can read.
  • Supported wire formats: .npy (auto-detected via NumPy magic bytes) and flat little-endian float32 (.f32), where the file is expected_count * dim * 4 bytes.

Resuming mid-call is not supported for bulk_insert_from_path; on failure, restart the full call. For resumable bulk loads, use streaming bulk_insert with BulkInsertResult.resume_token.
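
If you are not using NumPy, a flat .f32 file can be produced with the standard library alone. The sketch below writes headerless little-endian float32 rows and checks the file size against expected_count * dim * 4. It writes to a local temporary directory purely for illustration; in practice the file has to live on the server's filesystem. `write_f32` and `expected_f32_size` are hypothetical helper names.

```python
import os
import struct
import tempfile


def write_f32(path, vectors):
    """Write rows as flat little-endian float32 values with no header."""
    with open(path, "wb") as f:
        for row in vectors:
            f.write(struct.pack(f"<{len(row)}f", *row))


def expected_f32_size(count, dim):
    """Size in bytes of a flat float32 file holding count vectors of length dim."""
    return count * dim * 4


dim, count = 8, 100
vectors = [[float(i)] * dim for i in range(count)]
path = os.path.join(tempfile.mkdtemp(), "embeddings.f32")
write_f32(path, vectors)

print(os.path.getsize(path) == expected_f32_size(count, dim))  # True
print(os.path.isabs(path))                                     # True
```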

NumPy Integration

The SDK accepts NumPy arrays anywhere a list[float] is expected. The examples below convert explicitly with .tolist(), which yields plain Python floats:

import numpy as np

embedding = np.random.rand(1536).astype(np.float32)
vec_id = client.vectors.insert("products", vector=embedding.tolist())

query = np.random.rand(1536).astype(np.float32)
results = client.search.query("products", vector=query.tolist(), k=10)

6. Search

Access via client.search.

Basic Search

results = client.search.query("products", vector=[0.5] * 1536, k=10)

for r in results.results:
    print(f"ID: {r.id}, Score: {r.score:.4f}")
print(f"Search took {results.search_time_us}us")

Signature:

query(
    collection, vector, k=10, *,
    filter=None, strategy="auto",
    include_metadata=True, include_graph=False,
    graph_threshold=0.0, max_graph_edges=10,
    ef_search=None,
) -> SearchResult

Filtered Search

Use the Filter class to build metadata filters with Python operators:

from swarndb import Filter

# Equality filter
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.eq("category", "electronics"),
)

# Range filter
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.field("price").between(10.0, 100.0),
)

# Combine with AND (& operator)
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.eq("category", "electronics") & Filter.field("price").lt(50.0),
)

# Combine with OR (| operator)
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.eq("brand", "Acme") | Filter.eq("brand", "Globex"),
)

# Negate with NOT (~ operator)
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=~Filter.eq("discontinued", True),
)

# Membership filter
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.in_("color", ["red", "blue", "green"]),
)

# Existence check
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.exists("discount_price"),
)

# Contains filter
results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.contains("description", "wireless"),
)

Available Filter Operations:

| Method | Description |
| --- | --- |
| Filter.eq(field, value) | Equality: field == value |
| Filter.ne(field, value) | Not equal: field != value |
| Filter.gt(field, value) | Greater than: field > value |
| Filter.gte(field, value) | Greater than or equal |
| Filter.lt(field, value) | Less than: field < value |
| Filter.lte(field, value) | Less than or equal |
| Filter.in_(field, values) | Membership: field in values |
| Filter.between(field, lo, hi) | Range: lo <= field <= hi |
| Filter.exists(field) | Field is present |
| Filter.contains(field, value) | Field contains value |

Chained syntax is also supported via Filter.field():

Filter.field("price").gt(50)
Filter.field("tags").contains("sale")
Filter.field("year").between(2020, 2024)

Boolean combinators:

f1 & f2          # AND
f1 | f2          # OR
~f1              # NOT
(f1 & f2) | f3   # nested logic

Graph-Enriched Search

Include virtual graph edges alongside search results for relationship discovery:

results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    include_graph=True,
    graph_threshold=0.7,
    max_graph_edges=5,
)

for r in results.results:
    print(f"ID: {r.id}, Score: {r.score:.4f}")
    for edge in r.graph_edges:
        print(f"  Related to {edge.target_id} (similarity: {edge.similarity:.3f})")

Search with ef_search Override

Tune HNSW search quality per query:

results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    ef_search=200,  # higher = better recall, slower
)

Filter Strategy

Control when metadata filtering is applied:

# "auto" (default): engine picks the best strategy
# "pre_filter": filter before ANN search (exact, slower for low selectivity)
# "post_filter": filter after ANN search (fast, may return fewer results)

results = client.search.query(
    "products", vector=[0.5] * 1536, k=10,
    filter=Filter.eq("category", "electronics"),
    strategy="pre_filter",
)
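
The difference between the two explicit strategies is easiest to see on toy data. This sketch uses brute-force search in place of a real ANN index (an assumption made to keep it self-contained) and shows why post-filtering can return fewer than k results.

```python
def top_k(query, items, k):
    """Brute-force nearest neighbors by squared Euclidean distance."""
    def dist(item):
        return sum((q - x) ** 2 for q, x in zip(query, item["vector"]))
    return sorted(items, key=dist)[:k]


items = [
    {"id": 1, "vector": [0.0], "category": "books"},
    {"id": 2, "vector": [0.1], "category": "electronics"},
    {"id": 3, "vector": [0.2], "category": "books"},
    {"id": 4, "vector": [0.3], "category": "electronics"},
]


def is_electronics(item):
    return item["category"] == "electronics"


query, k = [0.0], 2

# pre_filter: restrict the candidates first, then take the k nearest.
pre = top_k(query, [i for i in items if is_electronics(i)], k)
# post_filter: take the k nearest overall, then drop non-matching rows.
post = [i for i in top_k(query, items, k) if is_electronics(i)]

print([i["id"] for i in pre])   # [2, 4]
print([i["id"] for i in post])  # [2]
```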

Batch Search

Search multiple queries in a single RPC call:

queries = [
    [0.1] * 1536,
    [0.5] * 1536,
    [0.9] * 1536,
]

batch = client.search.batch(
    "products", queries=queries, k=5,
    filter=Filter.eq("category", "electronics"),
    include_metadata=True,
)

for i, sr in enumerate(batch.results):
    print(f"Query {i}: {len(sr.results)} results in {sr.search_time_us}us")
print(f"Total batch time: {batch.total_time_us}us")

7. Graph Operations

Access via client.graph. SwarnDB's virtual graph automatically connects similar vectors based on a similarity threshold. On hybrid collections, client.graph also manages a first-class typed graph and a composable hybrid query builder.

For the complete, end-to-end treatment of the whole graph surface (virtual graph, typed nodes and edges, all 12 hybrid-query steps, all 3 terminals, sub-plans, all 14 predicate helpers, edge curation, the audit trail, and async patterns), see the Typed Graph: Complete Guide. This section is the quick reference.

Set Collection Threshold

Set the similarity threshold that determines which vectors are connected in the graph:

# Collection-level threshold
client.graph.set_threshold("products", threshold=0.75)

# Per-vector threshold override
client.graph.set_threshold("products", threshold=0.9, vector_id=42)

After setting a threshold, call client.collections.optimize("products") to rebuild the graph.
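
Conceptually, a similarity threshold turns a set of vectors into a graph by keeping only the pairs that are similar enough. The sketch below does this by comparing every pair with cosine similarity. It is an illustration of the idea only: the server presumably uses its index rather than an all-pairs scan, and per-vector overrides are not modeled.

```python
import math
from itertools import combinations


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def build_edges(vectors, threshold):
    """Connect every pair of ids whose cosine similarity meets the threshold."""
    edges = []
    for i, j in combinations(sorted(vectors), 2):
        similarity = cosine(vectors[i], vectors[j])
        if similarity >= threshold:
            edges.append((i, j, similarity))
    return edges


vectors = {1: [1.0, 0.0], 2: [0.9, 0.1], 3: [0.0, 1.0]}
print([(i, j) for i, j, _ in build_edges(vectors, 0.75)])  # [(1, 2)]
print([(i, j) for i, j, _ in build_edges(vectors, 0.05)])  # [(1, 2), (2, 3)]
```

Lowering the threshold adds weaker edges, which is why the graph has to be rebuilt after the threshold changes.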

Get Related Vectors

Find vectors connected to a given vector via the virtual graph:

edges = client.graph.get_related(
    "products",
    vector_id=42,
    threshold=0.7,
    max_results=20,
)

for edge in edges:
    print(f"Related to {edge.target_id}, similarity: {edge.similarity:.3f}")

Graph Traversal

Multi-hop traversal discovers vectors connected through chains of similarity:

nodes = client.graph.traverse(
    "products",
    start_id=42,
    depth=3,          # max hops
    threshold=0.6,    # minimum edge similarity
    max_results=50,
)

for node in nodes:
    print(f"ID: {node.id}, Depth: {node.depth}, "
          f"Path similarity: {node.path_similarity:.3f}, "
          f"Path: {node.path}")
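
For intuition, multi-hop traversal can be pictured as a breadth-first walk that only follows edges at or above the threshold. In the sketch below, the path similarity is taken as the product of the edge similarities along the path; that formula is an assumption, since this page does not state how path_similarity is computed. The adjacency data is made up.

```python
from collections import deque


def traverse(adjacency, start_id, depth, threshold, max_results):
    """Breadth-first walk over edges with similarity >= threshold.

    Returns (id, depth, path_similarity, path) tuples in discovery order.
    """
    seen = {start_id}
    queue = deque([(start_id, 0, 1.0, [start_id])])
    found = []
    while queue and len(found) < max_results:
        node, d, path_sim, path = queue.popleft()
        if d == depth:
            continue  # hop budget used up on this branch
        for neighbor, sim in adjacency.get(node, []):
            if sim < threshold or neighbor in seen:
                continue
            seen.add(neighbor)
            entry = (neighbor, d + 1, path_sim * sim, path + [neighbor])
            found.append(entry)
            queue.append(entry)
    return found[:max_results]


adjacency = {42: [(7, 0.9), (8, 0.5)], 7: [(9, 0.8)], 9: [(10, 0.95)]}
nodes = traverse(adjacency, start_id=42, depth=2, threshold=0.6, max_results=50)
print([(n[0], n[1], n[3]) for n in nodes])  # [(7, 1, [42, 7]), (9, 2, [42, 7, 9])]
```

Vector 8 is skipped because its edge falls below the threshold, and vector 10 is out of reach because it sits three hops away.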

Complete Graph Exploration Workflow

from swarndb import SwarnDBClient

with SwarnDBClient(host="localhost", port=50051) as client:
    # 1. Create collection and insert data
    client.collections.create("articles", dimension=128, distance_metric="cosine")
    for i in range(100):
        client.vectors.insert(
            "articles",
            vector=[float(i % 10) / 10.0 + j * 0.01 for j in range(128)],
            metadata={"topic": f"topic_{i % 5}"},
        )

    # 2. Set threshold and rebuild graph
    client.graph.set_threshold("articles", threshold=0.8)
    client.collections.optimize("articles")

    # 3. Explore relationships
    edges = client.graph.get_related("articles", vector_id=1, max_results=10)
    print(f"Vector 1 has {len(edges)} related vectors")

    # 4. Traverse the graph
    nodes = client.graph.traverse("articles", start_id=1, depth=2, max_results=25)
    print(f"Traversal found {len(nodes)} reachable vectors within 2 hops")

Typed Graph (hybrid mode)

On a hybrid collection, client.graph also manages a first-class typed graph of nodes and edges. See Typed Graph: Overview for the full picture.

# Nodes
node_id = client.graph.put_node(
    "docs_graph",
    kind="entity",            # "content" or "entity"
    label="Person",
    properties={"name": "Ada Lovelace"},
    embedding=[0.0] * 1536,   # optional; used for entity dedup
    source="manual",
    created_by="curator",
)
node = client.graph.get_node("docs_graph", node_id)       # TypedNode or None
client.graph.delete_node("docs_graph", node_id)           # also removes incident edges

# Edges
edge_id = client.graph.put_edge(
    "docs_graph",
    source=42, target=node_id, edge_type="AUTHORED_BY",
    properties={"page": 1}, provenance={"doc_id": "paper-1"},
    confidence=1.0, verified=False, is_manual=True,
)
edge = client.graph.get_edge("docs_graph", edge_id)       # TypedEdge or None
edges = client.graph.list_edges("docs_graph", node=42, direction="outgoing", edge_type="AUTHORED_BY")
client.graph.delete_edge("docs_graph", edge_id)

# Curate: update (only supplied fields change), verify (keep + bless),
# reject (delete + remember the pattern)
client.graph.update_edge("docs_graph", edge_id, confidence=0.9, verified=True, actor="curator")
client.graph.verify_edge("docs_graph", edge_id, actor="curator")
result = client.graph.reject_edge("docs_graph", edge_id, actor="curator")  # .deleted, .rule_added

# Audit trail: every TypedEdge carries .history, a list of EdgeAudit(action, actor, at)
for entry in client.graph.get_edge("docs_graph", edge_id).history:
    print(entry.action, entry.actor, entry.at)

# Bulk import edges from CSV or JSONL
csv_data = "source,target,edge_type,confidence\n42,99,CITES,1.0\n"
report = client.graph.bulk_import_edges("docs_graph", csv_data, format="csv", auto_add_edge_types=False)
print(report.total_rows, report.imported, report.failed, report.errors)

Verify and reject are not symmetric: verify keeps the edge and marks it trustworthy (and protects it from re-extraction), while reject deletes it and can record a rule so the same wrong relationship is not re-proposed later. See the Typed Graph: Complete Guide for the full curation and re-extraction semantics.

Updating a node (mutable property bag, immutable provenance)

client.graph.update_node revises a node's property bag and records an audit entry. Only the property bag is mutable: the provenance (source, created_at, created_by) and the embedding are immutable and are never touched. Omitting properties performs an audit-only touch (the bag stays as it was). The call returns the updated TypedNode.

node = client.graph.update_node(
    "docs_graph", node_id,
    properties={"name": "Ada Lovelace", "note": "first programmer"},
    actor="curator",
)
print(node.properties, node.updated_at)

# Audit-only touch: omit properties to leave the bag unchanged.
client.graph.update_node("docs_graph", node_id, actor="curator")

Signature:

update_node(collection, node_id, *, properties=None, actor="") -> TypedNode

Temporal edges (opt-in validity window and context)

put_edge takes three optional temporal keyword arguments. All default off, so omitting them creates an always-valid, context-free edge exactly as before. Pass them to give an edge a validity window and a regime label:

  • valid_from and valid_until are unix-epoch milliseconds; valid_until is exclusive. Either may be None (unbounded on that side).
  • temporal_context is a free-form string regime label (for example "forecast" or "as-reported").

edge_id = client.graph.put_edge(
    "docs_graph",
    source=42, target=node_id, edge_type="EMPLOYED_AT",
    valid_from=1704067200000,        # 2024-01-01, unix-epoch millis
    valid_until=1735689600000,        # 2025-01-01, exclusive
    temporal_context="as-reported",
)

edge = client.graph.get_edge("docs_graph", edge_id)
print(edge.valid_from, edge.valid_until, edge.temporal_context)

These three fields read back on the TypedEdge returned by get_edge, list_edges, and the enumeration methods; each is None when the edge was created without it.
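
The millisecond values in the example correspond to midnight UTC on the dates in the comments. A small helper like the one below (hypothetical, not part of the SDK) avoids hand-computing them and refuses naive datetimes, which would otherwise be interpreted in local time.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to unix-epoch milliseconds."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    return int(dt.timestamp() * 1000)


valid_from = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
valid_until = to_epoch_millis(datetime(2025, 1, 1, tzinfo=timezone.utc))
print(valid_from, valid_until)  # 1704067200000 1735689600000
```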

Reading nodes and edges by filter (paginated enumeration)

To walk the whole graph (rather than start from a vector or a known id), use the paginated read methods. They return a NodePage / EdgePage carrying the rows plus a next_cursor and has_more. Pass the previous next_cursor back as after_id to page forward; the page size is server-clamped.

from swarndb import Predicate

# One page of entity nodes labeled "Person".
page = client.graph.enumerate_nodes(
    "docs_graph",
    kind="entity",                    # "content" or "entity"
    label="Person",
    predicate=Predicate.exists("name"),
    after_id=0, limit=500,
)
for node in page.nodes:
    print(node.id, node.label, node.properties)
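if page.has_more:
    # Repeat the same filters; only the cursor changes between pages.
    next_page = client.graph.enumerate_nodes(
        "docs_graph",
        kind="entity", label="Person",
        predicate=Predicate.exists("name"),
        after_id=page.next_cursor, limit=500,
    )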

# One page of CITES edges, optionally constrained by an endpoint node.
epage = client.graph.enumerate_edges(
    "docs_graph",
    edge_type="CITES",
    endpoint_label="Paper",
    endpoint_kind="content",          # an endpoint (source or target) must match
    after_id=0, limit=500,
)
for edge in epage.edges:
    print(edge.id, edge.source, edge.target)

Signatures:

enumerate_nodes(collection, *, after_id=0, limit=1000, kind=None, label="", predicate=None) -> NodePage
enumerate_edges(collection, *, after_id=0, limit=1000, edge_type="", predicate=None, endpoint_label="", endpoint_kind=None) -> EdgePage

For convenience, iter_nodes and iter_edges walk every page to exhaustion and yield each row, so you do not manage the cursor yourself:

for node in client.graph.iter_nodes("docs_graph", kind="entity", label="Person"):
    ...
for edge in client.graph.iter_edges("docs_graph", edge_type="CITES"):
    ...
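
The cursor loop that iter_nodes and iter_edges save you from writing looks roughly like the sketch below. It runs against a fake page function instead of a server, and it assumes the cursor is the last id returned; the page is modeled as a plain dict rather than the SDK's NodePage / EdgePage.

```python
def iter_pages(fetch_page, after_id=0):
    """Yield every row by following next_cursor until has_more is False."""
    while True:
        page = fetch_page(after_id)
        yield from page["rows"]
        if not page["has_more"]:
            return
        after_id = page["next_cursor"]


# Stand-in for the server: ids 1..7, two rows per page.
ROWS = list(range(1, 8))


def fake_fetch(after_id, limit=2):
    rows = [r for r in ROWS if r > after_id][:limit]
    has_more = bool(rows) and rows[-1] < ROWS[-1]
    return {"rows": rows, "next_cursor": rows[-1] if rows else after_id, "has_more": has_more}


print(list(iter_pages(fake_fetch)))  # [1, 2, 3, 4, 5, 6, 7]
```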

Hybrid Query Builder (hybrid mode)

client.graph.query(collection) returns a chainable builder that mixes vector similarity with graph traversal. Finish with return_nodes(), return_edges(), or return_paths(). Use the Predicate helpers and Direction constants for filtering and traversal direction.

from swarndb import Predicate, Direction

result = (
    client.graph.query("docs_graph")
    .vector_similar(query_vector, k=20)            # seed with nearest neighbors
    .traverse("AUTHORED_BY", direction=Direction.OUTGOING)
    .filter(Predicate.label_eq("Person"))
    .limit(10)
    .return_nodes()
)
for node in result.nodes:
    print(node.id, node.label, node.properties)

# Source steps: .vector_similar(vec, k, ef_search=200),
#               .from_node(id) / .from_nodes([...])
# Traversal:    .traverse(edge_type, direction),
#               .k_hop(edge_type, max=2, predicate=Predicate.eq("in_stock", True)),
#               .shortest_path(["CITES"], target=paper_b)
# Refinement:   .filter(predicate), .edges(edge_type, direction), .limit(n)
# Terminals:    .return_nodes() / .return_edges() / .return_paths()

# Set-combination steps take another plan, built with .to_plan() (alias
# .build_plan()): .mutual_neighbors(other_plan) keeps nodes reachable by both
# plans, .intersect(other_plan) keeps nodes in both result sets,
# .union(other_plan) combines both result sets.

# All 14 Predicate helpers: eq, ne, gt, ge, lt, le, is_in, not_in, exists,
# label_eq, and_, or_, not_, any_. Values are JSON-encoded on the wire;
# a bare key references the property bag, label_eq references the node label.

# A HybridQueryResult carries .nodes, .edges, and .paths; exactly one is
# populated, matching the terminal you called.

The async client mirrors this; build the same chain and await the terminal call. For the full hybrid-query surface (every step, sub-plan composition, and the predicate JSON wire encoding) see the Typed Graph: Complete Guide.

Seeding by graph filter: scan_by_filter

scan_by_filter is a source step (like vector_similar or from_nodes) that needs no vector and no ids. It seeds the frontier from nodes matching an optional kind, entity label, and predicate (a property condition, including the structural incident-edge-count term below). Chain the normal traversal and refinement steps after it.

from swarndb import Predicate, Direction

result = (
    client.graph.query("docs_graph")
    .scan_by_filter(
        kind="entity",                       # "content" or "entity"
        label="Person",
        predicate=Predicate.exists("orcid"),
    )
    .traverse("AUTHORED_BY", direction=Direction.INCOMING)
    .limit(50)
    .return_nodes()
)

Signature:

scan_by_filter(*, kind=None, label="", predicate=None) -> builder

Quality-weighted traversal and ranking (opt-in)

A WeightSpec folds edge quality (confidence, recency decay, or an explicit numeric property) into the per-edge weight used by k_hop, shortest_path, and rank_rrf. It is imported from swarndb. All of its fields default off, so omitting a WeightSpec (or passing one built with no arguments) keeps every edge at weight 1.0 and leaves traversal and ranking unchanged.

WeightSpec fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| use_confidence | bool | False | Fold each edge's confidence into its weight |
| min_confidence | float | 0.0 | Drop edges below this confidence |
| recency_half_life_ms | int | 0 | Recency decay half-life in milliseconds (0 = no decay) |
| use_explicit_weight | bool | False | Read the weight from a numeric edge property |
| explicit_weight_key | str | "weight" | Property key to read when use_explicit_weight is set |

Pass weight (and, on k_hop, order_by_weight=True to order the frontier by accumulated weight) to enable it:

from swarndb import WeightSpec, Predicate

# k_hop weighted by confidence, frontier ordered by accumulated weight.
result = (
    client.graph.query("docs_graph")
    .vector_similar(query_vector, k=20)
    .k_hop(
        "CITES", max=2,
        weight=WeightSpec(use_confidence=True, min_confidence=0.5),
        order_by_weight=True,
    )
    .limit(10)
    .return_nodes()
)

# Weighted shortest path: cost-weighted instead of hop count.
paths = (
    client.graph.query("docs_graph")
    .from_node(42)
    .shortest_path(
        ["CITES"], target=99,
        weighted=True,
        weight=WeightSpec(use_explicit_weight=True, explicit_weight_key="cost"),
    )
    .return_paths()
)

# RRF ranking weighted by bridge-route edge quality.
fused = (
    client.graph.query("docs_graph")
    .vector_similar(query_vector, k=10)
    .traverse("mentions", direction="outgoing")
    .k_hop("CITES", max=2)
    .traverse("mentions", direction="incoming")
    .rank_rrf(k=10, relation_edge_types=["CITES"],
              edge_weight=WeightSpec(use_confidence=True))
    .return_nodes()
)
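
One plausible reading of how the WeightSpec fields combine into a single per-edge weight is sketched below. The half-life term follows the usual definition (the weight halves every recency_half_life_ms), but multiplying the terms together, the `created_at_ms` field, and the dict-shaped edge are all assumptions made for illustration; the server's actual formula may differ.

```python
def edge_weight(edge, now_ms, *, use_confidence=False, min_confidence=0.0,
                recency_half_life_ms=0, use_explicit_weight=False,
                explicit_weight_key="weight"):
    """Return a weight for one edge, or None if the edge is dropped."""
    if edge["confidence"] < min_confidence:
        return None
    weight = 1.0  # every field off: all edges weigh the same
    if use_confidence:
        weight *= edge["confidence"]
    if recency_half_life_ms > 0:
        age_ms = max(0, now_ms - edge["created_at_ms"])  # hypothetical field
        weight *= 0.5 ** (age_ms / recency_half_life_ms)
    if use_explicit_weight:
        weight *= float(edge["properties"].get(explicit_weight_key, 1.0))
    return weight


edge = {"confidence": 0.8, "created_at_ms": 0, "properties": {"cost": 3}}
print(edge_weight(edge, now_ms=1000))                             # 1.0
print(edge_weight(edge, now_ms=1000, use_confidence=True))        # 0.8
print(edge_weight(edge, now_ms=1000, recency_half_life_ms=1000))  # 0.5
print(edge_weight(edge, now_ms=1000, min_confidence=0.9))         # None
```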

Relevant signatures:

k_hop(edge_type=None, max=1, predicate=None, *, weight=None, order_by_weight=False,
      as_of=None, include_unbounded=True, context=None) -> builder
shortest_path(edge_types, target, *, weighted=False, weight=None,
              as_of=None, include_unbounded=True, context=None) -> builder
rank_rrf(k, *, rrf_k=60, k_hop_max=2, relation_edge_types=None,
         hub_damping=0.0, edge_weight=None) -> builder

Temporal traversal (opt-in)

traverse, k_hop, and shortest_path each take three optional temporal keyword arguments that restrict a hop to edges valid at a point in time and regime. They mirror the temporal fields on put_edge. All default to no filtering, so omitting them leaves the hop unchanged:

  • as_of is a unix-epoch milliseconds instant; the hop keeps only edges whose validity window contains it.
  • include_unbounded (default True) decides whether edges with no validity window (always-valid edges) are kept.
  • context is a regime label; the hop keeps only edges in that context.

result = (
    client.graph.query("docs_graph")
    .from_node(42)
    .traverse(
        "EMPLOYED_AT", direction="outgoing",
        as_of=1719792000000,          # 2024-07-01, unix-epoch millis
        include_unbounded=False,
        context="as-reported",
    )
    .return_nodes()
)
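
The three arguments can be pictured as a per-edge check like the one below. It is a sketch under stated assumptions: an "unbounded" edge is taken to mean one with neither valid_from nor valid_until, a half-bounded edge is treated as open on the missing side, and context is matched by plain string equality. Edges are modeled as dicts rather than TypedEdge objects.

```python
def edge_passes(edge, *, as_of=None, include_unbounded=True, context=None):
    """Decide whether a hop keeps an edge under the temporal arguments."""
    if context is not None and edge.get("temporal_context") != context:
        return False
    if as_of is None:
        return True  # no point-in-time filtering requested
    start, end = edge.get("valid_from"), edge.get("valid_until")
    if start is None and end is None:
        return include_unbounded  # always-valid edge
    after_start = start is None or start <= as_of
    before_end = end is None or as_of < end  # valid_until is exclusive
    return after_start and before_end


employed = {
    "valid_from": 1704067200000,   # 2024-01-01
    "valid_until": 1735689600000,  # 2025-01-01, exclusive
    "temporal_context": "as-reported",
}
always_valid = {}

print(edge_passes(employed, as_of=1719792000000, context="as-reported"))      # True
print(edge_passes(employed, as_of=1735689600000))                             # False
print(edge_passes(always_valid, as_of=1719792000000, include_unbounded=False))  # False
```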

Vector-math over the graph frontier (opt-in)

Six frontier steps rank or filter the current node frontier by vector geometry. They are opt-in refinement steps: they run only when you add them, and they operate exactly over the frontier the graph has already produced (no ANN). Each takes on_missing to govern frontier nodes that carry no vector, "skip" (the default, which drops and counts them) or "error" (which fails the query). On diversity_rank the relevance-versus-diversity trade-off parameter is named lambda_ (with the trailing underscore, since lambda is a Python keyword).

# Analogy: rank by distance to a - b + c.
client.graph.query("docs_graph").from_nodes(ids).analogy_rank(a, b, c, k=10).return_nodes()

# Diversity (MMR): lambda_ in [0, 1] trades relevance vs. diversity.
client.graph.query("docs_graph").from_nodes(ids).diversity_rank(query_vec, lambda_=0.7, k=10).return_nodes()

# Cone: keep nodes within an angular cone around a direction.
import math
client.graph.query("docs_graph").from_nodes(ids).cone_filter(direction, math.pi / 6, k=10).return_nodes()

# Isolation: rank the most isolated first (min distance to any centroid).
client.graph.query("docs_graph").from_nodes(ids).isolation_rank(centroids, k=10).return_nodes()

# Centroid: rank by closeness to the frontier's own mean (most representative first).
client.graph.query("docs_graph").from_nodes(ids).centroid_rank(k=10).return_nodes()

# Interpolate: rank by distance to the point between a and b at fraction t.
client.graph.query("docs_graph").from_nodes(ids).interpolate_rank(a, b, t=0.5, k=10).return_nodes()

Signatures:

analogy_rank(a, b, c, k, *, on_missing="skip") -> builder
diversity_rank(query, lambda_, k, *, on_missing="skip") -> builder
cone_filter(direction, aperture_radians, k, *, on_missing="skip") -> builder
isolation_rank(centroids, k, *, on_missing="skip") -> builder
centroid_rank(k, *, on_missing="skip") -> builder
interpolate_rank(a, b, t, k, *, on_missing="skip") -> builder
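
To show what the lambda_ trade-off does, here is a sketch of greedy Maximal Marginal Relevance (MMR) over a small frontier. It uses the textbook formulation, where lambda_ = 1.0 means pure relevance and lower values favor diversity; whether diversity_rank uses the same orientation and similarity function is an assumption. The `mmr` function and the toy vectors are illustrative only.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr(query, candidates, lambda_, k):
    """Greedy MMR over {id: vector}; returns selected ids in pick order."""
    selected = []
    remaining = dict(candidates)
    while remaining and len(selected) < k:
        def score(node_id):
            relevance = cosine(query, remaining[node_id])
            # Similarity to the closest already-selected node (0.0 if none yet).
            redundancy = max(
                (cosine(remaining[node_id], candidates[s]) for s in selected),
                default=0.0,
            )
            return lambda_ * relevance - (1.0 - lambda_) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        del remaining[best]
    return selected


frontier = {1: [1.0, 0.0], 2: [0.99, 0.05], 3: [0.6, 0.8]}
print(mmr([1.0, 0.0], frontier, lambda_=1.0, k=2))  # [1, 2]
print(mmr([1.0, 0.0], frontier, lambda_=0.3, k=2))  # [1, 3]
```

With lambda_=0.3, node 2 is passed over because it is nearly identical to node 1, even though it is more relevant than node 3.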

Structural predicate: incident-edge count

Alongside the property predicates, Predicate.incident_edges is a node-only structural term: it compares the count of a node's incident edges against a value, optionally constrained by edge_type and direction. The comparison operator is one of the graph_pb2.HYBRID_CMP_* constants. Use it inside filter, scan_by_filter, or the enumeration predicate argument.

from swarndb import Predicate
from swarndb._proto import graph_pb2

# Nodes with at least three outgoing CITES edges.
hubs = (
    client.graph.query("docs_graph")
    .scan_by_filter(
        predicate=Predicate.incident_edges(
            graph_pb2.HYBRID_CMP_GE, 3,
            edge_type="CITES", direction="outgoing",
        ),
    )
    .return_nodes()
)

Signature:

Predicate.incident_edges(op, value, *, edge_type=None, direction="outgoing") -> predicate
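
In plain Python, the same structural test amounts to counting edges per node and comparing the count. The sketch below models edges as (source, target, edge_type) tuples and handles only "outgoing" and "incoming"; it illustrates the semantics and is not the server's implementation.

```python
import operator


def incident_count(edges, node_id, edge_type=None, direction="outgoing"):
    """Count a node's incident edges, optionally by type and direction."""
    count = 0
    for source, target, etype in edges:
        if edge_type is not None and etype != edge_type:
            continue
        endpoint = source if direction == "outgoing" else target
        if endpoint == node_id:
            count += 1
    return count


edges = [
    (1, 2, "CITES"), (1, 3, "CITES"), (1, 4, "CITES"),
    (2, 1, "CITES"), (1, 5, "MENTIONS"),
]
node_ids = [1, 2, 3, 4, 5]

# Nodes with at least three outgoing CITES edges (the GE comparison).
hubs = [n for n in node_ids if operator.ge(incident_count(edges, n, "CITES"), 3)]
print(hubs)  # [1]
```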

One-call graph-augmented retrieval: graph_rag (recommended)

For graph-augmented retrieval, client.graph.graph_rag(...) is the recommended path. It composes the candidate pool for you (a vector seed expanded across the graph) and ranks it, so you get the result in one call instead of hand-building the chain. A chain that seeds from vector search but never expands across the graph gives no lift over plain vector retrieval; graph_rag avoids that pitfall by composing the graph expansion automatically. By default it uses vector_rank (graph-first scope-then-rank): the graph fixes the candidate set, which is then ranked exactly by similarity within that set.

result = client.graph.graph_rag(
    "docs_graph", query_vector, k=10,
    relation_edge_types=["CITES"],     # empty/None uses the structural fallback
)
for node in result.nodes:
    print(node.id, node.label)

Signature:

graph_rag(
    collection, query_vector, k=10, *,
    fusion="vector_rank", mentions_edge_type="mentions",
    relation_edge_types=None, k_hop_max=2, rrf_k=60,
    hub_damping=0.0, ef_search=None,
) -> HybridQueryResult
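The scope-then-rank idea behind vector_rank can be sketched in a few lines of plain Python. This illustrates the concept only and is not the server's implementation; scope_then_rank, the toy embeddings, and the choice of cosine similarity are all assumptions.

```python
import math

def cosine(u, v):
    denom = math.hypot(*u) * math.hypot(*v)
    return sum(a * b for a, b in zip(u, v)) / denom if denom else 0.0

def scope_then_rank(candidate_ids, embeddings, query, k):
    """Rank only the graph-selected candidates, exactly, by similarity to query."""
    scored = [(cosine(embeddings[n], query), n)
              for n in candidate_ids if n in embeddings]
    scored.sort(key=lambda pair: (-pair[0], pair[1]))  # best first, ties by id
    return [n for _, n in scored[:k]]

embeddings = {1: [1.0, 0.0], 2: [0.8, 0.6], 3: [0.0, 1.0], 4: [0.9, 0.1]}
graph_scope = {2, 3}  # node 4 is close to the query but outside the graph scope
top = scope_then_rank(graph_scope, embeddings, [1.0, 0.0], k=2)
print(top)
```

Node 4 never appears in the output even though it is close to the query, because the graph fixed the candidate set before any ranking happened.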

vector_rank is the default because, on real data at scale, it ties plain vector retrieval on overall accuracy and wins on the hard multi-hop and adversarial questions, whereas the older RRF fusion regressed below plain vector retrieval. RRF is no longer the default but stays fully supported as an explicit opt-in: graph_rag(..., fusion="rrf") selects Reciprocal Rank Fusion in one call, and the explicit chain ending in .rank_rrf(...) in the Typed Graph: Complete Guide, section 4.5, is the customizable equivalent.

Plain client.graph.query(...).vector_similar(...).return_nodes() stays the zero-cost vector-only default. The async client mirrors the helper: await client.graph.graph_rag(...). See the Typed Graph: Complete Guide, section 4.6, for the exact composed plan and its explicit equivalent.
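For readers unfamiliar with Reciprocal Rank Fusion, the sketch below shows the textbook formula, in which each ranked list contributes 1 / (rrf_k + rank) to a node's score. It is a generic illustration under that assumption; rrf_fuse is a hypothetical helper, and the source does not confirm that the server computes RRF exactly this way.

```python
def rrf_fuse(rankings, rrf_k=60, k=10):
    """Fuse several ranked id lists with Reciprocal Rank Fusion (textbook form)."""
    scores = {}
    for ranking in rankings:
        for rank, node_id in enumerate(ranking, start=1):
            scores[node_id] = scores.get(node_id, 0.0) + 1.0 / (rrf_k + rank)
    # Highest fused score first; break ties by id for determinism.
    ordered = sorted(scores, key=lambda n: (-scores[n], n))
    return ordered[:k]

vector_ranking = [101, 102, 103, 104]   # e.g. ordered by similarity
graph_ranking = [103, 101, 105]         # e.g. ordered by graph proximity
fused = rrf_fuse([vector_ranking, graph_ranking], rrf_k=60, k=3)
print(fused)
```

Nodes that appear in both lists (101 and 103) rise above nodes that appear in only one, which is the behavior the rrf_k parameter tunes: a larger rrf_k flattens the advantage of top ranks.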


7a. LLM Extraction (hybrid mode)

Access via client.extraction. These methods work only on hybrid collections and turn text chunks into typed entities and edges using your own LLM. See LLM Extraction for the concepts, and the Typed Graph: Complete Guide for the full extraction API (cost preview, jobs, proposals, partial success, truncation auto-retry, and incremental re-extraction) with runnable examples. The server needs SWARNDB_MASTER_KEY set in order to store API keys.

from swarndb import Chunk, EntityLabel, EdgeType

# 1. LLM config (API key is write-only; OpenAI-compatible, e.g. OpenRouter)
client.extraction.set_llm_config(
    "docs_graph",
    base_url="https://openrouter.ai/api/v1",
    api_key="sk-or-...",
    model_name="openai/gpt-4o-mini",
    temperature=0.0, max_tokens=2048, timeout_seconds=30,
)
info = client.extraction.get_llm_config("docs_graph")   # never returns the key
client.extraction.rotate_llm_config("docs_graph", new_api_key="sk-or-...")

# 2. Ontology: template + custom extension, plus optional prompt tuning
client.extraction.set_ontology(
    "docs_graph",
    base_template="research-papers",
    entity_labels=[EntityLabel(label="Dataset", description="A named dataset")],
    edge_types=[EdgeType(edge_type="USES_DATASET", description="Paper uses a dataset",
                         source_labels=["Paper"], target_labels=["Dataset"])],
    system_prompt="You are a research-paper analyst extracting a citation graph.",
    extra_guidance="Treat 'we' as the authors of the current paper.",
)
ontology = client.extraction.get_ontology("docs_graph")
# ontology.system_prompt, ontology.extra_guidance  (None when left at the default)

# 3. Cost preview, run, poll
chunks = [Chunk(doc_id="paper-1", chunk_id=0, text="...", embedding=vec)]
estimate = client.extraction.cost_preview("docs_graph", chunks)
job_id = client.extraction.start_extraction("docs_graph", chunks)
status = client.extraction.extraction_status("docs_graph", job_id)  # .state, counters, .failed_chunks, .chunk_errors
client.extraction.cancel_extraction("docs_graph", job_id)

# 4. Ontology proposals from the model: approve or reject each one
for p in client.extraction.list_proposals("docs_graph"):
    print(p.id, p.kind, p.name)
    client.extraction.approve_proposal("docs_graph", p.id)
    # ...or decline it instead:
    # client.extraction.reject_proposal("docs_graph", p.id)

# 5. Document updates: diff then re-extract only what changed
diffs = client.extraction.diff_document("docs_graph", "paper-1", new_chunks)
summary = client.extraction.reextract_document("docs_graph", "paper-1", new_chunks)

The async client exposes the same methods through AsyncExtractionAPI (reached as async_client.extraction); await each call.

Customizing the extraction prompt

set_ontology takes two optional keyword arguments that tune the prompt the model sees, per collection:

  • system_prompt fully overrides SwarnDB's generic task framing with your own. Leave it unset (or empty) to keep the built-in framing.
  • extra_guidance is a short domain hint appended on top of whichever framing is in effect (the default one, or your system_prompt). Use it to teach the model things it cannot infer from the text.

In every case SwarnDB still enforces the machine contract: your ontology's allowed labels and edge types, the JSON output schema, and a fixed contract footer (output only the JSON object, stay within the allowed types or propose, cite the span and a confidence, do not invent) are always part of the prompt. A custom prompt can shape the task but cannot break parsing or your ontology. Changing either value recomputes the extraction cache, so the next run re-extracts under the new prompt.

get_ontology returns an OntologyInfo whose system_prompt and extra_guidance fields read these values back; each is None when left at the default.

client.extraction.set_ontology(
    "contracts_graph",
    base_template="legal-contracts",
    system_prompt="You are a contracts analyst extracting parties, dates, and obligations.",
    extra_guidance="Treat 'the Company' as the first party; dates are DD/MM/YYYY.",
)

ontology = client.extraction.get_ontology("contracts_graph")
print(ontology.system_prompt)   # the override above, or None for the default
print(ontology.extra_guidance)  # the hint above, or None

Job state and partial success

extraction_status returns an ExtractionJob whose state is one of "queued", "running", "completed", "completed_with_errors", "failed", or "cancelled". A single chunk failing does not fail the whole job: it finishes in "completed_with_errors", keeps everything that succeeded, and records what failed on two fields. failed_chunks is the true total number of failed chunks; chunk_errors is a sample (up to 100) of ChunkError entries, each with doc_id, chunk_id, and error. (A reply cut off at the model's token limit is retried once automatically with a higher budget, so most truncations never reach chunk_errors.)

job = client.extraction.extraction_status("docs_graph", job_id)

if job.state == "completed_with_errors":
    print(f"{job.failed_chunks} chunk(s) failed; sample of {len(job.chunk_errors)}:")
    for e in job.chunk_errors:
        print(f"  doc={e.doc_id} chunk={e.chunk_id}: {e.error}")

ChunkError is importable from swarndb alongside the other extraction types.
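Because extraction runs as a job, callers usually poll extraction_status until a terminal state is reached. The following is a minimal polling sketch built on the state names listed above. wait_for_job is a hypothetical helper, not part of the SDK, and it takes a zero-argument callable so it can be exercised here against a fake status source.

```python
import time
from types import SimpleNamespace

TERMINAL_STATES = {"completed", "completed_with_errors", "failed", "cancelled"}

def wait_for_job(get_status, poll_seconds=2.0, timeout_seconds=600.0):
    """Call get_status() until the job reaches a terminal state or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        job = get_status()
        if job.state in TERMINAL_STATES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {job.state!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)

# Fake status source standing in for the server (illustration only).
_states = iter(["queued", "running", "completed_with_errors"])
def fake_status():
    return SimpleNamespace(state=next(_states), failed_chunks=1)

job = wait_for_job(fake_status, poll_seconds=0.0)
print(job.state, job.failed_chunks)
```

Against a real server the callable would wrap the SDK call, for example wait_for_job(lambda: client.extraction.extraction_status("docs_graph", job_id)).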


8. Vector Math

Access via client.math. All operations run server-side for performance.

Ghost Detection

Find isolated vectors that are far from any cluster centroid:

ghosts = client.math.detect_ghosts(
    "products",
    threshold=5.0,     # distance threshold
    auto_k=8,          # auto-compute 8 centroids
    metric="euclidean",
)

for g in ghosts:
    print(f"Ghost vector {g.id}, isolation score: {g.isolation_score:.2f}")
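Conceptually, the isolation score is the distance from a vector to its nearest centroid, and a ghost is a vector whose score exceeds the threshold. The sketch below shows that idea client-side with Euclidean distance. find_ghosts is a hypothetical name, the centroids are supplied by hand rather than auto-computed, and the strict "greater than" comparison is an assumption.

```python
import math

def find_ghosts(vectors, centroids, threshold):
    """Return (id, isolation_score) pairs for vectors far from every centroid."""
    ghosts = []
    for vid, vec in vectors.items():
        score = min(math.dist(vec, c) for c in centroids)  # nearest centroid
        if score > threshold:                              # assumed strict comparison
            ghosts.append((vid, score))
    return sorted(ghosts, key=lambda g: -g[1])             # most isolated first

vectors = {1: [0.0, 0.0], 2: [0.5, 0.0], 3: [10.0, 10.0], 4: [9.0, 0.0]}
centroids = [[0.0, 0.0], [10.0, 10.0]]
ghosts = find_ghosts(vectors, centroids, threshold=5.0)
print(ghosts)
```

Vector 4 sits 9.0 away from its nearest centroid, so it is the only ghost at a threshold of 5.0.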

Cone Search

Find vectors within an angular cone around a direction:

import math

results = client.math.cone_search(
    "products",
    direction=[1.0] + [0.0] * 1535,       # unit direction vector
    aperture_radians=math.pi / 6,          # 30-degree cone
)

for r in results:
    print(f"ID: {r.id}, cosine: {r.cosine_similarity:.3f}, "
          f"angle: {math.degrees(r.angle_radians):.1f} degrees")
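The geometry behind a cone search is small enough to sketch directly: a vector is inside the cone when the angle between it and the direction is at most the aperture. This sketch treats aperture_radians as the maximum angle from the cone axis, which is an assumption about the server's convention, and cone_filter_local is a hypothetical name.

```python
import math

def cone_filter_local(vectors, direction, aperture_radians):
    """Return (id, cosine, angle) for vectors inside the cone, nearest the axis first."""
    dir_norm = math.hypot(*direction)
    hits = []
    for vid, vec in vectors.items():
        vec_norm = math.hypot(*vec)
        if dir_norm == 0.0 or vec_norm == 0.0:
            continue  # the angle is undefined for zero vectors
        dot = sum(a * b for a, b in zip(vec, direction))
        cosine = max(-1.0, min(1.0, dot / (vec_norm * dir_norm)))  # clamp rounding error
        angle = math.acos(cosine)
        if angle <= aperture_radians:
            hits.append((vid, cosine, angle))
    return sorted(hits, key=lambda h: h[2])

vectors = {1: [1.0, 0.0], 2: [1.0, 0.5], 3: [0.0, 1.0]}
hits = cone_filter_local(vectors, [1.0, 0.0], math.pi / 6)
for vid, cosine, angle in hits:
    print(vid, round(cosine, 3), round(math.degrees(angle), 1))
```

Vector 2 lies about 26.6 degrees off the axis and is kept, while vector 3 is perpendicular to the direction and is dropped.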

Centroid Computation

Compute the centroid of all or a subset of vectors:

# Centroid of the entire collection
centroid = client.math.centroid("products")

# Centroid of specific vectors
centroid = client.math.centroid("products", vector_ids=[1, 2, 3, 4, 5])

# Weighted centroid
centroid = client.math.centroid(
    "products",
    vector_ids=[1, 2, 3],
    weights=[0.5, 0.3, 0.2],
)
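As a reference point for what a weighted centroid means, here is a minimal client-side sketch. It normalizes by the sum of the weights, which is an assumption about how the server treats weights that do not sum to 1; weighted_centroid is a hypothetical helper.

```python
def weighted_centroid(vectors, weights=None):
    """Component-wise weighted mean of equal-length vectors."""
    if not vectors:
        raise ValueError("need at least one vector")
    if weights is None:
        weights = [1.0] * len(vectors)          # unweighted mean
    if len(weights) != len(vectors):
        raise ValueError("one weight per vector is required")
    total = sum(weights)
    if total == 0:
        raise ValueError("weights must not sum to zero")
    dim = len(vectors[0])
    return [
        sum(w * v[i] for w, v in zip(weights, vectors)) / total
        for i in range(dim)
    ]

points = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
print(weighted_centroid(points))                     # plain mean
print(weighted_centroid(points, [0.5, 0.3, 0.2]))    # weighted mean
```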

Interpolation

Interpolate between two vectors using linear (LERP) or spherical (SLERP) interpolation:

vec_a = [1.0, 0.0, 0.0, ...]
vec_b = [0.0, 1.0, 0.0, ...]

# Single interpolation at t=0.5
midpoint = client.math.interpolate(vec_a, vec_b, t=0.5, method="slerp")

# Generate a sequence of 10 interpolated vectors
sequence = client.math.interpolate_sequence(vec_a, vec_b, n=10, method="slerp")
print(f"Generated {len(sequence)} intermediate vectors")
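The difference between the two methods is easiest to see on unit vectors: LERP cuts straight through the chord, so the midpoint shrinks in length, while SLERP follows the arc and preserves length. The sketch below uses the standard formulas. The lerp and slerp functions are local illustrations, not the server's code, and the fallback to LERP for near-parallel or opposite inputs is a choice made here.

```python
import math

def lerp(a, b, t):
    return [(1.0 - t) * x + t * y for x, y in zip(a, b)]

def slerp(a, b, t):
    """Spherical interpolation; falls back to lerp when the angle is degenerate."""
    norm_a, norm_b = math.hypot(*a), math.hypot(*b)
    if norm_a == 0.0 or norm_b == 0.0:
        return lerp(a, b, t)
    dot = sum(x * y for x, y in zip(a, b)) / (norm_a * norm_b)
    omega = math.acos(max(-1.0, min(1.0, dot)))   # angle between a and b
    sin_omega = math.sin(omega)
    if sin_omega < 1e-9:                          # parallel or opposite vectors
        return lerp(a, b, t)
    wa = math.sin((1.0 - t) * omega) / sin_omega
    wb = math.sin(t * omega) / sin_omega
    return [wa * x + wb * y for x, y in zip(a, b)]

a, b = [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]
mid_lerp = lerp(a, b, 0.5)
mid_slerp = slerp(a, b, 0.5)
print(math.hypot(*mid_lerp), math.hypot(*mid_slerp))
```

For embeddings compared by cosine similarity, SLERP is usually the more natural choice because intermediate points keep unit length.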

Drift Detection

Detect distribution shift between two temporal windows of vectors:

# Compare old vs new embeddings
report = client.math.detect_drift(
    "products",
    window1_ids=[1, 2, 3, 4, 5],        # baseline window
    window2_ids=[96, 97, 98, 99, 100],   # comparison window
    metric="euclidean",
    threshold=2.0,
)

print(f"Centroid shift: {report.centroid_shift:.4f}")
print(f"Spread change: {report.spread_change:.4f}")
print(f"Has drifted: {report.has_drifted}")
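The report fields can be read as simple statistics over the two windows. The sketch below is one plausible reading, not the server's definition: it assumes spread_change is the difference between the two mean distances to centroid and that has_drifted compares centroid_shift against the threshold. drift_report is a hypothetical name.

```python
import math

def _centroid(vectors):
    return [sum(col) / len(vectors) for col in zip(*vectors)]

def _mean_distance(vectors, center):
    return sum(math.dist(v, center) for v in vectors) / len(vectors)

def drift_report(window1, window2, threshold):
    """Compare two windows of vectors using Euclidean distance."""
    c1, c2 = _centroid(window1), _centroid(window2)
    m1, m2 = _mean_distance(window1, c1), _mean_distance(window2, c2)
    shift = math.dist(c1, c2)
    return {
        "centroid_shift": shift,
        "mean_distance_window1": m1,
        "mean_distance_window2": m2,
        "spread_change": m2 - m1,          # assumed: difference, not ratio
        "has_drifted": shift > threshold,  # assumed: based on centroid shift
    }

baseline = [[0.0, 0.0], [2.0, 0.0]]
recent = [[4.0, 0.0], [8.0, 0.0]]
report = drift_report(baseline, recent, threshold=2.0)
print(report)
```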

K-Means Clustering

Run k-means clustering on collection vectors:

result = client.math.cluster(
    "products",
    k=5,
    max_iterations=100,
    tolerance=1e-4,
    metric="euclidean",
)

print(f"Converged: {result.converged} in {result.iterations} iterations")
print(f"Found {len(result.centroids)} clusters")

for assignment in result.assignments[:5]:
    print(f"Vector {assignment.id} -> Cluster {assignment.cluster} "
          f"(distance: {assignment.distance_to_centroid:.3f})")
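To clarify how max_iterations, tolerance, iterations, and converged relate, here is a compact sketch of Lloyd's algorithm, the classic k-means procedure. It is an illustration only: the server's initialization and convergence test are not documented here, so seeding with the first k points and stopping when the largest centroid move is at most the tolerance are both assumptions.

```python
import math

def kmeans(points, k, max_iterations=100, tolerance=1e-4):
    """Lloyd's algorithm with deterministic seeding (first k points)."""
    centroids = [list(p) for p in points[:k]]
    assignments = [0] * len(points)
    converged = False
    iterations = 0
    for iterations in range(1, max_iterations + 1):
        # Assignment step: each point goes to its nearest centroid.
        assignments = [
            min(range(k), key=lambda j: math.dist(p, centroids[j]))
            for p in points
        ]
        # Update step: move each centroid to the mean of its members.
        new_centroids = []
        for j in range(k):
            members = [p for p, a in zip(points, assignments) if a == j]
            if members:
                new_centroids.append([sum(col) / len(members) for col in zip(*members)])
            else:
                new_centroids.append(centroids[j])  # keep an empty cluster in place
        shift = max(math.dist(c, n) for c, n in zip(centroids, new_centroids))
        centroids = new_centroids
        if shift <= tolerance:
            converged = True
            break
    return centroids, assignments, iterations, converged

points = [[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]]
centroids, assignments, iterations, converged = kmeans(points, k=2)
print(centroids, assignments, iterations, converged)
```

A run that hits max_iterations without the centroids settling returns converged as False, which is the case worth checking for before trusting the assignments.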

PCA Dimensionality Reduction

Project high-dimensional vectors to lower dimensions:

pca = client.math.reduce_dimensions(
    "products",
    n_components=2,
    vector_ids=[1, 2, 3, 4, 5],  # optional subset
)

print(f"Explained variance: {pca.explained_variance}")
for i, point in enumerate(pca.projected):
    print(f"Vector {i} -> ({point[0]:.3f}, {point[1]:.3f})")

Analogy Computation

Solve vector analogies of the form "a is to b as ? is to c", computed as a - b + c:

# king - man + woman = queen (conceptually)
result = client.math.analogy(
    a=king_vec,
    b=man_vec,
    c=woman_vec,
    normalize=True,
)
# Use result as a query vector to find the closest match
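Going by the comment in the example (king - man + woman), the operation amounts to a - b + c followed by optional normalization. The sketch below shows that arithmetic on toy three-dimensional vectors. analogy_local is a hypothetical name, and treating normalize=True as L2 normalization is an assumption.

```python
import math

def analogy_local(a, b, c, normalize=True):
    """Compute a - b + c, optionally scaled to unit length."""
    out = [x - y + z for x, y, z in zip(a, b, c)]
    if normalize:
        length = math.hypot(*out)
        if length > 0.0:
            out = [v / length for v in out]
    return out

# Toy axes: (royalty, male, female). Real embeddings are learned, not hand-built.
king = [1.0, 1.0, 0.0]
man = [0.0, 1.0, 0.0]
woman = [0.0, 0.0, 1.0]
result = analogy_local(king, man, woman)
print(result)
```

The result points along (royalty, female), which is where a "queen" vector would sit in this toy space. With the SDK, the returned vector would then be passed as the query vector to a search call.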

Diversity Sampling (MMR)

Select vectors that balance relevance with diversity using Maximal Marginal Relevance:

results = client.math.diversity_sample(
    "products",
    query=[0.5] * 1536,
    k=10,
    lambda_=0.7,                          # closer to 1.0 favors relevance, closer to 0.0 favors diversity
    candidate_ids=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],  # optional
)

for r in results:
    print(f"ID: {r.id}, Relevance: {r.relevance_score:.3f}, MMR: {r.mmr_score:.3f}")
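The effect of lambda_ is clearer with the selection loop written out. This is a sketch of the textbook MMR rule, in which each pick maximizes lambda_ * relevance - (1 - lambda_) * (highest similarity to anything already picked). mmr_select is a hypothetical helper, cosine similarity is an assumption, and the server's exact scoring may differ.

```python
import math

def cosine(u, v):
    denom = math.hypot(*u) * math.hypot(*v)
    return sum(a * b for a, b in zip(u, v)) / denom if denom else 0.0

def mmr_select(query, candidates, k, lambda_=0.7):
    """Greedy Maximal Marginal Relevance over a dict of id -> vector."""
    selected = []
    remaining = set(candidates)
    while remaining and len(selected) < k:
        def mmr_score(vid):
            relevance = cosine(query, candidates[vid])
            redundancy = max(
                (cosine(candidates[vid], candidates[s]) for s in selected),
                default=0.0,
            )
            return lambda_ * relevance - (1.0 - lambda_) * redundancy
        best = max(sorted(remaining), key=mmr_score)  # sorted for determinism
        selected.append(best)
        remaining.remove(best)
    return selected

query = [1.0, 0.1]
candidates = {1: [1.0, 0.0], 2: [1.0, 0.05], 3: [0.5, 0.9]}
print(mmr_select(query, candidates, k=2, lambda_=0.7))  # relevance-leaning
print(mmr_select(query, candidates, k=2, lambda_=0.3))  # diversity-leaning
```

Vectors 1 and 2 are near-duplicates. With lambda_=0.7 both are returned because relevance dominates; with lambda_=0.3 the second pick switches to vector 3, which is less relevant but far less redundant.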

9. Async Client

The AsyncSwarnDBClient provides the same API surface using async/await. It uses grpc.aio under the hood.

Basic Usage

import asyncio
from swarndb import AsyncSwarnDBClient

async def main():
    async with AsyncSwarnDBClient(host="localhost", port=50051) as client:
        # Create collection
        await client.collections.create("articles", dimension=128)

        # Insert vectors
        for i in range(100):
            await client.vectors.insert(
                "articles",
                vector=[0.1 * (i % 10)] * 128,
                metadata={"index": i},
            )

        # Search
        results = await client.search.query("articles", [0.5] * 128, k=5)
        for r in results.results:
            print(f"ID: {r.id}, Score: {r.score:.4f}")

asyncio.run(main())

Concurrent Operations

The async client excels at running multiple operations in parallel:

async def concurrent_search(client):
    queries = [[0.1 * i] * 128 for i in range(10)]

    tasks = [
        client.search.query("articles", q, k=5)
        for q in queries
    ]
    results = await asyncio.gather(*tasks)

    for i, result in enumerate(results):
        print(f"Query {i}: {len(result.results)} results")
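An unbounded asyncio.gather sends every request at once, which may be more than a server or connection should take. One common pattern, sketched below with the standard library only, is to cap the number of in-flight calls with a semaphore. gather_bounded and fake_search are hypothetical names, and the fake coroutine stands in for a call such as client.search.query.

```python
import asyncio

async def gather_bounded(factories, limit=4):
    """Run coroutine factories concurrently, with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    # gather preserves input order in its result list.
    return await asyncio.gather(*(run(f) for f in factories))

async def fake_search(i):
    await asyncio.sleep(0)   # stand-in for network I/O
    return i * i

factories = [lambda i=i: fake_search(i) for i in range(10)]
results = asyncio.run(gather_bounded(factories, limit=3))
print(results)
```

Passing factories rather than already-created coroutines matters here: each call only starts once a semaphore slot is free.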

When to Use Async vs Sync

Use the async client when:

  • Your application already uses asyncio (web frameworks like FastAPI, aiohttp)
  • You need to run many concurrent searches or inserts
  • You want to overlap I/O with other async operations

Use the sync client when:

  • You are writing scripts, notebooks, or batch jobs
  • Your application does not use asyncio
  • Simplicity is more important than concurrency

10. Error Handling

All SDK exceptions inherit from SwarnDBError, so you can catch any error with a single clause or handle specific cases.

Exception Hierarchy

SwarnDBError (base)
  ConnectionError          # cannot reach server
  AuthenticationError      # invalid or missing API key
  CollectionError          # base for collection issues
    CollectionNotFoundError
    CollectionExistsError
  VectorError              # base for vector issues
    VectorNotFoundError
    DimensionMismatchError
  SearchError              # search operation failure
  GraphError               # graph operation failure
  MathError                # math operation failure
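Because the hierarchy is nested, the order of except clauses matters: list the most specific class first and the base class last. The sketch below demonstrates this with stand-in classes that copy a few names from the tree above. They are local illustrations only, not imports from swarndb.exceptions.

```python
# Stand-in classes mirroring part of the hierarchy (illustration only).
class SwarnDBError(Exception): pass
class CollectionError(SwarnDBError): pass
class CollectionNotFoundError(CollectionError): pass
class CollectionExistsError(CollectionError): pass
class VectorError(SwarnDBError): pass
class DimensionMismatchError(VectorError): pass

def classify(exc):
    """Show which except clause handles a given exception."""
    try:
        raise exc
    except CollectionNotFoundError:
        return "missing collection"      # most specific clause first
    except CollectionError:
        return "collection problem"      # any other collection issue
    except SwarnDBError:
        return "other SDK error"         # base class as the final catch-all

print(classify(CollectionNotFoundError("docs")))
print(classify(CollectionExistsError("docs")))
print(classify(DimensionMismatchError("expected 128, got 2")))
```

If the SwarnDBError clause came first, it would swallow every case and the specific handlers would never run.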

Catching Errors

from swarndb import SwarnDBClient
from swarndb.exceptions import (
    SwarnDBError,
    ConnectionError,  # note: shadows Python's built-in ConnectionError in this module
    CollectionNotFoundError,
    VectorNotFoundError,
    DimensionMismatchError,
    AuthenticationError,
)

with SwarnDBClient(host="localhost", port=50051) as client:
    try:
        info = client.collections.get("nonexistent")
    except CollectionNotFoundError as e:
        print(f"Collection not found: {e.collection_name}")

    # vectors.get returns None for missing ids; it does not raise.
    record = client.vectors.get("products", id=999999)
    if record is None:
        print("Vector missing")

    # VectorNotFoundError is still raised by update / delete when the
    # target id does not exist.
    try:
        client.vectors.delete("products", id=999999)
    except VectorNotFoundError as e:
        print(f"Vector missing: {e.vector_id}")

    try:
        client.vectors.insert("products", vector=[0.1, 0.2])  # wrong dimension
    except DimensionMismatchError as e:
        print(f"Expected {e.expected}d, got {e.got}d")

    try:
        client.search.query("products", [0.5] * 1536, k=10)
    except SwarnDBError as e:
        # Catch-all for any SDK error
        print(f"SwarnDB error: {e.message}")
        if e.details:
            print(f"Details: {e.details}")

11. Operational Endpoints

For deployments behind orchestration (Kubernetes, load balancers, on-call dashboards), SwarnDB exposes a set of operational endpoints reachable from the SDK. The async client mirrors every method below with the same name and the same return type.

Recovery Status

Returns the server's current boot-recovery snapshot, including the recovery path taken (snapshot only, snapshot plus WAL, full WAL replay, etc.), elapsed time since recovery started, and per-collection recovery path.

status = client.recovery_status()
print(status.path)             # e.g. "snapshot_plus_wal"
print(status.elapsed_secs)     # seconds since recovery began
for name, path in status.collections.items():
    print(name, path)

Per-Collection Persistence Status

Returns the snapshot and WAL LSN state for a single collection. Useful for verifying that a recent write has been durably committed before your own code reports success to its caller.

ps = client.collections.persistence_status("docs")
print(ps.last_snapshot_lsn, ps.current_lsn, ps.next_lsn)

Force a Snapshot

Forces a synchronous snapshot for a collection and returns the LSN written. Useful before deliberate restarts or for taking snapshots on a schedule outside the server's auto-snapshot policy.

lsn = client.collections.snapshot("docs")
print(f"snapshot at LSN {lsn}")

Per-Collection Lock-Contention Metrics

Returns counters for lock acquisitions and total blocked time on a collection, useful for diagnosing contention under concurrent load.

m = client.collections.metrics("docs")
print(m.map_lock_acquisitions, m.collection_read_acquisitions)
print(m.collection_write_acquisitions, m.total_blocked_microseconds)

Liveness and Readiness Probes

healthz() and readyz() mirror the Kubernetes-style /healthz and /readyz HTTP probes. A 503 from the server is not raised as an exception; it maps to healthy=False or ready=False on the returned dataclass. Transport, authentication, and other failures still raise SwarnDBError.

h = client.healthz()
print(h.healthy, h.status, h.checks)

r = client.readyz()
print(r.ready, r.status, r.checks)

12. Type Reference

All types are frozen dataclasses, importable from swarndb.types.

ScoredResult

A single search result with distance score and optional graph edges.

Field Type Description
id int Vector ID
score float Distance score (lower = more similar)
metadata dict[str, Any] Attached metadata (empty dict if not requested)
graph_edges list[GraphEdge] Related vectors via virtual graph

SearchResult

Result of a single search query.

Field Type Description
results list[ScoredResult] Matching vectors
search_time_us int Search duration in microseconds
warning str Optional warning message

BatchSearchResult

Result of a batch search operation.

Field Type Description
results list[SearchResult] One SearchResult per query
total_time_us int Total batch duration in microseconds

CollectionInfo

Metadata about a collection.

Field Type Description
name str Collection name
dimension int Vector dimensionality
distance_metric str Distance function name
vector_count int Number of stored vectors
default_threshold float Default similarity threshold

VectorRecord

A stored vector with its metadata.

Field Type Description
id int Vector ID
vector list[float] Vector values
metadata dict[str, Any] Attached metadata

BulkInsertResult

Result of a bulk insert operation (returned by bulk_insert and bulk_insert_from_path).

Field Type Description
inserted_count int Number of vectors inserted
errors list[str] Error messages (one per failed row, empty on full success)
last_completed_batch_idx int Index of the last batch fully committed before a partial failure (0 on full success or no batching)
last_committed_lsn int LSN of the last committed write in this operation
resume_token str Opaque token for resuming a streaming bulk insert from the next batch (empty on full success or non-resumable failures)
assigned_ids list[int] IDs assigned to inserted vectors, in input order

OptimizeResult

Result of a collection optimize operation.

Field Type Description
status str Operation status
message str Human-readable message
duration_ms int Duration in milliseconds
vectors_processed int Number of vectors processed

GraphEdge

An edge in the virtual graph.

Field Type Description
target_id int Connected vector ID
similarity float Edge similarity score

TraversalNode

A node visited during graph traversal.

Field Type Description
id int Vector ID
depth int Hop distance from start
path_similarity float Cumulative similarity along path
path list[int] Vector IDs along the traversal path

TypedNode

A node in the first-class typed graph (Hybrid mode). Returned by get_node, update_node, the enumeration methods, and node query terminals.

Field Type Description
id int Node ID
kind str "content" or "entity"
label str Entity label (empty for content nodes)
properties dict[str, Any] Mutable property bag
embedding list[float] Inline embedding (immutable; empty when none)
source str Provenance source (immutable)
created_at int Creation time, unix-epoch millis (immutable)
created_by str Creator (immutable)
history list[NodeAudit] Audit trail of NodeAudit(action, actor, at)
updated_at int Last-update time, unix-epoch millis

TypedEdge

A typed, directed edge in the first-class graph (Hybrid mode). Returned by get_edge, list_edges, the enumeration methods, and edge query terminals.

Field Type Description
id int Edge ID
source int Source node ID
target int Target node ID
edge_type str Edge type
properties dict[str, Any] Property bag
provenance dict[str, Any] Provenance bag
confidence float Confidence score
verified bool Whether the edge has been verified
is_manual bool Whether the edge was created manually
created_at int Creation time, unix-epoch millis
history list[EdgeAudit] Audit trail of EdgeAudit(action, actor, at)
valid_from int or None Start of the validity window, unix-epoch millis; None when unbounded
valid_until int or None End of the validity window (exclusive); None when unbounded
temporal_context str or None Regime label; None when context-free

NodePage

One page of a paginated node enumeration, returned by enumerate_nodes.

Field Type Description
nodes list[TypedNode] Nodes in this page
next_cursor int Pass as after_id to fetch the next page; 0 when exhausted
has_more bool Whether more pages remain

EdgePage

One page of a paginated edge enumeration, returned by enumerate_edges.

Field Type Description
edges list[TypedEdge] Edges in this page
next_cursor int Pass as after_id to fetch the next page; 0 when exhausted
has_more bool Whether more pages remain
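The next_cursor and has_more fields suggest a simple loop for walking every page. The sketch below shows that loop against a fake page source. iter_all_nodes and fake_fetch_page are hypothetical names, starting from after_id=0 is an assumption, and the real enumerate_nodes signature is not shown in this section, so the loop takes a callable instead.

```python
from types import SimpleNamespace

def iter_all_nodes(fetch_page):
    """Yield every node by following next_cursor until has_more is False."""
    after_id = 0                      # assumed starting cursor
    while True:
        page = fetch_page(after_id)
        yield from page.nodes
        if not page.has_more:
            break
        after_id = page.next_cursor

# Fake page source standing in for the server (illustration only).
_ALL_IDS = [1, 2, 3, 4, 5]
def fake_fetch_page(after_id, limit=2):
    rest = [n for n in _ALL_IDS if n > after_id]
    chunk = rest[:limit]
    has_more = len(rest) > limit
    return SimpleNamespace(
        nodes=chunk,
        next_cursor=chunk[-1] if has_more else 0,
        has_more=has_more,
    )

all_nodes = list(iter_all_nodes(fake_fetch_page))
print(all_nodes)
```

The same loop applies to EdgePage by reading page.edges instead of page.nodes.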

GhostVector

A vector identified as isolated.

Field Type Description
id int Vector ID
isolation_score float Distance to nearest centroid

ConeSearchResult

A result from angular cone search.

Field Type Description
id int Vector ID
cosine_similarity float Cosine similarity to direction
angle_radians float Angle from cone axis

DriftReport

Report from distribution drift detection.

Field Type Description
centroid_shift float Distance between window centroids
mean_distance_window1 float Mean distance to centroid in window 1
mean_distance_window2 float Mean distance to centroid in window 2
spread_change float Change in spread between windows
has_drifted bool Whether drift exceeds the threshold

ClusterResult

Result of k-means clustering.

Field Type Description
centroids list[list[float]] Computed cluster centroids
assignments list[ClusterAssignment] Per-vector cluster assignments
iterations int Number of iterations run
converged bool Whether k-means converged

ClusterAssignment

Assignment of a vector to a cluster.

Field Type Description
id int Vector ID
cluster int Assigned cluster index
distance_to_centroid float Distance to cluster centroid

PCAResult

Result of PCA dimensionality reduction.

Field Type Description
components list[list[float]] Principal component vectors
explained_variance list[float] Variance explained per component
mean list[float] Mean vector of input data
projected list[list[float]] Projected lower-dimensional vectors

DiversityResult

A result from MMR diversity sampling.

Field Type Description
id int Vector ID
relevance_score float Relevance to the query
mmr_score float Combined MMR score

RecoveryStatus

Server boot-recovery snapshot returned by client.recovery_status().

Field Type Description
path str Recovery path taken at boot, for example "none", "snapshot_only", "snapshot_plus_wal", "full_wal_replay"
elapsed_secs int Seconds since recovery began
collections dict[str, str] Per-collection recovery path

PersistenceStatus

Per-collection persistence and LSN state returned by client.collections.persistence_status(name).

Field Type Description
last_snapshot_lsn int LSN of the last persisted snapshot
current_lsn int LSN of the most recently durably committed write
next_lsn int LSN that the next write will receive

CollectionMetrics

Per-collection lock-contention counters returned by client.collections.metrics(name).

Field Type Description
map_lock_acquisitions int Acquisitions of the collection map read lock
collection_read_acquisitions int Read-lock acquisitions on this collection
collection_write_acquisitions int Write-lock acquisitions on this collection
total_blocked_microseconds int Total time waited for these locks, in microseconds

ReadinessStatus

Result of the Kubernetes-style readiness probe returned by client.readyz().

Field Type Description
ready bool True when the server returned 200, False on 503
status str Status string from the probe body ("ready" or "not_ready")
checks dict[str, str] Per-check status (e.g. collections_accessible, collections_loaded)

HealthStatus

Result of the Kubernetes-style liveness probe returned by client.healthz().

Field Type Description
healthy bool True when the server returned 200, False on 503
status str Status string from the probe body ("alive")
checks dict[str, str] Per-check status, when present