Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.index.sai.disk.v1.vector;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.CellSourceIdentifier;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
Expand Down Expand Up @@ -52,6 +53,12 @@ public PrimaryKey primaryKey()
return primaryKey;
}

@VisibleForTesting
public float score()
{
return indexScore;
}

public boolean isIndexDataValid(Row row, long nowInSecs)
{
// If the indexed column is part of the primary key, we don't need this type of validation because we would have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public class VectorMemoryIndex extends MemoryIndex
private final Memtable memtable;
private final LongAdder writeCount = new LongAdder();

private PrimaryKey minimumKey;
private PrimaryKey maximumKey;
private volatile KeyBounds keyBounds;

private final NavigableSet<PrimaryKey> primaryKeys = new ConcurrentSkipListSet<>();

Expand All @@ -86,7 +85,7 @@ public VectorMemoryIndex(StorageAttachedIndex index, Memtable memtable)
}

@Override
public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value)
public long add(DecoratedKey key, Clustering<?> clustering, ByteBuffer value)
{
if (value == null || value.remaining() == 0 || !index.validateTermSize(key, value, false, null))
return 0;
Expand All @@ -98,11 +97,11 @@ public synchronized long add(DecoratedKey key, Clustering<?> clustering, ByteBuf

private long index(PrimaryKey primaryKey, ByteBuffer value)
{
updateKeyBounds(primaryKey);

writeCount.increment();
primaryKeys.add(primaryKey);
return graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL);
long bytesUsed = graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL);

@dcapwell dcapwell Jun 30, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel that its best to do graph.add before primaryKeys.add; the reason is for orderBy(), it tries to score a key not yet in the graph, if we update the graph first this race goes away? It also keeps the publishing order consistent with key bounds?

This would also be consistent with long update() which writes to graph first

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

updateKeyBounds(primaryKey);
return bytesUsed;
}

@Override
Expand All @@ -129,9 +128,6 @@ public long update(DecoratedKey key, Clustering<?> clustering, ByteBuffer oldVal
{
PrimaryKey primaryKey = index.hasClustering() ? index.keyFactory().create(key, clustering)
: index.keyFactory().create(key);
// update bounds because only rows with vectors are included in the key bounds,
// so if the vector was null before, we won't have included it
updateKeyBounds(primaryKey);

// make the changes in this order, so we don't have a window where the row is not in the index at all
if (newRemaining > 0)
Expand All @@ -142,20 +138,18 @@ public long update(DecoratedKey key, Clustering<?> clustering, ByteBuffer oldVal
// remove primary key if it's no longer indexed
if (newRemaining <= 0 && oldRemaining > 0)
primaryKeys.remove(primaryKey);

// update bounds because only rows with vectors are included in the key bounds,
// so if the vector was null before, we won't have included it
updateKeyBounds(primaryKey);
}
return bytesUsed;
}

private void updateKeyBounds(PrimaryKey primaryKey)
private synchronized void updateKeyBounds(PrimaryKey primaryKey)
{
if (minimumKey == null)
minimumKey = primaryKey;
else if (primaryKey.compareTo(minimumKey) < 0)
minimumKey = primaryKey;
if (maximumKey == null)
maximumKey = primaryKey;
else if (primaryKey.compareTo(maximumKey) > 0)
maximumKey = primaryKey;
KeyBounds current = keyBounds;
keyBounds = current == null ? new KeyBounds(primaryKey, primaryKey) : current.withUpdated(primaryKey);
}

@Override
Expand Down Expand Up @@ -211,15 +205,15 @@ public CloseableIterator<PrimaryKeyWithScore> orderBy(QueryContext queryContext,
@Override
public CloseableIterator<PrimaryKeyWithScore> orderResultsBy(QueryContext queryContext, List<PrimaryKey> results, Expression orderer)
{
if (minimumKey == null)
// This case implies maximumKey is empty too.
KeyBounds bounds = keyBounds;
if (bounds == null)
return CloseableIterator.empty();

int limit = queryContext.limit();

List<PrimaryKey> resultsInRange = results.stream()
.dropWhile(k -> k.compareTo(minimumKey) < 0)
.takeWhile(k -> k.compareTo(maximumKey) <= 0)
.dropWhile(k -> k.compareTo(bounds.minimum) < 0)
.takeWhile(k -> k.compareTo(bounds.maximum) <= 0)
.collect(Collectors.toList());

int maxBruteForceRows = maxBruteForceRows(limit, resultsInRange.size(), graph.size());
Expand Down Expand Up @@ -418,4 +412,25 @@ public void close()
FileUtils.closeQuietly(nodeScores);
}
}

private static final class KeyBounds
{
final PrimaryKey minimum;
final PrimaryKey maximum;

KeyBounds(PrimaryKey minimum, PrimaryKey maximum)
{
this.minimum = minimum;
this.maximum = maximum;
}

KeyBounds withUpdated(PrimaryKey key)
{
PrimaryKey newMin = minimum.compareTo(key) > 0 ? key : minimum;
PrimaryKey newMax = maximum.compareTo(key) < 0 ? key : maximum;

// Avoid allocation if nothing changed
return newMin == minimum && newMax == maximum ? this : new KeyBounds(newMin, newMax);
}
}
}
Loading