From 1b098bf292a0b2763700294237fa99d8e0678d7b Mon Sep 17 00:00:00 2001 From: Caleb Rackliffe Date: Fri, 26 Jun 2026 15:35:42 -0400 Subject: [PATCH 1/4] Make synchronization on VectorMemoryIndex inserts more granular patch by Caleb Rackliffe; reviewed by ? for CASSANDRA-21160 --- .../disk/v1/vector/PrimaryKeyWithScore.java | 7 + .../index/sai/memory/VectorMemoryIndex.java | 59 +- .../sai/memory/VectorMemoryIndexTest.java | 511 +++++++++++++++++- 3 files changed, 543 insertions(+), 34 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/vector/PrimaryKeyWithScore.java b/src/java/org/apache/cassandra/index/sai/disk/v1/vector/PrimaryKeyWithScore.java index c81c4ebd20df..9ea1181c2141 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/vector/PrimaryKeyWithScore.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/vector/PrimaryKeyWithScore.java @@ -18,6 +18,7 @@ package org.apache.cassandra.index.sai.disk.v1.vector; +import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.db.CellSourceIdentifier; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; @@ -52,6 +53,12 @@ public PrimaryKey primaryKey() return primaryKey; } + @VisibleForTesting + public float score() + { + return indexScore; + } + public boolean isIndexDataValid(Row row, long nowInSecs) { // If the indexed column is part of the primary key, we don't need this type of validation because we would have diff --git a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java index c9d5aa28895b..8c35cc66e15e 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java @@ -73,8 +73,7 @@ public class VectorMemoryIndex extends MemoryIndex private final Memtable memtable; private final LongAdder writeCount = new LongAdder(); - private PrimaryKey minimumKey; - private PrimaryKey maximumKey; + private volatile KeyBounds keyBounds; private final NavigableSet primaryKeys = new ConcurrentSkipListSet<>(); @@ -86,7 +85,7 @@ public VectorMemoryIndex(StorageAttachedIndex index, Memtable memtable) } @Override - public synchronized long add(DecoratedKey key, Clustering clustering, ByteBuffer value) + public long add(DecoratedKey key, Clustering clustering, ByteBuffer value) { if (value == null || value.remaining() == 0 || !index.validateTermSize(key, value, false, null)) return 0; @@ -98,11 +97,11 @@ public synchronized long add(DecoratedKey key, Clustering clustering, ByteBuf private long index(PrimaryKey primaryKey, ByteBuffer value) { - updateKeyBounds(primaryKey); - writeCount.increment(); primaryKeys.add(primaryKey); - return graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL); + long bytesUsed = graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL); + updateKeyBounds(primaryKey); + return bytesUsed; } @Override @@ -129,9 +128,6 @@ public long update(DecoratedKey key, Clustering clustering, ByteBuffer oldVal { PrimaryKey primaryKey = index.hasClustering() ? index.keyFactory().create(key, clustering) : index.keyFactory().create(key); - // update bounds because only rows with vectors are included in the key bounds, - // so if the vector was null before, we won't have included it - updateKeyBounds(primaryKey); // make the changes in this order, so we don't have a window where the row is not in the index at all if (newRemaining > 0) @@ -142,20 +138,18 @@ public long update(DecoratedKey key, Clustering clustering, ByteBuffer oldVal // remove primary key if it's no longer indexed if (newRemaining <= 0 && oldRemaining > 0) primaryKeys.remove(primaryKey); + + // update bounds because only rows with vectors are included in the key bounds, + // so if the vector was null before, we won't have included it + updateKeyBounds(primaryKey); } return bytesUsed; } - private void updateKeyBounds(PrimaryKey primaryKey) + private synchronized void updateKeyBounds(PrimaryKey primaryKey) { - if (minimumKey == null) - minimumKey = primaryKey; - else if (primaryKey.compareTo(minimumKey) < 0) - minimumKey = primaryKey; - if (maximumKey == null) - maximumKey = primaryKey; - else if (primaryKey.compareTo(maximumKey) > 0) - maximumKey = primaryKey; + KeyBounds current = keyBounds; + keyBounds = current == null ? new KeyBounds(primaryKey, primaryKey) : current.withUpdated(primaryKey); } @Override @@ -211,15 +205,15 @@ public CloseableIterator orderBy(QueryContext queryContext, @Override public CloseableIterator orderResultsBy(QueryContext queryContext, List results, Expression orderer) { - if (minimumKey == null) - // This case implies maximumKey is empty too. + KeyBounds bounds = keyBounds; + if (bounds == null) return CloseableIterator.empty(); int limit = queryContext.limit(); List resultsInRange = results.stream() - .dropWhile(k -> k.compareTo(minimumKey) < 0) - .takeWhile(k -> k.compareTo(maximumKey) <= 0) + .dropWhile(k -> k.compareTo(bounds.minimum) < 0) + .takeWhile(k -> k.compareTo(bounds.maximum) <= 0) .collect(Collectors.toList()); int maxBruteForceRows = maxBruteForceRows(limit, resultsInRange.size(), graph.size()); @@ -418,4 +412,25 @@ public void close() FileUtils.closeQuietly(nodeScores); } } + + private static final class KeyBounds + { + final PrimaryKey minimum; + final PrimaryKey maximum; + + KeyBounds(PrimaryKey minimum, PrimaryKey maximum) + { + this.minimum = minimum; + this.maximum = maximum; + } + + KeyBounds withUpdated(PrimaryKey key) + { + PrimaryKey newMin = minimum.compareTo(key) > 0 ? key : minimum; + PrimaryKey newMax = maximum.compareTo(key) < 0 ? key : maximum; + + // Avoid allocation if nothing changed + return newMin == minimum && newMax == maximum ? this : new KeyBounds(newMin, newMax); + } + } } diff --git a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java index a1ef40cc95c6..dc11fc97b1fb 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java @@ -25,8 +25,19 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.junit.Before; @@ -62,6 +73,7 @@ import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.disk.v1.vector.PrimaryKeyWithScore; import org.apache.cassandra.index.sai.plan.Expression; +import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.inject.Injections; import org.apache.cassandra.inject.InvokePointBuilder; import org.apache.cassandra.locator.TokenMetadata; @@ -70,9 +82,12 @@ import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.FBUtilities; -import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_SHARD_COUNT; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_SHARD_COUNT; public class VectorMemoryIndexTest extends SAITester { @@ -82,11 +97,13 @@ public class VectorMemoryIndexTest extends SAITester .onMethod("search")) .build(); + private static final double RECALL_THRESHOLD = 0.9; + private ColumnFamilyStore cfs; private StorageAttachedIndex index; private VectorMemoryIndex memtableIndex; private IPartitioner partitioner; - private Map keyMap; + private ConcurrentMap keyMap; private Map rowMap; private int dimensionCount; @@ -107,7 +124,7 @@ public void setup() throws Throwable cfs = index.baseCfs(); partitioner = cfs.getPartitioner(); indexSearchCounter.reset(); - keyMap = new TreeMap<>(); + keyMap = new ConcurrentHashMap<>(); rowMap = new HashMap<>(); Injections.inject(indexSearchCounter); @@ -133,7 +150,6 @@ public void randomQueryTest() throws Exception List keys = new ArrayList<>(keyMap.keySet()); long actualVectorsReturned = 0; long expectedVectorsReturned = 0; - double expectedRecall = 0.9; for (int executionCount = 0; executionCount < 1000; executionCount++) { @@ -182,18 +198,489 @@ public void randomQueryTest() throws Exception expectedVectorsReturned += expectedResults; if (foundKeys.size() < expectedResults) assertTrue("Expected at least " + expectedResults + " results but got " + foundKeys.size(), - foundKeys.size() >= expectedResults * expectedRecall); + foundKeys.size() >= expectedResults * RECALL_THRESHOLD); } } assertTrue("Expected at least " + expectedVectorsReturned + " results but got " + actualVectorsReturned, - actualVectorsReturned >= expectedVectorsReturned * expectedRecall); + actualVectorsReturned >= expectedVectorsReturned * RECALL_THRESHOLD); + } + + /** + * Verifies that concurrent calls to add() do not corrupt the graph or lose data. + *

+ * GraphIndexBuilder.addGraphNode() is designed for concurrent use: insertionsInProgress + * is a ConcurrentSkipListSet, and PoolingSupport gives each thread its own GraphSearcher + * and scratch arrays. This test validates the full stack from VectorMemoryIndex.index() + * through OnHeapGraph.add() through GraphIndexBuilder.addGraphNode(). + *

+ * Each thread owns a disjoint range of partition key integers so every insert is a + * pure add with no PK collisions, isolating add() from update() semantics. + *

+ * After all writers complete, a full-ring search must return the vast majority of + * inserted keys with valid scores, confirming no data was lost or corrupted. + */ + @Test + public void testConcurrentAddsProduceConsistentFinalState() throws Exception + { + Memtable memtable = Mockito.mock(Memtable.class); + memtableIndex = new VectorMemoryIndex(index, memtable); + + int numThreads = Runtime.getRuntime().availableProcessors(); + int vectorsPerThread = 2000; + int totalInserted = numThreads * vectorsPerThread; + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + // CyclicBarrier ensures all threads begin inserting simultaneously, + // maximizing contention on GraphIndexBuilder and ConcurrentVectorValues. + CyclicBarrier barrier = new CyclicBarrier(numThreads); + List> futures = new ArrayList<>(); + + for (int t = 0; t < numThreads; t++) + { + final int threadId = t; + futures.add(executor.submit(() -> { + try + { + barrier.await(); + for (int i = 0; i < vectorsPerThread; i++) + { + // Partition key is globally unique across all threads + int pk = threadId * vectorsPerThread + i; + addRow(pk, randomVectorFromThreadLocal()); + } + } + catch (BrokenBarrierException | InterruptedException e) + { + throw new RuntimeException(e); + } + })); + } + + executor.shutdown(); + assertTrue("Timed out waiting for concurrent adds", executor.awaitTermination(60, TimeUnit.SECONDS)); + + // Rethrow any exception from worker threads — assertion failures inside a + // Runnable are otherwise silently swallowed by the ExecutorService. + for (Future f : futures) + { + try + { + f.get(); + } + catch (ExecutionException e) + { + fail("Worker thread threw during concurrent add(): " + e.getCause()); + } + } + + // After all writes complete, a full-ring search with limit == totalInserted + // must return the vast majority of distinct results. Every returned key must + // have been inserted by a worker thread, and every score must be a valid + // positive float (a zero or NaN score would indicate graph corruption). + AbstractBounds fullRing = new Range<>(partitioner.getMinimumToken().minKeyBound(), partitioner.getMinimumToken().minKeyBound()); + Expression expression = generateRandomExpression(); + ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), + FBUtilities.nowInSeconds(), + ColumnFilter.all(cfs.metadata()), + RowFilter.none(), + DataLimits.cqlLimits(totalInserted), + DataRange.allData(cfs.metadata().partitioner)); + + QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); + Set foundKeys = new HashSet<>(); + + try (CloseableIterator iterator = memtableIndex.orderBy(queryContext, expression, fullRing)) + { + while (iterator.hasNext()) + { + PrimaryKeyWithScore result = iterator.next(); + assertNotNull("Null PrimaryKey in search results after concurrent adds", result.primaryKey()); + + float score = result.score(); + assertTrue("Non-finite score after concurrent adds: " + score, Float.isFinite(score)); + + // All vector components are drawn from [0, 1) via ThreadLocalRandom.nextFloat(), + // so every term in the dot product is non-negative and the sum is strictly positive. + // A score of 0f or below would indicate graph corruption, not a valid similarity result. + assertTrue("Non-positive score after concurrent adds: " + score, score > 0f); + + int pk = Int32Type.instance.compose(result.primaryKey().partitionKey().getKey()); + assertFalse("Duplicate key returned after concurrent adds: " + pk, foundKeys.contains(pk)); + assertTrue("Returned key " + pk + " was not inserted by any worker thread", keyMap.containsKey(result.primaryKey().partitionKey())); + foundKeys.add(pk); + } + } + + // ANN recall is approximate — use a tolerance consistent with randomQueryTest + // rather than asserting exact equality, which would be flaky at high dimensionCount. + assertTrue("Search returned " + foundKeys.size() + " of " + totalInserted + " results after concurrent adds (expected at least " + (int) (totalInserted * RECALL_THRESHOLD) + ')', + foundKeys.size() >= totalInserted * RECALL_THRESHOLD); + } + + /** + * Verifies that orderBy() never throws an exception while concurrent add() calls + * are in progress, and that the index reaches a consistent state once writes settle. + *

+ * Missing results during concurrent writes are expected and correct — a read that + * races with a write is allowed to miss that write (valid linearization). The only + * invariant asserted during the write window is safety: no exceptions, no null PKs, + * no non-finite scores from results that *are* returned. + *

+ * Readers block on writersFinished after the barrier release and perform one final + * search pass after all writers have joined, confirming full consistency at rest. + */ + @Test + public void testConcurrentAddsAndSearchesNeverThrow() throws Exception + { + Memtable memtable = Mockito.mock(Memtable.class); + memtableIndex = new VectorMemoryIndex(index, memtable); + + int numWriterThreads = Runtime.getRuntime().availableProcessors(); + int numReaderThreads = Runtime.getRuntime().availableProcessors(); + int vectorsPerWriter = 2000; + int totalInserted = numWriterThreads * vectorsPerWriter; + + // Pre-seed enough rows that orderBy() always has a non-empty graph to search, + // avoiding the early-return in OnHeapGraph.search() when vectorValues.size() == 0 + // which would prevent readers from exercising any real code paths. + int preSeedCount = 50; + for (int i = 1; i <= preSeedCount; i++) + addRow(-i, randomVector()); // negative PKs, disjoint from writer range [0, totalInserted) + + // each reader blocks here until every writer has completed, + // then performs one final search to verify post-settlement consistency. + CountDownLatch writersFinished = new CountDownLatch(numWriterThreads); + + // phase1Executed: confirms at least one reader searched during the concurrent + // write window. If this latch is never counted down, the write window was too + // short and the concurrent safety assertions in Phase 1 were never exercised. + AtomicBoolean phase1Executed = new AtomicBoolean(false); + + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(numWriterThreads + numReaderThreads); + CyclicBarrier barrier = new CyclicBarrier(numWriterThreads + numReaderThreads); + + // Writers: each inserts into a disjoint PK range + for (int t = 0; t < numWriterThreads; t++) + { + final int threadId = t; + executor.submit(() -> { + try + { + barrier.await(); + for (int i = 0; i < vectorsPerWriter; i++) + addRow(threadId * vectorsPerWriter + i, randomVectorFromThreadLocal()); + } + catch (Throwable e) + { + errors.add(e); + } + finally + { + writersFinished.countDown(); + } + }); + } + + // Readers: issue one search while writers are running (safety only), then + // block on writersFinished and issue one final search for correctness. + for (int t = 0; t < numReaderThreads; t++) + { + executor.submit(() -> { + try + { + barrier.await(); + + AbstractBounds fullRing = new Range<>(partitioner.getMinimumToken().minKeyBound(), partitioner.getMinimumToken().minKeyBound()); + + ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), + FBUtilities.nowInSeconds(), + ColumnFilter.all(cfs.metadata()), + RowFilter.none(), + DataLimits.cqlLimits(totalInserted + preSeedCount), + DataRange.allData(cfs.metadata().partitioner)); + + QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); + + // Build query vectors inline — getRandom() is not thread-safe from + // worker threads, so we use ThreadLocalRandom directly. + ByteBuffer queryBuf = randomVectorFromThreadLocal(); + Expression concurrentExpression = Expression.create(index); + concurrentExpression.add(Operator.ANN, queryBuf); + + // --- Phase 1: one search while writers are still running --- + // Safety assertions only. Missing results are a valid linearization + // of concurrent read/write and are not asserted against here. + try (CloseableIterator it = memtableIndex.orderBy(queryContext, concurrentExpression, fullRing)) + { + while (it.hasNext()) + { + PrimaryKeyWithScore result = it.next(); + assertNotNull("Null PrimaryKey during concurrent add() + orderBy()", result.primaryKey()); + assertTrue("Non-finite score during concurrent add() + orderBy(): " + result.score(), Float.isFinite(result.score())); + } + } + phase1Executed.set(true); + + // --- Phase 2: block until all writers finish, then verify consistency --- + writersFinished.await(); + + ByteBuffer settledQueryBuf = randomVectorFromThreadLocal(); + Expression settledExpression = Expression.create(index); + settledExpression.add(Operator.ANN, settledQueryBuf); + + Set foundAfterSettle = new HashSet<>(); + try (CloseableIterator it = memtableIndex.orderBy(queryContext, settledExpression, fullRing)) + { + while (it.hasNext()) + { + PrimaryKeyWithScore result = it.next(); + assertNotNull("Null PrimaryKey after writes settled", result.primaryKey()); + assertTrue("Non-finite score after writes settled: " + result.score(), Float.isFinite(result.score())); + assertTrue("Non-positive score after writes settled: " + result.score(), result.score() > 0f); + int pk = Int32Type.instance.compose(result.primaryKey().partitionKey().getKey()); + foundAfterSettle.add(pk); + } + } + + // ANN recall is approximate, so we allow a small miss rate rather + // than asserting exact equality. Pre-seeded keys (negative PKs) are + // included in the limit so they do not crowd out writer-inserted keys. + int expectedMinimum = (int) (totalInserted * RECALL_THRESHOLD); + long writerKeysFound = foundAfterSettle.stream().filter(pk -> pk >= 0).count(); + assertTrue("Only " + writerKeysFound + " of " + totalInserted + " writer-inserted keys found after writes settled" + " (expected at least " + expectedMinimum + ')', + writerKeysFound >= expectedMinimum); + } + catch (Throwable e) + { + errors.add(e); + } + }); + } + + executor.shutdown(); + assertTrue("Timed out waiting for concurrent add() + orderBy()", executor.awaitTermination(60, TimeUnit.SECONDS)); + + // Verify Phase 1 actually executed — if this fails, increase vectorsPerWriter + // so the write window is wide enough for readers to search concurrently. + assertTrue("No reader executed a search during the concurrent write window; increase vectorsPerWriter to widen the write window", phase1Executed.get()); + + if (!errors.isEmpty()) + { + AssertionError failure = new AssertionError("Concurrent add() + orderBy() produced " + errors.size() + " error(s); first: " + errors.get(0)); + errors.forEach(failure::addSuppressed); + throw failure; + } + } + + /** + * Verifies that expectedNodesVisited() always returns a value within its documented + * bounds: at least min(limit, graphSize) and at most graphSize. + *

+ * This is a pure arithmetic test with no index infrastructure required. It exercises + * the boundary conditions that matter for the brute-force/ANN threshold decision in + * maxBruteForceRows(): if the formula underflows its lower bound, small queries will + * incorrectly use ANN; if it overflows its upper bound, the result is nonsensical. + */ + @Test + public void testExpectedNodesVisitedRespectsBounds() + { + int[] graphSizes = { 1, 2, 10, 100, 1000, 10000 }; + int[] limits = { 1, 2, 5, 10, 50, 100 }; + double[] permittedFractions = { 0.01, 0.1, 0.5, 1.0, 2.0 }; + + for (int graphSize : graphSizes) + { + for (int limit : limits) + { + for (double fraction : permittedFractions) + { + int permitted = Math.max(1, (int) (graphSize * fraction)); + int result = VectorMemoryIndex.expectedNodesVisited(limit, permitted, graphSize); + int lowerBound = Math.min(limit, graphSize); + + assertTrue(String.format("expectedNodesVisited(%d, %d, %d) = %d is below lower bound %d", limit, permitted, graphSize, result, lowerBound), + result >= lowerBound); + + assertTrue(String.format("expectedNodesVisited(%d, %d, %d) = %d exceeds graphSize %d", limit, permitted, graphSize, result, graphSize), + result <= graphSize); + } + } + } } + /** + * Verifies that orderResultsBy() never throws while concurrent add() calls are in + * progress, and that the index reaches a consistent state once writes settle. + *

+ * orderResultsBy() reads the unsynchronized fields minimumKey and maximumKey to bound + * the materialized key list before scoring. A stale read of either field can silently + * drop valid keys. This test confirms: + *

    + *
  • Safety: no exceptions, no null PKs, no non-finite scores during concurrent writes.
  • + *
  • Liveness: after all writers finish, a call with the full key list returns at least + * RECALL_THRESHOLD of the inserted writer keys.
  • + *
+ *

+ * The materialized key list passed to orderResultsBy() is built from keyMap, which is a + * ConcurrentHashMap updated by every addRow() call. A snapshot taken mid-write may be + * incomplete — this is intentional and mirrors the production path where the source + * KeyRangeIterator only sees keys committed before the non-ANN index scan ran. + */ @Test - public void indexIteratorTest() + public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception + { + Memtable memtable = Mockito.mock(Memtable.class); + memtableIndex = new VectorMemoryIndex(index, memtable); + + int numWriterThreads = Runtime.getRuntime().availableProcessors(); + int numReaderThreads = Runtime.getRuntime().availableProcessors(); + int vectorsPerWriter = 2000; + int totalInserted = numWriterThreads * vectorsPerWriter; + + // Pre-seed rows so orderResultsBy() always has a non-empty [minimumKey, maximumKey] + // window and a non-trivial resultsInRange list on the first reader pass. + int preSeedCount = 50; + for (int i = 1; i <= preSeedCount; i++) + addRow(-i, randomVector()); // negative PKs, disjoint from writer range [0, totalInserted) + + CountDownLatch writersFinished = new CountDownLatch(numWriterThreads); + AtomicBoolean phase1Executed = new AtomicBoolean(false); + CopyOnWriteArrayList errors = new CopyOnWriteArrayList<>(); + + ExecutorService executor = Executors.newFixedThreadPool(numWriterThreads + numReaderThreads); + CyclicBarrier barrier = new CyclicBarrier(numWriterThreads + numReaderThreads); + + // Writers: each inserts into a disjoint PK range [threadId*vectorsPerWriter, (threadId+1)*vectorsPerWriter) + for (int t = 0; t < numWriterThreads; t++) + { + final int threadId = t; + executor.submit(() -> { + try + { + barrier.await(); + for (int i = 0; i < vectorsPerWriter; i++) + addRow(threadId * vectorsPerWriter + i, randomVectorFromThreadLocal()); + } + catch (Throwable e) + { + errors.add(e); + } + finally + { + writersFinished.countDown(); + } + }); + } + + for (int t = 0; t < numReaderThreads; t++) + { + executor.submit(() -> { + try + { + barrier.await(); + + ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), + FBUtilities.nowInSeconds(), + ColumnFilter.all(cfs.metadata()), + RowFilter.none(), + DataLimits.cqlLimits(totalInserted + preSeedCount), + DataRange.allData(cfs.metadata().partitioner)); + QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); + + // --- Phase 1: search during concurrent writes --- + // Snapshot the keys visible so far; the list may be incomplete, which is + // a valid linearization. We only assert safety here, not completeness. + List snapshotKeys = buildSortedPrimaryKeySnapshot(); + + ByteBuffer queryBuf = randomVectorFromThreadLocal(); + Expression concurrentExpression = Expression.create(index); + concurrentExpression.add(Operator.ANN, queryBuf); + + if (!snapshotKeys.isEmpty()) + { + try (CloseableIterator it = memtableIndex.orderResultsBy(queryContext, snapshotKeys, concurrentExpression)) + { + while (it.hasNext()) + { + PrimaryKeyWithScore result = it.next(); + assertNotNull("Null PrimaryKey during concurrent add() + orderResultsBy()", result.primaryKey()); + assertTrue("Non-finite score during concurrent add() + orderResultsBy(): " + result.score(), Float.isFinite(result.score())); + } + } + phase1Executed.set(true); + } + + // --- Phase 2: wait for all writers, then verify correctness --- + writersFinished.await(); + + List allKeys = buildSortedPrimaryKeySnapshot(); + ByteBuffer settledQueryBuf = randomVectorFromThreadLocal(); + Expression settledExpression = Expression.create(index); + settledExpression.add(Operator.ANN, settledQueryBuf); + + Set foundAfterSettle = new HashSet<>(); + try (CloseableIterator it = memtableIndex.orderResultsBy(queryContext, allKeys, settledExpression)) + { + while (it.hasNext()) + { + PrimaryKeyWithScore result = it.next(); + assertNotNull("Null PrimaryKey after writes settled in orderResultsBy()", result.primaryKey()); + assertTrue("Non-finite score after writes settled in orderResultsBy(): " + result.score(), Float.isFinite(result.score())); + assertTrue("Non-positive score after writes settled in orderResultsBy(): " + result.score(), result.score() > 0f); + int pk = Int32Type.instance.compose(result.primaryKey().partitionKey().getKey()); + foundAfterSettle.add(pk); + } + } + + long writerKeysFound = foundAfterSettle.stream().filter(pk -> pk >= 0).count(); + int expectedMinimum = (int) (totalInserted * RECALL_THRESHOLD); + assertTrue("orderResultsBy() returned " + writerKeysFound + " of " + totalInserted + " writer-inserted keys after writes settled (expected at least " + expectedMinimum + ')', + writerKeysFound >= expectedMinimum); + } + catch (Throwable e) + { + errors.add(e); + } + }); + } + + executor.shutdown(); + assertTrue("Timed out waiting for concurrent add() + orderResultsBy()", executor.awaitTermination(60, TimeUnit.SECONDS)); + assertTrue("No reader executed a Phase 1 search during the concurrent write window; increase vectorsPerWriter to widen the write window", phase1Executed.get()); + + if (!errors.isEmpty()) + { + AssertionError failure = new AssertionError("Concurrent add() + orderResultsBy() produced " + errors.size() + " error(s); first: " + errors.get(0)); + errors.forEach(failure::addSuppressed); + throw failure; + } + } + + /** + * Builds a sorted List from the current snapshot of keyMap. + * Mirrors the production input to orderResultsBy(): a pre-sorted list of keys + * that passed a prior non-ANN index scan, in natural PrimaryKey order. + */ + private List buildSortedPrimaryKeySnapshot() + { + return keyMap.keySet() + .stream() + .map(dk -> index.hasClustering() ? index.keyFactory().create(dk, Clustering.EMPTY) : index.keyFactory().create(dk)) + .sorted() + .collect(Collectors.toList()); + } + + /** + * @return a thread-safe random vector {@link ByteBuffer} + */ + private ByteBuffer randomVectorFromThreadLocal() { - // VSTODO + List rawVector = new ArrayList<>(dimensionCount); + for (int i = 0; i < dimensionCount; i++) + rawVector.add(ThreadLocalRandom.current().nextFloat()); + return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(rawVector); } private Expression generateRandomExpression() @@ -203,11 +690,11 @@ private Expression generateRandomExpression() return expression; } - private ByteBuffer randomVector() { + private ByteBuffer randomVector() + { List rawVector = new ArrayList<>(dimensionCount); - for (int i = 0; i < dimensionCount; i++) { + for (int i = 0; i < dimensionCount; i++) rawVector.add(getRandom().nextFloat()); - } return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(rawVector); } From 3b03779c4f42839bed28bcd30e9088e137dd1794 Mon Sep 17 00:00:00 2001 From: Caleb Rackliffe Date: Tue, 30 Jun 2026 17:43:53 -0500 Subject: [PATCH 2/4] - expanded old testConcurrentAddsProduceConsistentFinalState() to include vector data more likely to produce conflicts/races - moved primaryKeys.add() after graph.add() to be consistent with update() --- .../index/sai/memory/VectorMemoryIndex.java | 2 +- .../sai/memory/VectorMemoryIndexTest.java | 57 ++++++++++++------- 2 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java index 8c35cc66e15e..dba006bcb8c7 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java @@ -98,8 +98,8 @@ public long add(DecoratedKey key, Clustering clustering, ByteBuffer value) private long index(PrimaryKey primaryKey, ByteBuffer value) { writeCount.increment(); - primaryKeys.add(primaryKey); long bytesUsed = graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL); + primaryKeys.add(primaryKey); updateKeyBounds(primaryKey); return bytesUsed; } diff --git a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java index dc11fc97b1fb..d5f921b87179 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -38,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.junit.Before; @@ -98,6 +100,7 @@ public class VectorMemoryIndexTest extends SAITester .build(); private static final double RECALL_THRESHOLD = 0.9; + private static final int VECTORS_PER_THREAD = 2000; private ColumnFamilyStore cfs; private StorageAttachedIndex index; @@ -206,6 +209,18 @@ public void randomQueryTest() throws Exception actualVectorsReturned >= expectedVectorsReturned * RECALL_THRESHOLD); } + @Test + public void testConcurrentAddsWithRandomVectors() throws Exception + { + testConcurrentAddsAreEventuallyConsistent((threadId, i) -> randomVectorFromThreadLocal()); + } + + @Test + public void testConcurrentAddsWithSharedVectors() throws Exception + { + testConcurrentAddsAreEventuallyConsistent((threadId, i) -> makeSharedVector(i)); + } + /** * Verifies that concurrent calls to add() do not corrupt the graph or lose data. *

@@ -214,23 +229,19 @@ public void randomQueryTest() throws Exception * and scratch arrays. This test validates the full stack from VectorMemoryIndex.index() * through OnHeapGraph.add() through GraphIndexBuilder.addGraphNode(). *

- * Each thread owns a disjoint range of partition key integers so every insert is a - * pure add with no PK collisions, isolating add() from update() semantics. - *

* After all writers complete, a full-ring search must return the vast majority of * inserted keys with valid scores, confirming no data was lost or corrupted. */ - @Test - public void testConcurrentAddsProduceConsistentFinalState() throws Exception + private void testConcurrentAddsAreEventuallyConsistent(BiFunction vectorFactory) throws Exception { Memtable memtable = Mockito.mock(Memtable.class); memtableIndex = new VectorMemoryIndex(index, memtable); int numThreads = Runtime.getRuntime().availableProcessors(); - int vectorsPerThread = 2000; - int totalInserted = numThreads * vectorsPerThread; + int totalInserted = numThreads * VECTORS_PER_THREAD; ExecutorService executor = Executors.newFixedThreadPool(numThreads); + // CyclicBarrier ensures all threads begin inserting simultaneously, // maximizing contention on GraphIndexBuilder and ConcurrentVectorValues. CyclicBarrier barrier = new CyclicBarrier(numThreads); @@ -243,11 +254,10 @@ public void testConcurrentAddsProduceConsistentFinalState() throws Exception try { barrier.await(); - for (int i = 0; i < vectorsPerThread; i++) + for (int i = 0; i < VECTORS_PER_THREAD; i++) { - // Partition key is globally unique across all threads - int pk = threadId * vectorsPerThread + i; - addRow(pk, randomVectorFromThreadLocal()); + int pk = threadId * VECTORS_PER_THREAD + i; + addRow(pk, vectorFactory.apply(threadId, i)); } } catch (BrokenBarrierException | InterruptedException e) @@ -296,7 +306,6 @@ public void testConcurrentAddsProduceConsistentFinalState() throws Exception { PrimaryKeyWithScore result = iterator.next(); assertNotNull("Null PrimaryKey in search results after concurrent adds", result.primaryKey()); - float score = result.score(); assertTrue("Non-finite score after concurrent adds: " + score, Float.isFinite(score)); @@ -312,8 +321,6 @@ public void testConcurrentAddsProduceConsistentFinalState() throws Exception } } - // ANN recall is approximate — use a tolerance consistent with randomQueryTest - // rather than asserting exact equality, which would be flaky at high dimensionCount. assertTrue("Search returned " + foundKeys.size() + " of " + totalInserted + " results after concurrent adds (expected at least " + (int) (totalInserted * RECALL_THRESHOLD) + ')', foundKeys.size() >= totalInserted * RECALL_THRESHOLD); } @@ -438,7 +445,12 @@ public void testConcurrentAddsAndSearchesNeverThrow() throws Exception PrimaryKeyWithScore result = it.next(); assertNotNull("Null PrimaryKey after writes settled", result.primaryKey()); assertTrue("Non-finite score after writes settled: " + result.score(), Float.isFinite(result.score())); + + // All vector components are drawn from [0, 1) via ThreadLocalRandom.nextFloat(), + // so every term in the dot product is non-negative and the sum is strictly positive. + // A score of 0f or below would indicate graph corruption, not a valid similarity result. assertTrue("Non-positive score after writes settled: " + result.score(), result.score() > 0f); + int pk = Int32Type.instance.compose(result.primaryKey().partitionKey().getKey()); foundAfterSettle.add(pk); } @@ -628,7 +640,12 @@ public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception PrimaryKeyWithScore result = it.next(); assertNotNull("Null PrimaryKey after writes settled in orderResultsBy()", result.primaryKey()); assertTrue("Non-finite score after writes settled in orderResultsBy(): " + result.score(), Float.isFinite(result.score())); + + // All vector components are drawn from [0, 1) via ThreadLocalRandom.nextFloat(), + // so every term in the dot product is non-negative and the sum is strictly positive. + // A score of 0f or below would indicate graph corruption, not a valid similarity result. assertTrue("Non-positive score after writes settled in orderResultsBy(): " + result.score(), result.score() > 0f); + int pk = Int32Type.instance.compose(result.primaryKey().partitionKey().getKey()); foundAfterSettle.add(pk); } @@ -658,11 +675,6 @@ public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception } } - /** - * Builds a sorted List from the current snapshot of keyMap. - * Mirrors the production input to orderResultsBy(): a pre-sorted list of keys - * that passed a prior non-ANN index scan, in natural PrimaryKey order. - */ private List buildSortedPrimaryKeySnapshot() { return keyMap.keySet() @@ -683,6 +695,13 @@ private ByteBuffer randomVectorFromThreadLocal() return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(rawVector); } + private ByteBuffer makeSharedVector(int i) + { + List raw = new ArrayList<>(Collections.nCopies(dimensionCount - 1, 0.5f)); + raw.add(i / (float) VECTORS_PER_THREAD); + return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(raw); + } + private Expression generateRandomExpression() { Expression expression = Expression.create(index); From 030278351efac9e206c6134cba226a70276423f1 Mon Sep 17 00:00:00 2001 From: Caleb Rackliffe Date: Tue, 30 Jun 2026 18:19:59 -0500 Subject: [PATCH 3/4] minor deduplication --- .../sai/memory/VectorMemoryIndexTest.java | 91 +++++++++---------- 1 file changed, 45 insertions(+), 46 deletions(-) diff --git a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java index d5f921b87179..2537c7a8a4c3 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.Before; @@ -164,14 +165,7 @@ public void randomQueryTest() throws Exception Set foundKeys = new HashSet<>(); int limit = getRandom().nextIntBetween(1, 100); - - ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(cfs.metadata()), - RowFilter.none(), - DataLimits.cqlLimits(limit), - DataRange.allData(cfs.metadata().partitioner)); - + ReadCommand command = createRangeRead(limit); long expectedResults = Math.min(limit, keysInRange.size()); try (CloseableIterator iterator = memtableIndex.orderBy(new QueryContext(command, @@ -290,12 +284,7 @@ private void testConcurrentAddsAreEventuallyConsistent(BiFunction fullRing = new Range<>(partitioner.getMinimumToken().minKeyBound(), partitioner.getMinimumToken().minKeyBound()); Expression expression = generateRandomExpression(); - ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(cfs.metadata()), - RowFilter.none(), - DataLimits.cqlLimits(totalInserted), - DataRange.allData(cfs.metadata().partitioner)); + ReadCommand command = createRangeRead(totalInserted); QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); Set foundKeys = new HashSet<>(); @@ -400,14 +389,7 @@ public void testConcurrentAddsAndSearchesNeverThrow() throws Exception barrier.await(); AbstractBounds fullRing = new Range<>(partitioner.getMinimumToken().minKeyBound(), partitioner.getMinimumToken().minKeyBound()); - - ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(cfs.metadata()), - RowFilter.none(), - DataLimits.cqlLimits(totalInserted + preSeedCount), - DataRange.allData(cfs.metadata().partitioner)); - + ReadCommand command = createRangeRead(totalInserted + preSeedCount); QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); // Build query vectors inline — getRandom() is not thread-safe from @@ -522,6 +504,18 @@ public void testExpectedNodesVisitedRespectsBounds() } } + @Test + public void testConcurrentAddsAndOrderResultsByRandomVectors() throws Exception + { + testConcurrentAddsAndOrderResultsByNeverThrow((threadId, i) -> randomVectorFromThreadLocal()); + } + + @Test + public void testConcurrentAddsAndOrderResultsBySharedVectors() throws Exception + { + testConcurrentAddsAndOrderResultsByNeverThrow((threadId, i) -> makeSharedVector(i)); + } + /** * Verifies that orderResultsBy() never throws while concurrent add() calls are in * progress, and that the index reaches a consistent state once writes settle. @@ -540,16 +534,14 @@ public void testExpectedNodesVisitedRespectsBounds() * incomplete — this is intentional and mirrors the production path where the source * KeyRangeIterator only sees keys committed before the non-ANN index scan ran. */ - @Test - public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception + private void testConcurrentAddsAndOrderResultsByNeverThrow(BiFunction vectorFactory) throws Exception { Memtable memtable = Mockito.mock(Memtable.class); memtableIndex = new VectorMemoryIndex(index, memtable); int numWriterThreads = Runtime.getRuntime().availableProcessors(); int numReaderThreads = Runtime.getRuntime().availableProcessors(); - int vectorsPerWriter = 2000; - int totalInserted = numWriterThreads * vectorsPerWriter; + int totalInserted = numWriterThreads * VECTORS_PER_THREAD; // Pre-seed rows so orderResultsBy() always has a non-empty [minimumKey, maximumKey] // window and a non-trivial resultsInRange list on the first reader pass. @@ -572,8 +564,11 @@ public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception try { barrier.await(); - for (int i = 0; i < vectorsPerWriter; i++) - addRow(threadId * vectorsPerWriter + i, randomVectorFromThreadLocal()); + for (int i = 0; i < VECTORS_PER_THREAD; i++) + { + int pk = threadId * VECTORS_PER_THREAD + i; + addRow(pk, vectorFactory.apply(threadId, i)); + } } catch (Throwable e) { @@ -593,12 +588,7 @@ public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception { barrier.await(); - ReadCommand command = PartitionRangeReadCommand.create(cfs.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(cfs.metadata()), - RowFilter.none(), - DataLimits.cqlLimits(totalInserted + preSeedCount), - DataRange.allData(cfs.metadata().partitioner)); + ReadCommand command = createRangeRead(totalInserted + preSeedCount); QueryContext queryContext = new QueryContext(command, DatabaseDescriptor.getRangeRpcTimeout(TimeUnit.MILLISECONDS)); // --- Phase 1: search during concurrent writes --- @@ -675,6 +665,16 @@ public void testConcurrentAddsAndOrderResultsByNeverThrow() throws Exception } } + private PartitionRangeReadCommand createRangeRead(int limit) + { + return PartitionRangeReadCommand.create(cfs.metadata(), + FBUtilities.nowInSeconds(), + ColumnFilter.all(cfs.metadata()), + RowFilter.none(), + DataLimits.cqlLimits(limit), + DataRange.allData(cfs.metadata().partitioner)); + } + private List buildSortedPrimaryKeySnapshot() { return keyMap.keySet() @@ -684,17 +684,6 @@ private List buildSortedPrimaryKeySnapshot() .collect(Collectors.toList()); } - /** - * @return a thread-safe random vector {@link ByteBuffer} - */ - private ByteBuffer randomVectorFromThreadLocal() - { - List rawVector = new ArrayList<>(dimensionCount); - for (int i = 0; i < dimensionCount; i++) - rawVector.add(ThreadLocalRandom.current().nextFloat()); - return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(rawVector); - } - private ByteBuffer makeSharedVector(int i) { List raw = new ArrayList<>(Collections.nCopies(dimensionCount - 1, 0.5f)); @@ -710,10 +699,20 @@ private Expression generateRandomExpression() } private ByteBuffer randomVector() + { + return randomVector(() -> getRandom().nextFloat()); + } + + private ByteBuffer randomVectorFromThreadLocal() + { + return randomVector(() -> ThreadLocalRandom.current().nextFloat()); + } + + private ByteBuffer randomVector(Supplier supplier) { List rawVector = new ArrayList<>(dimensionCount); for (int i = 0; i < dimensionCount; i++) - rawVector.add(getRandom().nextFloat()); + rawVector.add(supplier.get()); return VectorType.getInstance(FloatType.instance, dimensionCount).getSerializer().serialize(rawVector); } From 28c78bf6c6ef619f03248bd6441074de5a7d84c2 Mon Sep 17 00:00:00 2001 From: Caleb Rackliffe Date: Tue, 30 Jun 2026 18:26:46 -0500 Subject: [PATCH 4/4] - moved writeCount increment after the actual graph addition - cleaned up obsolete JavaDoc --- .../cassandra/index/sai/memory/VectorMemoryIndex.java | 2 +- .../index/sai/memory/VectorMemoryIndexTest.java | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java index dba006bcb8c7..da1fd342eb9d 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/VectorMemoryIndex.java @@ -97,8 +97,8 @@ public long add(DecoratedKey key, Clustering clustering, ByteBuffer value) private long index(PrimaryKey primaryKey, ByteBuffer value) { - writeCount.increment(); long bytesUsed = graph.add(value, primaryKey, OnHeapGraph.InvalidVectorBehavior.FAIL); + writeCount.increment(); primaryKeys.add(primaryKey); updateKeyBounds(primaryKey); return bytesUsed; diff --git a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java index 2537c7a8a4c3..6db414089556 100644 --- a/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sai/memory/VectorMemoryIndexTest.java @@ -520,15 +520,6 @@ public void testConcurrentAddsAndOrderResultsBySharedVectors() throws Exception * Verifies that orderResultsBy() never throws while concurrent add() calls are in * progress, and that the index reaches a consistent state once writes settle. *

- * orderResultsBy() reads the unsynchronized fields minimumKey and maximumKey to bound - * the materialized key list before scoring. A stale read of either field can silently - * drop valid keys. This test confirms: - *

    - *
  • Safety: no exceptions, no null PKs, no non-finite scores during concurrent writes.
  • - *
  • Liveness: after all writers finish, a call with the full key list returns at least - * RECALL_THRESHOLD of the inserted writer keys.
  • - *
- *

* The materialized key list passed to orderResultsBy() is built from keyMap, which is a * ConcurrentHashMap updated by every addRow() call. A snapshot taken mid-write may be * incomplete — this is intentional and mirrors the production path where the source