diff --git a/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java b/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java index c2cfb2b90106..1525c8a0a489 100644 --- a/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java +++ b/src/java/org/apache/cassandra/db/tries/InMemoryTrie.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntPredicate; import com.google.common.annotations.VisibleForTesting; @@ -809,6 +810,17 @@ public interface UpsertTransformer * @return The combined value to use. Cannot be null. */ T apply(T existing, U update); + + /** + * Called when intermediate-node content is applied during a recursive put driven by an insertion policy + * (see {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean, IntPredicate)}). + * The default implementation delegates to {@link #apply(Object, Object)}; transformers that need to + * distinguish intermediate (prefix) applications from terminal (exact) ones should override it. + */ + default T applyIntermediate(T existing, U update) + { + return apply(existing, update); + } } /** @@ -899,6 +911,69 @@ public void putRecursive(ByteComparable key, R value, final UpsertTransforme root = newRoot; } + /** + * A version of {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean)} which, in addition to + * applying the value at the terminal node, also applies it as intermediate content at every depth selected by + * {@code accumulateIntermediateAtDepth}. Intermediate applications go through + * {@link UpsertTransformer#applyIntermediate}, while the terminal application uses {@link UpsertTransformer#apply}. + *

+ * Depth is 1-based on the key bytes (the first byte is depth 1); the empty prefix (depth 0, the root) is never + * accumulated. A null policy is equivalent to {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean)}. + */ + public void putSingleton(ByteComparable key, + R value, + UpsertTransformer transformer, + boolean useRecursive, + IntPredicate accumulateIntermediateAtDepth) throws SpaceExhaustedException + { + if (accumulateIntermediateAtDepth == null) + { + putSingleton(key, value, transformer, useRecursive); + return; + } + putRecursiveWithPolicy(key, value, transformer, accumulateIntermediateAtDepth); + } + + @SuppressWarnings("unchecked") + public void putRecursiveWithPolicy(ByteComparable key, + R value, + UpsertTransformer transformer, + IntPredicate accumulateIntermediateAtDepth) throws SpaceExhaustedException + { + int newRoot = putRecursiveWithPolicy(root, key.asComparableBytes(BYTE_COMPARABLE_VERSION), value, + (UpsertTransformer) transformer, accumulateIntermediateAtDepth, 0); + if (newRoot != root) + root = newRoot; + } + + private int putRecursiveWithPolicy(int node, ByteSource key, R value, final UpsertTransformer transformer, + IntPredicate accumulateIntermediateAtDepth, int depth) throws SpaceExhaustedException + { + int transition = key.next(); + if (transition == ByteSource.END_OF_STREAM) + return applyContent(node, value, transformer, false); + + boolean appliedHere = false; + if (depth >= 1 && accumulateIntermediateAtDepth.test(depth)) + { + node = applyContent(node, value, transformer, true); + appliedHere = true; + } + + int child = getChild(node, transition); + + int newChild = putRecursiveWithPolicy(child, key, value, transformer, accumulateIntermediateAtDepth, depth + 1); + if (newChild == child && !appliedHere) + return node; + + int skippedContent = followContentTransition(node); + int attachedChild = !isNull(skippedContent) + ? attachChild(skippedContent, transition, newChild) // Single path, no copying required + : expandOrCreateChainNode(transition, newChild); + + return preserveContent(node, skippedContent, attachedChild); + } + private int putRecursive(int node, ByteSource key, R value, final UpsertTransformer transformer) throws SpaceExhaustedException { int transition = key.next(); @@ -920,25 +995,35 @@ private int putRecursive(int node, ByteSource key, R value, final UpsertTran } private int applyContent(int node, R value, UpsertTransformer transformer) throws SpaceExhaustedException + { + return applyContent(node, value, transformer, false); + } + + private int applyContent(int node, R value, UpsertTransformer transformer, boolean intermediate) throws SpaceExhaustedException { if (isNull(node)) - return ~addContent(transformer.apply(null, value)); + return ~addContent(combine(transformer, null, value, intermediate)); if (isLeaf(node)) { int contentIndex = ~node; - setContent(contentIndex, transformer.apply(getContent(contentIndex), value)); + setContent(contentIndex, combine(transformer, getContent(contentIndex), value, intermediate)); return node; } if (offset(node) == PREFIX_OFFSET) { int contentIndex = getInt(node + PREFIX_CONTENT_OFFSET); - setContent(contentIndex, transformer.apply(getContent(contentIndex), value)); + setContent(contentIndex, combine(transformer, getContent(contentIndex), value, intermediate)); return node; } else - return createPrefixNode(addContent(transformer.apply(null, value)), node, false); + return createPrefixNode(addContent(combine(transformer, null, value, intermediate)), node, false); + } + + private T combine(UpsertTransformer transformer, T existing, R value, boolean intermediate) + { + return intermediate ? transformer.applyIntermediate(existing, value) : transformer.apply(existing, value); } /** diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java index 1dba97d3ea5c..3f2a57a8b9f7 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java @@ -157,6 +157,7 @@ public class StorageAttachedIndex implements Index IndexWriterConfig.CONSTRUCTION_BEAM_WIDTH, IndexWriterConfig.SIMILARITY_FUNCTION, IndexWriterConfig.OPTIMIZE_FOR, + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI, NonTokenizingOptions.CASE_SENSITIVE, NonTokenizingOptions.NORMALIZE, NonTokenizingOptions.ASCII); @@ -279,6 +280,17 @@ public static Map validateOptions(Map options, T AbstractAnalyzer.fromOptions(indexTermType, analysisOptions); IndexWriterConfig config = IndexWriterConfig.fromOptions(null, indexTermType, options); + if (options.containsKey(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)) + { + String val = options.get(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI); + if (!"true".equalsIgnoreCase(val) && !"false".equalsIgnoreCase(val)) + throw new InvalidRequestException( + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " must be 'true' or 'false', got: " + val); + if (!indexTermType.isLiteral()) + throw new InvalidRequestException( + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " is only supported on string/literal columns"); + } + // If we are indexing map entries we need to validate the subtypes if (indexTermType.isComposite()) { @@ -450,7 +462,16 @@ public boolean dependsOn(ColumnMetadata column) @Override public boolean supportsExpression(ColumnMetadata column, Operator operator) { - return dependsOn(column) && indexTermType.supports(operator); + if (!dependsOn(column)) + return false; + + // LIKE is initially parsed as the generic operator (the specific variant is resolved from the bound value), + // so advertise support for both the generic LIKE and LIKE_PREFIX. Only prefix-enabled literal indexes qualify; + // other LIKE variants (suffix/contains/matches) are rejected at execution time. + if (operator == Operator.LIKE || operator == Operator.LIKE_PREFIX) + return indexTermType.isLiteral() && indexWriterConfig.isLiteralPrefixEnabled(); + + return indexTermType.supports(operator); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfig.java b/src/java/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfig.java index c9f011649069..427d691c19c8 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfig.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfig.java @@ -52,13 +52,15 @@ public class IndexWriterConfig public static final String OPTIMIZE_FOR = "optimize_for"; private static final OptimizeFor DEFAULT_OPTIMIZE_FOR = OptimizeFor.LATENCY; + + public static final String ENABLE_LITERAL_PREFIX_SAI = "enable_literal_prefix_sai"; private static final String validOptimizeFor = Arrays.stream(OptimizeFor.values()) .map(Enum::name) .collect(Collectors.joining(", ")); public static final int MAX_TOP_K = SAI_VECTOR_SEARCH_MAX_TOP_K.getInt(); - private static final IndexWriterConfig EMPTY_CONFIG = new IndexWriterConfig(-1, -1, null, null); + private static final IndexWriterConfig EMPTY_CONFIG = new IndexWriterConfig(-1, -1, null, null, false); // The maximum number of outgoing connections a node can have in a graph. private final int maximumNodeConnections; @@ -71,15 +73,19 @@ public class IndexWriterConfig private final OptimizeFor optimizeFor; + private final boolean literalPrefixEnabled; + public IndexWriterConfig(int maximumNodeConnections, int constructionBeamWidth, VectorSimilarityFunction similarityFunction, - OptimizeFor optimizerFor) + OptimizeFor optimizerFor, + boolean literalPrefixEnabled) { this.maximumNodeConnections = maximumNodeConnections; this.constructionBeamWidth = constructionBeamWidth; this.similarityFunction = similarityFunction; this.optimizeFor = optimizerFor; + this.literalPrefixEnabled = literalPrefixEnabled; } public int getMaximumNodeConnections() @@ -102,6 +108,11 @@ public OptimizeFor getOptimizeFor() return optimizeFor; } + public boolean isLiteralPrefixEnabled() + { + return literalPrefixEnabled; + } + public static IndexWriterConfig fromOptions(String indexName, IndexTermType indexTermType, Map options) { int maximumNodeConnections = DEFAULT_MAXIMUM_NODE_CONNECTIONS; @@ -178,7 +189,8 @@ public static IndexWriterConfig fromOptions(String indexName, IndexTermType inde } } } - return new IndexWriterConfig(maximumNodeConnections, queueSize, similarityFunction, optimizeFor); + boolean literalPrefixEnabled = "true".equalsIgnoreCase(options.get(ENABLE_LITERAL_PREFIX_SAI)); + return new IndexWriterConfig(maximumNodeConnections, queueSize, similarityFunction, optimizeFor, literalPrefixEnabled); } public static IndexWriterConfig emptyConfig() diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index 795d5c1a0d6b..2181d79a7372 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter; import org.apache.cassandra.index.sai.disk.RowMapping; @@ -35,11 +36,14 @@ import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; import org.apache.cassandra.index.sai.disk.v1.bbtree.NumericIndexWriter; import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v1.segment.SegmentTrieBuffer; import org.apache.cassandra.index.sai.disk.v1.segment.SegmentWriter; import org.apache.cassandra.index.sai.disk.v1.trie.LiteralIndexWriter; import org.apache.cassandra.index.sai.memory.MemtableIndex; import org.apache.cassandra.index.sai.memory.MemtableTermsIterator; import org.apache.cassandra.index.sai.metrics.IndexMetrics; +import org.apache.cassandra.index.sai.postings.PostingList; +import org.apache.cassandra.index.sai.utils.IndexEntry; import org.apache.cassandra.index.sai.utils.IndexIdentifier; import org.apache.cassandra.index.sai.utils.IndexTermType; import org.apache.cassandra.index.sai.utils.PrimaryKey; @@ -55,6 +59,7 @@ public class MemtableIndexWriter implements PerColumnIndexWriter { private static final Logger logger = LoggerFactory.getLogger(MemtableIndexWriter.class); private static final int NO_ROWS = -1; + private static final int MAX_RECURSIVE_TERM_LENGTH = 128; private final IndexDescriptor indexDescriptor; private final IndexTermType indexTermType; @@ -62,6 +67,7 @@ public class MemtableIndexWriter implements PerColumnIndexWriter private final IndexMetrics indexMetrics; private final MemtableIndex memtable; private final RowMapping rowMapping; + private final boolean literalPrefixEnabled; private PrimaryKey minKey; private PrimaryKey maxKey; @@ -73,7 +79,8 @@ public MemtableIndexWriter(MemtableIndex memtable, IndexTermType indexTermType, IndexIdentifier indexIdentifier, IndexMetrics indexMetrics, - RowMapping rowMapping) + RowMapping rowMapping, + boolean literalPrefixEnabled) { assert rowMapping != null && rowMapping != RowMapping.DUMMY : "Row mapping must exist during FLUSH."; @@ -83,6 +90,7 @@ public MemtableIndexWriter(MemtableIndex memtable, this.indexMetrics = indexMetrics; this.memtable = memtable; this.rowMapping = rowMapping; + this.literalPrefixEnabled = literalPrefixEnabled; } @Override @@ -176,13 +184,39 @@ public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException private long flush(MemtableTermsIterator terms) throws IOException { - SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier) - : new NumericIndexWriter(indexDescriptor, - indexIdentifier, - indexTermType.fixedSizeOf()); + SegmentMetadata.ComponentMetadataMap indexMetas; + long numRows; - SegmentMetadata.ComponentMetadataMap indexMetas = writer.writeCompleteSegment(terms); - long numRows = writer.getNumberOfRows(); + if (indexTermType.isLiteral() && literalPrefixEnabled) + { + int skip = CassandraRelevantProperties.SAI_POSTINGS_SKIP.getInt(); + SegmentTrieBuffer buffer = new SegmentTrieBuffer(depth -> depth % skip == 0); + + while (terms.hasNext()) + { + IndexEntry entry = terms.next(); + try (PostingList postings = entry.postingList) + { + long rowId; + while ((rowId = postings.nextPosting()) != PostingList.END_OF_STREAM) + buffer.add(entry.term, MAX_RECURSIVE_TERM_LENGTH, (int) rowId); + } + } + + LiteralIndexWriter writer = new LiteralIndexWriter(indexDescriptor, indexIdentifier); + indexMetas = writer.writeCompleteSegment(buffer.iterator(), true); + numRows = writer.getNumberOfRows(); + } + else + { + SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier) + : new NumericIndexWriter(indexDescriptor, + indexIdentifier, + indexTermType.fixedSizeOf()); + + indexMetas = writer.writeCompleteSegment(terms); + numRows = writer.getNumberOfRows(); + } // If no rows were written we need to delete any created column index components // so that the index is correctly identified as being empty (only having a completion marker) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index ba8b13ca4c42..7423ddea84bc 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -170,7 +170,8 @@ public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index, index.termType(), index.identifier(), index.indexMetrics(), - rowMapping); + rowMapping, + index.indexWriterConfig().isLiteralPrefixEnabled()); } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsReader.java index 34329f51ab45..70067fa2e3c4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsReader.java @@ -52,6 +52,12 @@ public class PostingsReader implements OrdinalPostingList private final QueryEventListener.PostingListEventListener listener; private final BlocksSummary summary; + // Block range [startBlock, endBlock) this reader is scoped to, and the number of postings to read. + // For a full (V1) posting list these are 0, numBlocks and numPostings respectively. + private final int startBlock; + private final int endBlock; + private final long limit; + // Current block index private int blockIndex; // Current posting index within block @@ -69,13 +75,75 @@ public PostingsReader(IndexInput input, long summaryOffset, QueryEventListener.P } public PostingsReader(IndexInput input, BlocksSummary summary, QueryEventListener.PostingListEventListener listener) throws IOException + { + this(input, summary, listener, 0, summary.numBlocks(), summary.numPostings); + } + + /** + * Creates a reader scoped to a single block-aligned section of a posting list. + * + * @param startBlock first block (inclusive) of the section; its {@code firstPosting} VLong is read fresh + * @param endBlock last block (exclusive) of the section, used to bound skip-table binary search + * @param postingsCount number of postings to read from the section + */ + private PostingsReader(IndexInput input, BlocksSummary summary, QueryEventListener.PostingListEventListener listener, + int startBlock, int endBlock, long postingsCount) throws IOException { this.input = input; this.seekingInput = new SeekingRandomAccessInput(input); this.listener = listener; this.summary = summary; + this.startBlock = startBlock; + this.endBlock = endBlock; + this.limit = postingsCount; + this.blockIndex = startBlock; + + if (postingsCount > 0) + reBuffer(); + } - reBuffer(); + /** + * Opens a reader over the exact-match section ({@code [0, prefixIndex)}) of a V2 posting list. + */ + public static PostingsReader exactSection(IndexInput input, BlocksSummary summary, + QueryEventListener.PostingListEventListener listener) throws IOException + { + int exactBlocks = blocksFor(summary.prefixIndex, summary.blockSize); + return new PostingsReader(input, summary, listener, 0, exactBlocks, summary.prefixIndex); + } + + /** + * Opens a reader over the prefix section ({@code [prefixIndex, suffixIndex)}) of a V2 posting list. + * Returns null if there are no prefix postings. + */ + public static PostingsReader prefixSection(IndexInput input, BlocksSummary summary, + QueryEventListener.PostingListEventListener listener) throws IOException + { + int prefixCount = summary.suffixIndex - summary.prefixIndex; + if (prefixCount <= 0) + return null; + int exactBlocks = blocksFor(summary.prefixIndex, summary.blockSize); + return new PostingsReader(input, summary, listener, exactBlocks, summary.numBlocks(), prefixCount); + } + + /** + * Opens a reader over both exact and prefix sections ({@code [0, suffixIndex)}) of a V2 posting list. + * This reads exact and prefix postings in a single contiguous read, which is more efficient than + * two separate I/O operations. Returns null if there are no postings in either section. + */ + public static PostingsReader combinedExactAndPrefixSections(IndexInput input, BlocksSummary summary, + QueryEventListener.PostingListEventListener listener) throws IOException + { + int combinedCount = summary.suffixIndex; // Exact + prefix postings + if (combinedCount <= 0) + return null; + int combinedBlocks = blocksFor(summary.suffixIndex, summary.blockSize); + return new PostingsReader(input, summary, listener, 0, combinedBlocks, combinedCount); + } + + private static int blocksFor(int postings, int blockSize) + { + return (postings + blockSize - 1) / blockSize; } @Override @@ -88,18 +156,43 @@ public static class BlocksSummary { private final IndexInput input; final int blockSize; - final int numPostings; + public final int numPostings; + // V2 section boundaries expressed as posting counts. For V1 / exact-only lists both equal numPostings. + public final int prefixIndex; + public final int suffixIndex; final LongArray offsets; final LongArray maxValues; public BlocksSummary(IndexInput input, long offset) throws IOException + { + this(input, offset, false); + } + + public BlocksSummary(IndexInput input, long offset, boolean isV2) throws IOException { this.input = input; input.seek(offset); + + int pIdx = -1; + int sIdx = -1; + if (isV2) + { + pIdx = input.readVInt(); + sIdx = input.readVInt(); + } + this.blockSize = input.readVInt(); //TODO This should need to change because we can potentially end up with postings of more than Integer.MAX_VALUE? this.numPostings = input.readVInt(); + if (!isV2) + { + pIdx = numPostings; + sIdx = numPostings; + } + this.prefixIndex = pIdx; + this.suffixIndex = sIdx; + SeekingRandomAccessInput randomAccessInput = new SeekingRandomAccessInput(input); int numBlocks = input.readVInt(); long maxBlockValuesLength = input.readVLong(); @@ -117,7 +210,12 @@ public BlocksSummary(IndexInput input, long offset) throws IOException this.maxValues = new LongArrayReader(lvValues, numBlocks); } - void close() + int numBlocks() + { + return Math.toIntExact(offsets.length()); + } + + public void close() { FileUtils.closeQuietly(input); } @@ -164,7 +262,7 @@ public void close() @Override public long size() { - return summary.numPostings; + return limit; } /** @@ -206,7 +304,7 @@ public long advance(long targetRowID) throws IOException private long slowAdvance(long targetRowID) throws IOException { - while (totalPostingsRead < summary.numPostings) + while (totalPostingsRead < limit) { long segmentRowId = peekNext(); @@ -225,8 +323,8 @@ private long slowAdvance(long targetRowID) throws IOException // crossing blocks, the preceeding block index private int binarySearchBlocks(long targetRowID) { - int lowBlockIndex = blockIndex - 1; - int highBlockIndex = Math.toIntExact(summary.maxValues.length()) - 1; + int lowBlockIndex = Math.max(blockIndex - 1, startBlock); + int highBlockIndex = endBlock - 1; // in current block if (lowBlockIndex <= highBlockIndex && targetRowID <= summary.maxValues.get(lowBlockIndex)) @@ -253,7 +351,7 @@ else if (maxValueOfMidBlock > targetRowID) // This following check is to see if we have a duplicate value in the last entry of the // preceeding block. This check is only going to be successful if the entire current // block is full of duplicates. - if (midBlockIndex > 0 && summary.maxValues.get(midBlockIndex - 1) == targetRowID) + if (midBlockIndex > startBlock && summary.maxValues.get(midBlockIndex - 1) == targetRowID) { // there is a duplicate in the preceeding block so restrict search to finish // at that block @@ -293,7 +391,7 @@ public long nextPosting() throws IOException private long peekNext() throws IOException { - if (totalPostingsRead >= summary.numPostings) + if (totalPostingsRead >= limit) { return END_OF_STREAM; } @@ -329,7 +427,7 @@ private void reBuffer() throws IOException } input.seek(pointer); - long left = summary.numPostings - totalPostingsRead; + long left = limit - totalPostingsRead; assert left > 0; readFoRBlock(input); @@ -340,7 +438,7 @@ private void reBuffer() throws IOException private void readFoRBlock(IndexInput in) throws IOException { - if (blockIndex == 0) + if (blockIndex == startBlock) actualPosting = in.readVLong(); byte bitsPerValue = in.readByte(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java index 3f6b058f6984..5d56b94ff1a1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/postings/PostingsWriter.java @@ -204,6 +204,67 @@ public long getTotalPostings() return totalPostings; } + /** + * Writes a V2 posting list with separate exact and prefix sections. + *

+ * On-disk layout: {@code [exact FOR blocks][prefix FOR blocks][V2 BLOCK SUMMARY]}. Each section is + * block-aligned (its final partial block is flushed before the next section starts) and the first block + * of each section carries its own {@code firstPosting} VLong, so the sections can be read independently. + * The V2 block summary prepends {@code prefixIndex} (= number of exact postings) and {@code suffixIndex} + * (= exact + prefix postings) before the standard summary fields. + * + * @param exactPostings ascending row IDs for the exact section; may be null/empty + * @param prefixPostings ascending row IDs for the prefix section; may be null/empty + * @return file offset to the V2 block summary + */ + public long writeV2(PostingList exactPostings, PostingList prefixPostings) throws IOException + { + resetBlockCounters(); + blockOffsets.clear(); + blockMaximumPostings.clear(); + + int exactCount = writeSection(exactPostings); + int prefixCount = writeSection(prefixPostings); + int totalCount = exactCount + prefixCount; + + assert totalCount > 0 : "V2 posting list must have at least one posting"; + + final long summaryOffset = dataOutput.getFilePointer(); + // V2 header: prefixIndex (count of exact postings) then suffixIndex (exact + prefix), then standard summary. + dataOutput.writeVInt(exactCount); + dataOutput.writeVInt(totalCount); + writeSummary(totalCount); + return summaryOffset; + } + + /** + * Writes one section's postings as a self-contained run of FOR blocks (the first block carries its own + * {@code firstPosting}). Returns the number of postings written. + */ + private int writeSection(PostingList postings) throws IOException + { + if (postings == null) + return 0; + + // Reset the delta base so this section starts fresh (its first block writes firstPosting). + lastPosting = Long.MIN_VALUE; + resetBlockCounters(); + + int count = 0; + long posting; + while ((posting = postings.nextPosting()) != PostingList.END_OF_STREAM) + { + writePosting(posting); + count++; + totalPostings++; + } + + if (count > 0) + finish(); + + return count; + } + private void writePosting(long posting) throws IOException { if (lastPosting == Long.MIN_VALUE) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentSearcher.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentSearcher.java index 429d81d74fdf..dcc656da423e 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentSearcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentSearcher.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sai.disk.v1.segment; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; import com.google.common.base.MoreObjects; @@ -34,10 +35,12 @@ import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.v1.PerColumnIndexFiles; import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils; +import org.apache.cassandra.index.sai.disk.v1.trie.LiteralIndexWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.metrics.MulticastQueryEventListeners; import org.apache.cassandra.index.sai.metrics.QueryEventListener; import org.apache.cassandra.index.sai.plan.Expression; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.bytecomparable.ByteComparable; /** @@ -65,8 +68,9 @@ public class LiteralIndexSegmentSearcher extends IndexSegmentSearcher Map map = metadata.componentMetadatas.get(IndexComponent.TERMS_DATA).attributes; String footerPointerString = map.get(SAICodecUtils.FOOTER_POINTER); long footerPointer = footerPointerString == null ? -1 : Long.parseLong(footerPointerString); + boolean isV2 = LiteralIndexWriter.POSTINGS_FORMAT_V2.equals(map.get(LiteralIndexWriter.POSTINGS_FORMAT)); - reader = new LiteralIndexSegmentTermsReader(index.identifier(), indexFiles.termsData(), indexFiles.postingLists(), root, footerPointer); + reader = new LiteralIndexSegmentTermsReader(index.identifier(), indexFiles.termsData(), indexFiles.postingLists(), root, footerPointer, isV2); } @Override @@ -82,14 +86,44 @@ public KeyRangeIterator search(Expression expression, AbstractBounds index.termType().asComparableBytes(prefixValue, v); + ByteBuffer successor = prefixSuccessor(prefixValue); + ByteComparable end = successor == null ? null : v -> index.termType().asComparableBytes(successor, v); + return toPrimaryKeyIterator(reader.prefixMatch(start, end, listener, queryContext), queryContext); + } + if (!expression.getIndexOperator().isEquality()) throw new IllegalArgumentException(index.identifier().logMessage("Unsupported expression: " + expression)); ByteComparable term = v -> index.termType().asComparableBytes(expression.lower().value.encoded, v); - QueryEventListener.TrieIndexEventListener listener = MulticastQueryEventListeners.of(queryContext, perColumnEventListener); return toPrimaryKeyIterator(reader.exactMatch(term, listener, queryContext), queryContext); } + /** + * Computes the lexicographic successor of the given raw prefix bytes: the byte array with its last non-{@code 0xFF} + * byte incremented and any trailing {@code 0xFF} bytes removed. Returns null (an unbounded upper bound) when every + * byte is {@code 0xFF}. + */ + private static ByteBuffer prefixSuccessor(ByteBuffer prefix) + { + byte[] bytes = ByteBufferUtil.getArray(prefix); + int last = bytes.length - 1; + while (last >= 0 && (bytes[last] & 0xFF) == 0xFF) + last--; + + if (last < 0) + return null; + + byte[] successor = java.util.Arrays.copyOf(bytes, last + 1); + successor[last]++; + return ByteBuffer.wrap(successor); + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReader.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReader.java index e18f6256b36c..d5642b8da90f 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReader.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReader.java @@ -19,10 +19,14 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; + import org.apache.lucene.store.IndexInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +34,7 @@ import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.disk.io.IndexFileUtils; +import org.apache.cassandra.index.sai.disk.v1.postings.MergePostingList; import org.apache.cassandra.index.sai.disk.v1.postings.PostingsReader; import org.apache.cassandra.index.sai.disk.v1.trie.TrieTermsDictionaryReader; import org.apache.cassandra.index.sai.metrics.QueryEventListener; @@ -62,17 +67,29 @@ public class LiteralIndexSegmentTermsReader implements Closeable private final FileHandle termDictionaryFile; private final FileHandle postingsFile; private final long termDictionaryRoot; + private final boolean isV2; public LiteralIndexSegmentTermsReader(IndexIdentifier indexIdentifier, FileHandle termsData, FileHandle postingLists, long root, long termsFooterPointer) throws IOException + { + this(indexIdentifier, termsData, postingLists, root, termsFooterPointer, false); + } + + public LiteralIndexSegmentTermsReader(IndexIdentifier indexIdentifier, + FileHandle termsData, + FileHandle postingLists, + long root, + long termsFooterPointer, + boolean isV2) throws IOException { this.indexIdentifier = indexIdentifier; termDictionaryFile = termsData; postingsFile = postingLists; termDictionaryRoot = root; + this.isV2 = isV2; try (final IndexInput indexInput = IndexFileUtils.instance.openInput(termDictionaryFile)) { @@ -98,6 +115,50 @@ public PostingList exactMatch(ByteComparable term, QueryEventListener.TrieIndexE return new TermQuery(term, perQueryEventListener, context).execute(); } + /** + * Returns a posting list of all rows whose indexed term starts with the queried prefix (this includes a row whose + * term equals the prefix exactly). Requires a V2 (prefix-enabled) segment. + * + * @param start the prefix term (inclusive lower bound of the trie scan) + * @param end the lexicographic successor of the prefix, or null for an unbounded upper bound + */ + public PostingList prefixMatch(ByteComparable start, ByteComparable end, + QueryEventListener.TrieIndexEventListener perQueryEventListener, QueryContext context) + { + perQueryEventListener.onSegmentHit(); + return new PrefixQuery(start, end, perQueryEventListener, context).execute(); + } + + /** + * Counts nodes visited by {@link PrefixQuery#collectFromNode} during a prefix traversal. + * Populated only when passed to {@link #prefixMatchWithStats}; the production + * {@link #prefixMatch} path passes {@code null} and pays zero overhead. + */ + @VisibleForTesting + public static class TraversalStats + { + /** Nodes where {@code suffixIndex > prefixIndex}: combined section used, subtree skipped. */ + public int combinedSectionHits; + /** Nodes where {@code prefixIndex > 0} but no combined section: exact section read, recursion continued. */ + public int exactSectionHits; + /** Nodes with no payload: recursion only. */ + public int emptyNodes; + } + + /** + * Like {@link #prefixMatch} but also populates {@code stats} with counts of each traversal + * branch taken. For use in tests only. + */ + @VisibleForTesting + public PostingList prefixMatchWithStats(ByteComparable start, ByteComparable end, + QueryEventListener.TrieIndexEventListener listener, + QueryContext context, + TraversalStats stats) + { + listener.onSegmentHit(); + return new PrefixQuery(start, end, listener, context, stats).execute(); + } + @VisibleForTesting public class TermQuery { @@ -168,9 +229,169 @@ public long lookupPostingsOffset(ByteComparable term) public PostingsReader getPostingsReader(long offset) throws IOException { - PostingsReader.BlocksSummary header = new PostingsReader.BlocksSummary(postingsSummaryInput, offset); + PostingsReader.BlocksSummary header = new PostingsReader.BlocksSummary(postingsSummaryInput, offset, isV2); + + if (isV2) + return PostingsReader.exactSection(postingsInput, header, listener.postingListEventListener()); return new PostingsReader(postingsInput, header, listener.postingListEventListener()); } } + + /** + * Collects, from every term in the trie range {@code [start, end]}, a posting list of the rows for that term, and + * merges them into a single ascending {@link PostingList}. The candidate set may slightly over-include at the + * upper bound; the query layer applies the {@code LIKE} predicate as an exact post-filter. + */ + public class PrefixQuery + { + private final ByteComparable start; + private final QueryEventListener.TrieIndexEventListener listener; + private final QueryContext context; + @Nullable private final TraversalStats stats; + + PrefixQuery(ByteComparable start, ByteComparable end, QueryEventListener.TrieIndexEventListener listener, QueryContext context) + { + this(start, end, listener, context, null); + } + + PrefixQuery(ByteComparable start, ByteComparable end, QueryEventListener.TrieIndexEventListener listener, QueryContext context, + @Nullable TraversalStats stats) + { + this.start = start; + // end is unused: the trie DFS from the prefix node naturally covers the full subtree. + this.listener = listener; + this.context = context; + this.stats = stats; + } + + public PostingList execute() + { + List readers = new ArrayList<>(); + try + { + // BBTree-style single-pass DFS: navigate to the prefix node, then recursively + // collect posting lists. At each node, if it has a combined (exact+prefix) section + // the entire subtree is covered — add it and stop recursing (analogous to + // BlockBalancedTreeReader.collectPostingLists returning when postingsIndex.exists()). + try (TrieTermsDictionaryReader trieReader = new TrieTermsDictionaryReader( + termDictionaryFile.instantiateRebufferer(null), termDictionaryRoot)) + { + long prefixNode = trieReader.followToPrefix(start); + if (prefixNode != TrieTermsDictionaryReader.NOT_FOUND) + collectFromNode(trieReader, prefixNode, readers); + } + + context.checkpoint(); + + if (readers.isEmpty()) + return null; + if (readers.size() == 1) + return readers.get(0); + return MergePostingList.merge(readers); + } + catch (Throwable e) + { + readers.forEach(FileUtils::closeQuietly); + if (!(e instanceof QueryCancelledException)) + logger.error(indexIdentifier.logMessage("Failed to execute prefix query"), e); + throw Throwables.cleaned(e); + } + } + + /** + * Recursively collects posting lists from the subtree rooted at {@code nodePos}. + *

+ * Mirrors {@link org.apache.cassandra.index.sai.disk.v1.bbtree.BlockBalancedTreeReader}'s + * {@code collectPostingLists()}: + *

+ */ + private void collectFromNode(TrieTermsDictionaryReader trieReader, long nodePos, List readers) throws IOException + { + long offset = trieReader.payloadAt(nodePos); + + if (offset != TrieTermsDictionaryReader.NOT_FOUND) + { + // Peek at the BlocksSummary to determine which sections are present. + // The peek input is closed immediately; readExactAndPrefixForOffset / + // addReaderForTerm each open their own inputs for the actual readers. + int prefixIndex, suffixIndex; + try (IndexInput peek = IndexFileUtils.instance.openInput(postingsFile)) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(peek, offset, true); + prefixIndex = summary.prefixIndex; + suffixIndex = summary.suffixIndex; + } + + if (suffixIndex > prefixIndex) + { + // Combined section covers the entire subtree — add it and stop. + if (stats != null) stats.combinedSectionHits++; + readers.add(readExactAndPrefixForOffset(offset)); + return; + } + + if (prefixIndex > 0) + { + if (stats != null) stats.exactSectionHits++; + addReaderForTerm(offset, readers); + } + } + else + { + if (stats != null) stats.emptyNodes++; + } + + // Collect children before recursing so Walker position is not clobbered mid-loop. + for (long child : trieReader.childrenOf(nodePos)) + collectFromNode(trieReader, child, readers); + } + + /** + * Reads exact and prefix sections together for a given postings offset. + */ + private PostingList readExactAndPrefixForOffset(long offset) throws IOException + { + IndexInput postings = IndexFileUtils.instance.openInput(postingsFile); + IndexInput summaryInput = IndexFileUtils.instance.openInput(postingsFile); + PostingsReader.BlocksSummary readSummary = new PostingsReader.BlocksSummary(summaryInput, offset, true); + + PostingList combined = PostingsReader.combinedExactAndPrefixSections(postings, readSummary, listener.postingListEventListener()); + if (combined == null) + { + FileUtils.closeQuietly(postings); + readSummary.close(); + } + return combined; + } + + /** Adds the exact-match posting list for a single term's payload offset. */ + private void addReaderForTerm(long offset, List readers) + { + try + { + IndexInput postings = IndexFileUtils.instance.openInput(postingsFile); + IndexInput summaryInput = IndexFileUtils.instance.openInput(postingsFile); + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(summaryInput, offset, true); + + if (summary.prefixIndex > 0) + { + readers.add(PostingsReader.exactSection(postings, summary, listener.postingListEventListener())); + } + else + { + FileUtils.closeQuietly(postings); + summary.close(); + } + } + catch (IOException e) + { + throw Throwables.unchecked(e); + } + } + } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PackedLongValuesList.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PackedLongValuesList.java new file mode 100644 index 000000000000..cc463c825393 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PackedLongValuesList.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.segment; + +import java.util.NoSuchElementException; + +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PackedLongValues; + +/** + * Holds one {@link PackedLongValues} section per {@link PostingType}, used as the trie-node payload in + * {@link SegmentTrieBuffer}. + *

+ * Currently two sections are active: {@code EXACT(0)} and {@code PREFIX(1)}. {@code SUFFIX(2)} is reserved — + * set {@link #FILTER_TYPES} to 3 when suffix search is added. + */ +public class PackedLongValuesList implements Accountable +{ + /** Increment to 3 when SUFFIX is implemented. */ + static final int FILTER_TYPES = 2; + + private final PackedLongValues exact; + private final PackedLongValues prefix; + + private PackedLongValuesList(PackedLongValues exact, PackedLongValues prefix) + { + this.exact = exact; + this.prefix = prefix; + } + + /** Number of exact-match postings (= prefixIndex in the V2 on-disk format). */ + public int exactCount() + { + return (int) exact.size(); + } + + /** Number of prefix postings. */ + public int prefixCount() + { + return (int) prefix.size(); + } + + /** Total postings (= suffixIndex in the V2 on-disk format). */ + public int totalCount() + { + return exactCount() + prefixCount(); + } + + @Override + public long ramBytesUsed() + { + return exact.ramBytesUsed() + prefix.ramBytesUsed(); + } + + /** Iterator over the exact-section row IDs, ascending. */ + public PackedLongValues.Iterator exactIterator() + { + return exact.iterator(); + } + + /** Iterator over the prefix-section row IDs, ascending. */ + public PackedLongValues.Iterator prefixIterator() + { + return prefix.iterator(); + } + + /** + * Iterator emitting, in order: {@code exactCount}, {@code totalCount}, all exact row IDs, then all prefix + * row IDs. This header-then-sections layout lets {@link SegmentTrieBuffer} expose the node payload as a + * single {@link org.apache.cassandra.index.sai.postings.PostingList}. + */ + public Iterator iterator() + { + return new Iterator(); + } + + public final class Iterator + { + private int headerIdx = 0; + private final PackedLongValues.Iterator exactIt = exact.iterator(); + private final PackedLongValues.Iterator prefixIt = prefix.iterator(); + + public boolean hasNext() + { + return headerIdx < 2 || exactIt.hasNext() || prefixIt.hasNext(); + } + + public long next() + { + if (headerIdx == 0) + { + headerIdx++; + return exactCount(); + } + if (headerIdx == 1) + { + headerIdx++; + return totalCount(); + } + if (exactIt.hasNext()) + return exactIt.next(); + if (prefixIt.hasNext()) + return prefixIt.next(); + throw new NoSuchElementException(); + } + } + + public static class Builder implements Accountable + { + private final PackedLongValues.Builder exactBuilder = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT); + private final PackedLongValues.Builder prefixBuilder = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT); + + /** + * @param rowId segment row ID + * @param type the {@link PostingType} determining which section receives the row ID + */ + public Builder add(long rowId, PostingType type) + { + switch (type) + { + case EXACT: + exactBuilder.add(rowId); + break; + case PREFIX: + prefixBuilder.add(rowId); + break; + default: + throw new IllegalArgumentException("Unhandled PostingType: " + type); + } + return this; + } + + public PackedLongValuesList build() + { + return new PackedLongValuesList(exactBuilder.build(), prefixBuilder.build()); + } + + @Override + public long ramBytesUsed() + { + return exactBuilder.ramBytesUsed() + prefixBuilder.ramBytesUsed(); + } + } +} diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PostingType.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PostingType.java new file mode 100644 index 000000000000..3c8569b64448 --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/PostingType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.segment; + +/** + * Classifies a trie node posting as an exact match or an intermediate (prefix) posting. + * The {@link #id} is the section index in {@link PackedLongValuesList}. + *

+ * To add suffix search: add {@code SUFFIX(2)} here and set {@link PackedLongValuesList#FILTER_TYPES} to 3. + */ +public enum PostingType +{ + EXACT(0), // terminal node — this term equals the indexed value + PREFIX(1); // intermediate node — this term is a prefix of the indexed value + // SUFFIX(2) — reserved; add when suffix search is implemented + + public final int id; + + PostingType(int id) + { + this.id = id; + } +} diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java index 6793638428ae..ecd69ebab939 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; import javax.annotation.concurrent.NotThreadSafe; @@ -28,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.index.sai.StorageAttachedIndex; import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; import org.apache.cassandra.index.sai.disk.v1.bbtree.NumericIndexWriter; @@ -77,12 +79,22 @@ public abstract class SegmentBuilder public static class TrieSegmentBuilder extends SegmentBuilder { protected final SegmentTrieBuffer segmentTrieBuffer; + private final boolean prefixEnabled; public TrieSegmentBuilder(StorageAttachedIndex index, NamedMemoryLimiter limiter) { super(index, limiter); - segmentTrieBuffer = new SegmentTrieBuffer(); + this.prefixEnabled = index.termType().isLiteral() && index.indexWriterConfig().isLiteralPrefixEnabled(); + + IntPredicate prefixAtDepth = null; + if (prefixEnabled) + { + int skip = CassandraRelevantProperties.SAI_POSTINGS_SKIP.getInt(); + prefixAtDepth = depth -> depth % skip == 0; + } + + segmentTrieBuffer = new SegmentTrieBuffer(prefixAtDepth); totalBytesAllocated = segmentTrieBuffer.memoryUsed(); } @@ -95,9 +107,13 @@ protected long addInternal(ByteBuffer term, int segmentRowId) @Override protected SegmentMetadata.ComponentMetadataMap flushInternal(IndexDescriptor indexDescriptor) throws IOException { - SegmentWriter writer = index.termType().isLiteral() ? new LiteralIndexWriter(indexDescriptor, index.identifier()) - : new NumericIndexWriter(indexDescriptor, index.identifier(), index.termType().fixedSizeOf()); + if (index.termType().isLiteral()) + { + LiteralIndexWriter writer = new LiteralIndexWriter(indexDescriptor, index.identifier()); + return writer.writeCompleteSegment(segmentTrieBuffer.iterator(), prefixEnabled); + } + NumericIndexWriter writer = new NumericIndexWriter(indexDescriptor, index.identifier(), index.termType().fixedSizeOf()); return writer.writeCompleteSegment(segmentTrieBuffer.iterator()); } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java index 96e307d84ff0..ea6e7f2a6865 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBuffer.java @@ -20,10 +20,10 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.LongAdder; +import java.util.function.IntPredicate; import javax.annotation.concurrent.NotThreadSafe; -import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedLongValues; import org.apache.cassandra.config.DatabaseDescriptor; @@ -34,21 +34,41 @@ import org.apache.cassandra.utils.bytecomparable.ByteComparable; /** - * On-heap buffer for values that provides a sorted view of itself as an {@link Iterator}. + * On-heap buffer for indexed terms and row IDs backed by an {@link InMemoryTrie} that provides a sorted view of + * itself as an {@link Iterator}. + *

+ * When a non-null {@code prefixAtDepth} policy is provided, each {@link #add} call accumulates intermediate-node + * (prefix) postings at every depth where {@code prefixAtDepth.test(depth)} is true; the terminal node always + * receives an exact posting. Each trie node stores a {@link PackedLongValuesList.Builder} holding one section per + * {@link PostingType}. {@link #iterator()} yields all entries (leaf + intermediate nodes that received any + * postings) in sorted order; each entry's {@link PostingList} emits {@code exactCount}, {@code totalCount}, then + * all exact row IDs followed by all prefix row IDs. */ @NotThreadSafe public class SegmentTrieBuffer { private static final int MAX_RECURSIVE_TERM_LENGTH = 128; - private final InMemoryTrie trie; + private final InMemoryTrie trie; private final PostingsAccumulator postingsAccumulator; + private final IntPredicate prefixAtDepth; // null = no intermediate (prefix) accumulation private int numRows; + /** V1 — no intermediate (prefix) accumulation. */ public SegmentTrieBuffer() + { + this(null); + } + + /** + * @param prefixAtDepth nullable depth policy; when non-null, prefix postings are accumulated at every depth + * for which {@code prefixAtDepth.test(depth)} returns true. Null means V1 (exact-only). + */ + public SegmentTrieBuffer(IntPredicate prefixAtDepth) { trie = new InMemoryTrie<>(DatabaseDescriptor.getMemtableAllocationType().toBufferType()); postingsAccumulator = new PostingsAccumulator(); + this.prefixAtDepth = prefixAtDepth; } public int numRows() @@ -68,7 +88,7 @@ public long add(ByteComparable term, int termLength, int segmentRowId) try { - trie.putSingleton(term, segmentRowId, postingsAccumulator, termLength <= MAX_RECURSIVE_TERM_LENGTH); + trie.putSingleton(term, segmentRowId, postingsAccumulator, termLength <= MAX_RECURSIVE_TERM_LENGTH, prefixAtDepth); } catch (InMemoryTrie.SpaceExhaustedException e) { @@ -81,7 +101,7 @@ public long add(ByteComparable term, int termLength, int segmentRowId) public Iterator iterator() { - Iterator> iterator = trie.entrySet().iterator(); + Iterator> iterator = trie.entrySet().iterator(); return new Iterator<>() { @@ -94,49 +114,92 @@ public boolean hasNext() @Override public IndexEntry next() { - Map.Entry entry = iterator.next(); - PackedLongValues postings = entry.getValue().build(); - PackedLongValues.Iterator postingsIterator = postings.iterator(); - return IndexEntry.create(entry.getKey(), new PostingList() - { - @Override - public long nextPosting() - { - if (postingsIterator.hasNext()) - return postingsIterator.next(); - return END_OF_STREAM; - } - - @Override - public long size() - { - return postings.size(); - } - - @Override - public long advance(long targetRowID) - { - throw new UnsupportedOperationException(); - } - }); + Map.Entry entry = iterator.next(); + PackedLongValuesList list = entry.getValue().build(); + return IndexEntry.create(entry.getKey(), prefixAtDepth == null ? rawPostings(list) + : sectionedPostings(list)); + } + }; + } + + /** V1 posting list: raw exact row IDs only (numeric and non-prefix literal indexes). */ + private static PostingList rawPostings(PackedLongValuesList list) + { + PackedLongValues.Iterator exactIterator = list.exactIterator(); + return new PostingList() + { + @Override + public long nextPosting() + { + return exactIterator.hasNext() ? exactIterator.next() : END_OF_STREAM; + } + + @Override + public long size() + { + return list.exactCount(); + } + + @Override + public long advance(long targetRowID) + { + throw new UnsupportedOperationException(); + } + }; + } + + /** V2 posting list: emits exactCount, totalCount, then exact rows followed by prefix rows. */ + private static PostingList sectionedPostings(PackedLongValuesList list) + { + PackedLongValuesList.Iterator listIterator = list.iterator(); + return new PostingList() + { + @Override + public long nextPosting() + { + return listIterator.hasNext() ? listIterator.next() : END_OF_STREAM; + } + + @Override + public long size() + { + // FILTER_TYPES header values followed by the actual postings. + return PackedLongValuesList.FILTER_TYPES + list.totalCount(); + } + + @Override + public long advance(long targetRowID) + { + throw new UnsupportedOperationException(); } }; } - private static class PostingsAccumulator implements InMemoryTrie.UpsertTransformer + private static class PostingsAccumulator implements InMemoryTrie.UpsertTransformer { private final LongAdder heapAllocations = new LongAdder(); @Override - public PackedLongValues.Builder apply(PackedLongValues.Builder existing, Integer rowID) + public PackedLongValuesList.Builder apply(PackedLongValuesList.Builder existing, Integer rowID) + { + return applyWithType(existing, rowID, PostingType.EXACT); + } + + @Override + public PackedLongValuesList.Builder applyIntermediate(PackedLongValuesList.Builder existing, Integer rowID) + { + return applyWithType(existing, rowID, PostingType.PREFIX); + } + + private PackedLongValuesList.Builder applyWithType(PackedLongValuesList.Builder existing, int rowID, PostingType type) { if (existing == null) { - existing = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT); + existing = new PackedLongValuesList.Builder(); heapAllocations.add(existing.ramBytesUsed()); } long ramBefore = existing.ramBytesUsed(); - existing.add(rowID); + existing.add(rowID, type); heapAllocations.add(existing.ramBytesUsed() - ramBefore); return existing; } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/trie/LiteralIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/trie/LiteralIndexWriter.java index 2c3a68525cf6..3ab8777c456a 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/trie/LiteralIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/trie/LiteralIndexWriter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.index.sai.disk.v1.trie; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -26,12 +27,14 @@ import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils; import org.apache.cassandra.index.sai.disk.v1.postings.PostingsWriter; import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v1.segment.SegmentWriter; +import org.apache.cassandra.index.sai.postings.IntArrayPostingList; import org.apache.cassandra.index.sai.postings.PostingList; import org.apache.cassandra.index.sai.utils.IndexEntry; import org.apache.cassandra.index.sai.utils.IndexIdentifier; @@ -42,6 +45,10 @@ @NotThreadSafe public class LiteralIndexWriter implements SegmentWriter { + /** Attribute key on TERMS_DATA marking a segment as using the V2 (prefix-enabled) postings format. */ + public static final String POSTINGS_FORMAT = "postings_format"; + public static final String POSTINGS_FORMAT_V2 = "v2"; + private final IndexDescriptor indexDescriptor; private final IndexIdentifier indexIdentifier; private long postingsAdded; @@ -54,9 +61,27 @@ public LiteralIndexWriter(IndexDescriptor indexDescriptor, IndexIdentifier index @Override public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(Iterator iterator) throws IOException + { + return writeCompleteSegment(iterator, false); + } + + /** + * Writes the terms dictionary and postings lists for a segment. + * + * @param iterator sorted entries. When {@code prefixEnabled} is false each entry's posting list holds raw + * row IDs. When true, each entry's posting list emits {@code exactCount}, {@code totalCount}, + * then all exact row IDs followed by all prefix row IDs + * (see {@link org.apache.cassandra.index.sai.disk.v1.segment.SegmentTrieBuffer}). + * @param prefixEnabled when true, eligible nodes are written using the V2 posting list format and the segment is + * tagged with {@code postings_format = v2} + */ + public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(Iterator iterator, boolean prefixEnabled) throws IOException { SegmentMetadata.ComponentMetadataMap components = new SegmentMetadata.ComponentMetadataMap(); + final int minimumLeaves = prefixEnabled ? CassandraRelevantProperties.SAI_MINIMUM_POSTINGS_LEAVES.getInt() + : Integer.MAX_VALUE; + try (TrieTermsDictionaryWriter termsDictionaryWriter = new TrieTermsDictionaryWriter(indexDescriptor, indexIdentifier); PostingsWriter postingsWriter = new PostingsWriter(indexDescriptor, indexIdentifier)) { @@ -69,7 +94,40 @@ public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(Iterator 0; + boolean writePrefixSection = prefixCount >= minimumLeaves; + + if (!isTerminal && !writePrefixSection) + { + // Pure intermediate node below the prefix threshold: no on-disk entry (descent will reach leaves). + drain(postings, totalCount); + continue; + } + + int[] exactRows = drainSortedInts(postings, exactCount); + + int[] prefixRows = null; + if (writePrefixSection && prefixCount > 0) + prefixRows = drainSortedInts(postings, prefixCount); + else + drain(postings, prefixCount); + + PostingList exactPostings = exactCount > 0 ? new IntArrayPostingList(exactRows) : null; + PostingList prefixPostings = prefixRows != null ? new IntArrayPostingList(prefixRows) : null; + + long offset = postingsWriter.writeV2(exactPostings, prefixPostings); termsDictionaryWriter.add(indexEntry.term, offset); } } @@ -83,6 +141,8 @@ public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(Iterator map = new HashMap<>(2); map.put(SAICodecUtils.FOOTER_POINTER, footerPointer.getValue().toString()); + if (prefixEnabled) + map.put(POSTINGS_FORMAT, POSTINGS_FORMAT_V2); // Postings list file pointers are stored directly in TERMS_DATA, so a root is not needed. components.put(IndexComponent.POSTING_LISTS, -1, postingsOffset, postingsLength); @@ -91,6 +151,24 @@ public SegmentMetadata.ComponentMetadataMap writeCompleteSegment(Iterator + * The {@link ByteSource} encoding for string types ends with an {@code ESCAPE} (0x00) terminator + * before {@code END_OF_STREAM}. Using a one-byte look-ahead, we detect the terminator before + * following it: when the current byte is {@code ESCAPE} and the next is {@code END_OF_STREAM}, + * we stop at the current node (which is exactly the prefix subtree root) rather than following + * the terminator into the exact-match child. This correctly handles both: + *

    + *
  • Prefixes that are NOT terms themselves (e.g. "grp42x" when only "grp42x…" variants exist)
  • + *
  • Prefixes that ARE terms (e.g. "exact" when both "exact" and "exact_*" are indexed)
  • + *
+ * + * @return the trie node position if the full prefix path exists in the trie, or {@link #NOT_FOUND} + * when no indexed term starts with {@code prefix}. + */ + public long followToPrefix(ByteComparable prefix) + { + ByteSource stream = prefix.asComparableBytes(BYTE_COMPARABLE_VERSION); + go(root); + + int cur = stream.next(); + while (cur != ByteSource.END_OF_STREAM) + { + int next = stream.next(); // one-byte look-ahead + + // ESCAPE (0x00) followed by END_OF_STREAM is the null-escape terminator — do NOT follow it. + // The current node is the prefix subtree root: all children represent "prefix*" terms. + if (cur == ByteSource.ESCAPE && next == ByteSource.END_OF_STREAM) + return position; + + int childIndex = search(cur); + if (childIndex < 0) + return NOT_FOUND; // a content byte is not in the trie → no terms start with prefix + go(transition(childIndex)); + cur = next; + } + return position; // all bytes consumed without failure + } + + /** + * Positions the walker at {@code nodePos} and returns its payload (postings file offset), + * or {@link #NOT_FOUND} if the node carries no payload. + */ + public long payloadAt(long nodePos) + { + go(nodePos); + return getCurrentPayload(); + } + + /** + * Positions the walker at {@code nodePos} and returns the file positions of all non-null children, + * in transition-byte order. Children are collected into an array before returning so that the caller + * can safely recurse without the walker position being clobbered mid-iteration. + */ + public long[] childrenOf(long nodePos) + { + go(nodePos); + int count = transitionRange(); + long[] children = new long[count]; + int filled = 0; + for (int i = 0; i < count; i++) + { + long child = transition(i); + if (child != NONE) + children[filled++] = child; + } + return filled == count ? children : Arrays.copyOf(children, filled); + } + private long getCurrentPayload() { return getPayloadAt(buf, payloadPosition(), payloadFlags()); @@ -86,9 +158,8 @@ private long getCurrentPayload() private long getPayloadAt(ByteBuffer contents, int payloadPos, int bytes) { if (bytes == 0) - { return NOT_FOUND; - } return SizedInts.read(contents, payloadPos, bytes); } } + diff --git a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java index d66eb47e72fc..5c53a056d411 100644 --- a/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/sai/memory/TrieMemoryIndex.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sai.memory; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -49,6 +50,7 @@ import org.apache.cassandra.index.sai.utils.IndexIdentifier; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.PrimaryKeys; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.bytecomparable.ByteComparable; @@ -156,6 +158,12 @@ public KeyRangeIterator search(QueryContext queryContext, Expression expression, if (keyCount > MINIMUM_PRIORITY_QUEUE_SIZE) lastPriorityQueueSize.set(keyCount); return keyIterator; + case LIKE_PREFIX: + KeyRangeIterator prefixIterator = prefixMatch(expression, keyRange); + int prefixKeyCount = (int) prefixIterator.getMaxKeys(); + if (prefixKeyCount > MINIMUM_PRIORITY_QUEUE_SIZE) + lastPriorityQueueSize.set(prefixKeyCount); + return prefixIterator; default: throw new IllegalArgumentException("Unsupported expression: " + expression); } @@ -360,6 +368,50 @@ private KeyRangeIterator rangeMatch(Expression expression, AbstractBounds keyRange) + { + ByteBuffer prefixBuffer = expression.lower().value.encoded; + ByteComparable lowerBound = asComparableBytes(prefixBuffer); + ByteBuffer successor = prefixSuccessor(prefixBuffer); + ByteComparable upperBound = successor == null ? null : asComparableBytes(successor); + + Collector cd = new Collector(keyRange, lastPriorityQueueSize.get()); + Iterator values = data.subtrie(lowerBound, true, upperBound, false).valueIterator(); + + while (values.hasNext()) + cd.processContent(values.next()); + + if (cd.mergedKeys.isEmpty()) + return KeyRangeIterator.empty(); + + return new InMemoryKeyRangeIterator(cd.mergedKeys.peek(), cd.maximumKey, cd.mergedKeys); + } + + /** + * Computes the lexicographic successor of the given raw prefix bytes: the byte array with its last non-{@code 0xFF} + * byte incremented and any trailing {@code 0xFF} bytes removed. Returns null (an unbounded upper bound) when every + * byte is {@code 0xFF}. For order-preserving literal encodings this is the smallest value greater than all values + * starting with the prefix. + */ + private static ByteBuffer prefixSuccessor(ByteBuffer prefix) + { + byte[] bytes = ByteBufferUtil.getArray(prefix); + int last = bytes.length - 1; + while (last >= 0 && (bytes[last] & 0xFF) == 0xFF) + last--; + + if (last < 0) + return null; + + byte[] successor = Arrays.copyOf(bytes, last + 1); + successor[last]++; + return ByteBuffer.wrap(successor); + } + private static class PrimaryKeysReducer implements InMemoryTrie.UpsertTransformer { private final LongAdder heapAllocations = new LongAdder(); diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java index e5c75a53a24a..569918845574 100644 --- a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java +++ b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java @@ -671,11 +671,13 @@ public boolean supports(Operator operator) { if (operator == Operator.LIKE || operator == Operator.LIKE_CONTAINS || - operator == Operator.LIKE_PREFIX || operator == Operator.LIKE_MATCHES || operator == Operator.LIKE_SUFFIX || operator == Operator.IN) return false; + if (operator == Operator.LIKE_PREFIX) + return isLiteral(); + // ANN is only supported against vectors, and vector indexes only support ANN if (operator == Operator.ANN) return isVector(); diff --git a/src/java/org/apache/cassandra/index/sai/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sai/view/RangeTermTree.java index f2680d7587da..10c0fd9db8ae 100644 --- a/src/java/org/apache/cassandra/index/sai/view/RangeTermTree.java +++ b/src/java/org/apache/cassandra/index/sai/view/RangeTermTree.java @@ -54,7 +54,12 @@ private RangeTermTree(ByteBuffer min, ByteBuffer max, IntervalTree search(Expression e) { ByteBuffer minTerm = e.lower() == null ? min : e.lower().value.encoded; - ByteBuffer maxTerm = e.upper() == null ? max : e.upper().value.encoded; + // For a prefix query lower == upper == the prefix, but a segment whose terms are all strictly greater than + // the prefix (e.g. maxTerm "apple" for prefix "app") still contains matches. Use the global max as the upper + // bound so such segments are not pruned. + ByteBuffer maxTerm = e.getIndexOperator() == Expression.IndexOperator.LIKE_PREFIX + ? max + : (e.upper() == null ? max : e.upper().value.encoded); return rangeTree.search(Interval.create(new Term(minTerm, indexTermType), new Term(maxTerm, indexTermType), diff --git a/test/unit/org/apache/cassandra/index/sai/StorageAttachedIndexPrefixOptionValidationTest.java b/test/unit/org/apache/cassandra/index/sai/StorageAttachedIndexPrefixOptionValidationTest.java new file mode 100644 index 000000000000..699edec73620 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/StorageAttachedIndexPrefixOptionValidationTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai; + +import java.util.Map; + +import org.junit.Test; + +import org.apache.cassandra.cql3.statements.schema.IndexTarget; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig; +import org.apache.cassandra.schema.TableMetadata; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StorageAttachedIndexPrefixOptionValidationTest +{ + @Test + public void shouldAllowLiteralPrefixOptionOnLiteralColumn() + { + Map options = options("val", "true"); + assertTrue(StorageAttachedIndex.validateOptions(options, textTable()).isEmpty()); + } + + @Test + public void shouldRejectLiteralPrefixOptionOnNonLiteralColumn() + { + try + { + StorageAttachedIndex.validateOptions(options("val", "true"), intTable()); + fail("Expected InvalidRequestException"); + } + catch (InvalidRequestException e) + { + assertTrue(e.getMessage().contains(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " is only supported on string/literal columns")); + } + } + + @Test + public void shouldRejectInvalidLiteralPrefixOptionValue() + { + try + { + StorageAttachedIndex.validateOptions(options("val", "maybe"), textTable()); + fail("Expected InvalidRequestException"); + } + catch (InvalidRequestException e) + { + assertTrue(e.getMessage().contains(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " must be 'true' or 'false'")); + } + } + + private static Map options(String target, String value) + { + return Map.of(IndexTarget.CUSTOM_INDEX_OPTION_NAME, StorageAttachedIndex.class.getCanonicalName(), + IndexTarget.TARGET_OPTION_NAME, target, + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI, value); + } + + private static TableMetadata textTable() + { + return TableMetadata.builder("ks", "tbl") + .addPartitionKeyColumn("pk", Int32Type.instance) + .addRegularColumn("val", UTF8Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); + } + + private static TableMetadata intTable() + { + return TableMetadata.builder("ks", "tbl") + .addPartitionKeyColumn("pk", Int32Type.instance) + .addRegularColumn("val", Int32Type.instance) + .partitioner(Murmur3Partitioner.instance) + .build(); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchCQLTest.java b/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchCQLTest.java new file mode 100644 index 000000000000..cb7865ffc2ee --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchCQLTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.cql; + +import org.junit.Test; + +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.sai.SAITester; + +public class PrefixSearchCQLTest extends SAITester +{ + private void createPrefixTable() + { + createTable("CREATE TABLE %s (id int PRIMARY KEY, name text)"); + createIndex("CREATE INDEX ON %s(name) USING 'sai' WITH OPTIONS = {'enable_literal_prefix_sai': 'true'}"); + } + + @Test + public void testPrefixFromMemtable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'apple')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'application')"); + execute("INSERT INTO %s (id, name) VALUES (3, 'app')"); + execute("INSERT INTO %s (id, name) VALUES (4, 'car')"); + + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'app%%'"), 3); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'appl%%'"), 2); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'car%%'"), 1); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'xyz%%'"), 0); + } + + @Test + public void testPrefixIncludesExactTermFromMemtable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'foo')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'foobar')"); + + // 'foo%' matches both 'foo' (exact) and 'foobar' + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'foo%%'"), 2); + } + + @Test + public void testExactMatchStillWorksOnPrefixIndexMemtable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'apple')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'car')"); + + assertRows(execute("SELECT id FROM %s WHERE name = 'apple'"), row(1)); + assertRows(execute("SELECT id FROM %s WHERE name = 'car'"), row(2)); + assertRowCount(execute("SELECT * FROM %s WHERE name = 'ap'"), 0); + } + + @Test + public void testPrefixFromSSTable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'apple')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'application')"); + execute("INSERT INTO %s (id, name) VALUES (3, 'app')"); + execute("INSERT INTO %s (id, name) VALUES (4, 'car')"); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'app%%'"), 3); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'appl%%'"), 2); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'car%%'"), 1); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'xyz%%'"), 0); + } + + @Test + public void testPrefixIncludesExactTermFromSSTable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'foo')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'foobar')"); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'foo%%'"), 2); + } + + @Test + public void testExactMatchStillWorksOnPrefixIndexSSTable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'apple')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'car')"); + + flush(); + + assertRows(execute("SELECT id FROM %s WHERE name = 'apple'"), row(1)); + assertRows(execute("SELECT id FROM %s WHERE name = 'car'"), row(2)); + } + + @Test + public void testPrefixAcrossMemtableAndSSTable() throws Throwable + { + createPrefixTable(); + execute("INSERT INTO %s (id, name) VALUES (1, 'apple')"); + execute("INSERT INTO %s (id, name) VALUES (2, 'apricot')"); + flush(); + execute("INSERT INTO %s (id, name) VALUES (3, 'application')"); + execute("INSERT INTO %s (id, name) VALUES (4, 'banana')"); + + // 'ap%' spans the flushed SSTable (apple, apricot) and the live memtable (application) + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'ap%%'"), 3); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'appl%%'"), 2); + } + + @Test + public void testLargePrefixUsesAggregatedSection() throws Throwable + { + createPrefixTable(); + // 100 distinct terms sharing the 3-char prefix "abc" (an eligible depth for postings_skip=3 with + // >= minimum_postings_leaves=64 rows), so an aggregated prefix section is written and the read path + // can jump straight to it instead of scanning every term. + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (id, name) VALUES (?, ?)", i, String.format("abc%04d", i)); + execute("INSERT INTO %s (id, name) VALUES (1000, 'zzz')"); + + flush(); + + // 'abc%' lands on the aggregated prefix node (fast path); 'ab%' is a shorter prefix that falls back to the + // range scan. Both must return all 100 matching rows. + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'abc%%'"), 100); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'ab%%'"), 100); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'abc0001%%'"), 1); + assertRowCount(execute("SELECT * FROM %s WHERE name LIKE 'zzz%%'"), 1); + } + + @Test + public void testInvalidOptionOnNonStringColumn() throws Throwable + { + createTable("CREATE TABLE %s (id int PRIMARY KEY, score int)"); + assertInvalidThrow(InvalidRequestException.class, + "CREATE INDEX ON %s(score) USING 'sai' WITH OPTIONS = {'enable_literal_prefix_sai': 'true'}"); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchStressTest.java b/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchStressTest.java new file mode 100644 index 000000000000..58e49caa73b9 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/cql/PrefixSearchStressTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.cql; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.index.sai.SAITester; +import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings; + +/** + * Compares the latency of {@code LIKE 'prefix%'} served by the SAI prefix index against the same query served by a + * full-table {@code ALLOW FILTERING} scan (the behaviour without the prefix feature), for both a selective prefix + * (a small subset of rows) and a non-selective prefix (every row). + */ +public class PrefixSearchStressTest extends SAITester +{ + private static final Logger logger = LoggerFactory.getLogger(PrefixSearchStressTest.class); + + private static final int ROWS = 20_000; + private static final int GROUPS = 100; // 100 distinct prefixes + private static final int ROWS_PER_GROUP = ROWS / GROUPS; // 200 rows per prefix + private static final int QUERY_ITERATIONS = 5; + + // A selective prefix matches one group (200 rows, 1%); the broad prefix matches everything. + private static final String SELECTIVE = "grp42x"; // depth 6 (eligible for postings_skip=3), 200 rows + private static final String BROAD = "grp"; // depth 3 (eligible), all 20,000 rows + + @Test + public void compareWithAndWithoutPrefixSearch() throws Throwable + { + // ---- WITH the SAI prefix index ---- + createTable("CREATE TABLE %s (id int PRIMARY KEY, name text)"); + createIndex("CREATE INDEX ON %s(name) USING 'sai' WITH OPTIONS = {'enable_literal_prefix_sai': 'true'}"); + populate(); + flush(); + + long withSelective = timeQuery("SELECT id FROM %s WHERE name LIKE '" + SELECTIVE + "%%'", ROWS_PER_GROUP); + long withBroad = timeQuery("SELECT id FROM %s WHERE name LIKE '" + BROAD + "%%'", ROWS); + + // ---- WITHOUT the prefix feature: a regular SAI index, LIKE served by an ALLOW FILTERING full scan ---- + createTable("CREATE TABLE %s (id int PRIMARY KEY, name text)"); + createIndex("CREATE INDEX ON %s(name) USING 'sai'"); + populate(); + flush(); + + long withoutSelective = timeQuery("SELECT id FROM %s WHERE name LIKE '" + SELECTIVE + "%%' ALLOW FILTERING", ROWS_PER_GROUP); + long withoutBroad = timeQuery("SELECT id FROM %s WHERE name LIKE '" + BROAD + "%%' ALLOW FILTERING", ROWS); + + String summary = String.format( + "%n==================== SAI prefix-search stress ====================%n" + + " dataset: %,d rows, %d prefixes of %,d rows each, best of %d runs%n" + + " query | WITH prefix idx | WITHOUT (filter) | speedup%n" + + " -----------------------------------+-----------------+------------------+--------%n" + + " LIKE '%s%%' (selective, %,4d rows) | %,12d us | %,13d us | %5.1fx%n" + + " LIKE '%s%%' (broad, %,6d rows) | %,12d us | %,13d us | %5.1fx%n" + + "==================================================================", + ROWS, GROUPS, ROWS_PER_GROUP, QUERY_ITERATIONS, + SELECTIVE, ROWS_PER_GROUP, withSelective, withoutSelective, ratio(withoutSelective, withSelective), + BROAD, ROWS, withBroad, withoutBroad, ratio(withoutBroad, withBroad)); + logger.info(summary); + System.out.println(summary); + } + + private static double ratio(long without, long with) + { + return without / Math.max(1.0, (double) with); + } + + private void populate() throws Throwable + { + // name = "grpNN" + 'x' + zero-padded id. Group NN = id % GROUPS, so each of the 100 prefixes "grpNNx" has + // ROWS_PER_GROUP rows. "grpNNx" is 6 chars (an eligible depth for postings_skip=3) with 200 rows + // (>= minimum_postings_leaves=64), so a selective prefix gets an aggregated section (the read fast path). + for (int i = 0; i < ROWS; i++) + execute("INSERT INTO %s (id, name) VALUES (?, ?)", i, String.format("grp%02dx%08d", i % GROUPS, i)); + } + + /** Runs the query {@link #QUERY_ITERATIONS} times and returns the best (minimum) wall time in microseconds. */ + private long timeQuery(String cql, int expectedRows) throws Throwable + { + long best = Long.MAX_VALUE; + for (int i = 0; i < QUERY_ITERATIONS; i++) + { + long start = System.nanoTime(); + UntypedResultSet result = runQuery(cql); + long elapsedUs = (System.nanoTime() - start) / 1_000; + assertRowCount(result, expectedRows); + best = Math.min(best, elapsedUs); + } + return best; + } + + /** Executes a read with the coordinator-warning lifecycle initialised (large result sets trip it otherwise). */ + private UntypedResultSet runQuery(String cql) throws Throwable + { + CoordinatorWarnings.init(); + try + { + return execute(cql); + } + finally + { + CoordinatorWarnings.reset(); + } + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfigPrefixTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfigPrefixTest.java new file mode 100644 index 000000000000..1fa1a1d3b9ff --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/IndexWriterConfigPrefixTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1; + +import java.util.Collections; +import java.util.Map; + +import org.junit.Test; + +import org.apache.cassandra.cql3.statements.schema.IndexTarget; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.index.sai.utils.IndexTermType; +import org.apache.cassandra.schema.ColumnMetadata; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class IndexWriterConfigPrefixTest +{ + private static IndexTermType literalType() + { + ColumnMetadata col = ColumnMetadata.regularColumn("ks", "t", "v", UTF8Type.instance, ColumnMetadata.NO_UNIQUE_ID); + return IndexTermType.create(col, Collections.emptyList(), IndexTarget.Type.SIMPLE); + } + + @Test + public void testDisabledByDefault() + { + IndexWriterConfig cfg = IndexWriterConfig.fromOptions(null, literalType(), Map.of()); + assertFalse(cfg.isLiteralPrefixEnabled()); + } + + @Test + public void testEnabledWhenSet() + { + IndexWriterConfig cfg = IndexWriterConfig.fromOptions( + null, literalType(), + Map.of(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI, "true")); + assertTrue(cfg.isLiteralPrefixEnabled()); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/postings/PostingsV2Test.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/postings/PostingsV2Test.java new file mode 100644 index 000000000000..cf929be3c539 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/postings/PostingsV2Test.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.postings; + +import java.io.IOException; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.index.sai.SAITester; +import org.apache.cassandra.index.sai.disk.ArrayPostingList; +import org.apache.cassandra.index.sai.disk.format.IndexComponent; +import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; +import org.apache.cassandra.index.sai.metrics.QueryEventListener; +import org.apache.cassandra.index.sai.postings.PostingList; +import org.apache.cassandra.index.sai.utils.IndexIdentifier; +import org.apache.cassandra.index.sai.utils.SAIRandomizedTester; +import org.apache.lucene.store.IndexInput; + +import static org.junit.Assert.assertEquals; + +public class PostingsV2Test extends SAIRandomizedTester +{ + private IndexDescriptor indexDescriptor; + private IndexIdentifier indexIdentifier; + + @Before + public void setup() throws Throwable + { + indexDescriptor = newIndexDescriptor(); + indexIdentifier = SAITester.createIndexIdentifier(indexDescriptor.sstableDescriptor.ksname, + indexDescriptor.sstableDescriptor.cfname, + newIndex()); + } + + private IndexInput openInput() throws IOException + { + return indexDescriptor.openPerIndexInput(IndexComponent.POSTING_LISTS, indexIdentifier); + } + + @Test + public void testV2ExactSectionOnly() throws IOException + { + long summaryOffset; + try (PostingsWriter writer = new PostingsWriter(indexDescriptor, indexIdentifier)) + { + summaryOffset = writer.writeV2(new ArrayPostingList(0, 1, 2), null); + writer.complete(); + } + try (IndexInput input = openInput()) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(input, summaryOffset, true); + assertEquals(3, summary.prefixIndex); + assertEquals(3, summary.suffixIndex); + + try (PostingsReader reader = PostingsReader.exactSection(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)) + { + assertEquals(0L, reader.nextPosting()); + assertEquals(1L, reader.nextPosting()); + assertEquals(2L, reader.nextPosting()); + assertEquals(PostingList.END_OF_STREAM, reader.nextPosting()); + } + assertEquals(null, PostingsReader.prefixSection(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)); + } + } + + @Test + public void testV2WithPrefixSection() throws IOException + { + // exact=[5,10] prefix=[1,3,5,7,10] + long summaryOffset; + try (PostingsWriter writer = new PostingsWriter(indexDescriptor, indexIdentifier)) + { + summaryOffset = writer.writeV2(new ArrayPostingList(5, 10), + new ArrayPostingList(1, 3, 5, 7, 10)); + writer.complete(); + } + try (IndexInput input = openInput()) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(input, summaryOffset, true); + assertEquals(2, summary.prefixIndex); // 2 exact postings + assertEquals(7, summary.suffixIndex); // 2 exact + 5 prefix + + // Exact section reads [0, prefixIndex), sorted ascending. + try (PostingsReader exact = PostingsReader.exactSection(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)) + { + assertEquals(5L, exact.nextPosting()); + assertEquals(10L, exact.nextPosting()); + assertEquals(PostingList.END_OF_STREAM, exact.nextPosting()); + } + } + try (IndexInput input = openInput()) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(input, summaryOffset, true); + // Prefix section reads [prefixIndex, suffixIndex), sorted ascending. + try (PostingsReader prefix = PostingsReader.prefixSection(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)) + { + assertEquals(1L, prefix.nextPosting()); + assertEquals(3L, prefix.nextPosting()); + assertEquals(5L, prefix.nextPosting()); + assertEquals(7L, prefix.nextPosting()); + assertEquals(10L, prefix.nextPosting()); + assertEquals(PostingList.END_OF_STREAM, prefix.nextPosting()); + } + } + } + + @Test + public void testV2LargeSectionsWithAdvance() throws IOException + { + // Build large exact and prefix sections spanning multiple FOR blocks. + long[] exact = new long[300]; + for (int i = 0; i < exact.length; i++) + exact[i] = i * 2L; + long[] prefix = new long[300]; + for (int i = 0; i < prefix.length; i++) + prefix[i] = i * 3L; + + long summaryOffset; + try (PostingsWriter writer = new PostingsWriter(indexDescriptor, indexIdentifier)) + { + summaryOffset = writer.writeV2(new ArrayPostingList(exact), new ArrayPostingList(prefix)); + writer.complete(); + } + try (IndexInput input = openInput()) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(input, summaryOffset, true); + assertEquals(300, summary.prefixIndex); + assertEquals(600, summary.suffixIndex); + + try (PostingsReader prefixReader = PostingsReader.prefixSection(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)) + { + // advance into the middle of the prefix section + assertEquals(300L, prefixReader.advance(299L)); // first prefix value >= 299 is 300 (=100*3) + assertEquals(303L, prefixReader.nextPosting()); + } + } + } + + @Test + public void testV1BackwardCompat() throws IOException + { + long summaryOffset; + try (PostingsWriter writer = new PostingsWriter(indexDescriptor, indexIdentifier)) + { + summaryOffset = writer.write(new ArrayPostingList(0, 1, 2)); + writer.complete(); + } + try (IndexInput input = openInput()) + { + PostingsReader.BlocksSummary summary = new PostingsReader.BlocksSummary(input, summaryOffset); + assertEquals(3, summary.numPostings); + try (PostingsReader reader = new PostingsReader(input, summary, + QueryEventListener.PostingListEventListener.NO_OP)) + { + assertEquals(0L, reader.nextPosting()); + assertEquals(1L, reader.nextPosting()); + assertEquals(2L, reader.nextPosting()); + assertEquals(PostingList.END_OF_STREAM, reader.nextPosting()); + } + } + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReaderPrefixTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReaderPrefixTest.java new file mode 100644 index 000000000000..eaafc912bc3f --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/LiteralIndexSegmentTermsReaderPrefixTest.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.segment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.TreeSet; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.index.sai.QueryContext; +import org.apache.cassandra.index.sai.disk.format.IndexComponent; +import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; +import org.apache.cassandra.index.sai.disk.v1.SAICodecUtils; +import org.apache.cassandra.index.sai.disk.v1.trie.LiteralIndexWriter; +import org.apache.cassandra.index.sai.metrics.QueryEventListener; +import org.apache.cassandra.index.sai.postings.PostingList; +import org.apache.cassandra.index.sai.utils.IndexIdentifier; +import org.apache.cassandra.index.sai.utils.SAIRandomizedTester; +import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Direct-reader tests for {@link LiteralIndexSegmentTermsReader#prefixMatchWithStats}. + * Verifies both correct row IDs and the traversal path taken by + * {@link LiteralIndexSegmentTermsReader.PrefixQuery#collectFromNode}. + * + * Four scenarios: + *
    + *
  1. No match — prefix not in the trie.
  2. + *
  3. Combined section at the prefix node itself — DFS stops immediately.
  4. + *
  5. Combined sections 2 levels below the prefix node — DFS walks empty intermediates.
  6. + *
  7. No sections anywhere — DFS traverses all leaves, exact-section read per term.
  8. + *
+ * + * Mirrors the {@code BlockBalancedTreeReaderTest} pattern: write the index directly via + * {@link LiteralIndexWriter}, open the reader directly, no CQL stack. + */ +public class LiteralIndexSegmentTermsReaderPrefixTest extends SAIRandomizedTester +{ + // Combined sections are written at depth % skip == 0 AND prefixCount >= minimumLeaves. + private static final int POSTINGS_SKIP = 3; + private static final int MIN_POSTINGS_LEAVES = 64; + + @Before + public void configurePrefixThresholds() + { + CassandraRelevantProperties.SAI_POSTINGS_SKIP.setString(String.valueOf(POSTINGS_SKIP)); + CassandraRelevantProperties.SAI_MINIMUM_POSTINGS_LEAVES.setString(String.valueOf(MIN_POSTINGS_LEAVES)); + } + + // ------------------------------------------------------------------------- + // helpers + // ------------------------------------------------------------------------- + + private LiteralIndexSegmentTermsReader buildReader(List termPostings) throws Exception + { + IndexDescriptor desc = newIndexDescriptor(); + IndexIdentifier id = createIndexIdentifier("ks", "tbl", newIndex()); + + // SegmentTrieBuffer accumulates prefix postings at the right depths and emits the V2 + // header (exactCount, totalCount, rows...) that LiteralIndexWriter.writeCompleteSegment expects. + int skip = CassandraRelevantProperties.SAI_POSTINGS_SKIP.getInt(); + SegmentTrieBuffer buffer = new SegmentTrieBuffer(depth -> depth % skip == 0); + + for (TermPostings tp : termPostings) + { + ByteComparable term = ByteComparable.fixedLength(tp.term.getBytes()); + for (long rowId : tp.rowIds) + buffer.add(term, tp.term.length(), (int) rowId); + } + + LiteralIndexWriter writer = new LiteralIndexWriter(desc, id); + SegmentMetadata.ComponentMetadataMap meta = writer.writeCompleteSegment(buffer.iterator(), true); + + FileHandle termsData = desc.createPerIndexFileHandle(IndexComponent.TERMS_DATA, id, null); + FileHandle postingLists = desc.createPerIndexFileHandle(IndexComponent.POSTING_LISTS, id, null); + long footerPointer = Long.parseLong( + meta.get(IndexComponent.TERMS_DATA).attributes.get(SAICodecUtils.FOOTER_POINTER)); + + return new LiteralIndexSegmentTermsReader( + id, termsData, postingLists, + meta.get(IndexComponent.TERMS_DATA).root, + footerPointer, true /* isV2 */); + } + + private QueryEventListener.TrieIndexEventListener mockListener() + { + QueryEventListener.TrieIndexEventListener l = mock(QueryEventListener.TrieIndexEventListener.class); + when(l.postingListEventListener()).thenReturn(mock(QueryEventListener.PostingListEventListener.class)); + return l; + } + + private TreeSet drain(PostingList pl) throws IOException + { + TreeSet ids = new TreeSet<>(); + if (pl == null) return ids; + try (PostingList p = pl) + { + long id; + while ((id = p.nextPosting()) != PostingList.END_OF_STREAM) + ids.add(id); + } + return ids; + } + + private static ByteComparable bc(String s) + { + return ByteComparable.fixedLength(s.getBytes()); + } + + /** A term and its associated row IDs. */ + private static class TermPostings + { + final String term; + final long[] rowIds; + + TermPostings(String term, long... rowIds) + { + this.term = term; + this.rowIds = rowIds; + } + } + + // ------------------------------------------------------------------------- + // tests + // ------------------------------------------------------------------------- + + /** + * Dense 3-level trie under "abc": abc_[a-c]_[x-z]_NN (45 terms, 3×3×5 rows). + * Query prefix "xyz" shares no bytes with any term. + * + * Expected: null result, all stats zero — DFS never starts. + */ + @Test + public void testNoMatch() throws Exception + { + List data = new ArrayList<>(); + long rowId = 0; + for (char g : new char[]{'a', 'b', 'c'}) + for (char s : new char[]{'x', 'y', 'z'}) + for (int n = 0; n < 5; n++) + data.add(new TermPostings("abc_" + g + "_" + s + "_" + String.format("%02d", n), rowId++)); + + try (LiteralIndexSegmentTermsReader reader = buildReader(data)) + { + LiteralIndexSegmentTermsReader.TraversalStats stats = new LiteralIndexSegmentTermsReader.TraversalStats(); + PostingList result = reader.prefixMatchWithStats(bc("xyz"), bc("xyz~"), mockListener(), mock(QueryContext.class), stats); + + assertNull("Expected null for unmatched prefix", result); + assertEquals(0, stats.combinedSectionHits); + assertEquals(0, stats.exactSectionHits); + assertEquals(0, stats.emptyNodes); + } + } + + /** + * Prefix "ge_" sits at trie depth 3 (depth 3 % skip=3 == 0). + * Under it: 3 groups × 3 sub-groups × 10 rows = 90 rows (≥ 64 → combined section at "ge_"). + * Noise: "ga_*" and "gb_*" branch at depth 2; "other_*" branches at root. + * + * Expected: single combined-section hit; emptyNodes=0 proves DFS stopped at "ge_" without recursing. + */ + @Test + public void testCombinedSectionAtPrefixNode() throws Exception + { + List data = new ArrayList<>(); + TreeSet expectedIds = new TreeSet<>(); + long rowId = 1000; + + // Target: ge_[a-c]_[p-r]_NNNN — 3×3×10 = 90 rows + for (char g : new char[]{'a', 'b', 'c'}) + for (char s : new char[]{'p', 'q', 'r'}) + for (int n = 0; n < 10; n++, rowId++) + { + data.add(new TermPostings("ge_" + g + "_" + s + "_" + String.format("%04d", n), rowId)); + expectedIds.add(rowId); + } + + // Noise branching at depth 2 under same root letter 'g': ga_*, gb_* + for (char g : new char[]{'a', 'b'}) + for (char s : new char[]{'x', 'y', 'z'}) + for (int n = 0; n < 10; n++, rowId++) + data.add(new TermPostings("g" + g + "_" + s + "_" + String.format("%04d", n), rowId)); + + // Noise at a different root: other_[a-c]_NN + for (char g : new char[]{'a', 'b', 'c'}) + for (int n = 0; n < 10; n++, rowId++) + data.add(new TermPostings("other_" + g + "_" + String.format("%02d", n), rowId)); + + try (LiteralIndexSegmentTermsReader reader = buildReader(data)) + { + LiteralIndexSegmentTermsReader.TraversalStats stats = new LiteralIndexSegmentTermsReader.TraversalStats(); + PostingList result = reader.prefixMatchWithStats(bc("ge_"), bc("ge~"), mockListener(), mock(QueryContext.class), stats); + + assertEquals("Expected 90 matching rows", expectedIds, drain(result)); + assertEquals("Expected single combined-section hit at prefix node", 1, stats.combinedSectionHits); + assertEquals(0, stats.exactSectionHits); + // emptyNodes=0: DFS returned at "ge_" without recursing into any child. + assertEquals("Expected no recursion past prefix node", 0, stats.emptyNodes); + } + } + + /** + * Prefix "r" (depth 1, 1 % 3 ≠ 0) has no combined section. + * Children "ra", "rb" (depth 2, 2 % 3 ≠ 0) also have none. + * Grandchildren "rax", "ray", "rbx", "rby" sit at depth 3 (3 % 3 == 0), + * each with 80 rows (≥ 64) → combined sections written there. + * + * DFS path: r (empty) → ra (empty) → rax (combined! stop), ray (combined! stop) + * rb (empty) → rbx (combined! stop), rby (combined! stop) + * + * Expected: combinedSectionHits=4, emptyNodes≥3 (r, ra, rb are all intermediate). + */ + @Test + public void testCombinedSectionInGrandchildren() throws Exception + { + List data = new ArrayList<>(); + TreeSet expectedIds = new TreeSet<>(); + long rowId = 500; + + // Target: r[a-b][x-y]_NNNN — 4 sub-groups × 80 rows = 320 rows. + // rax/ray/rbx/rby nodes are at trie depth 3 (3 chars deep), so depth % 3 == 0. + for (char mid : new char[]{'a', 'b'}) + for (char leaf : new char[]{'x', 'y'}) + for (int n = 0; n < 80; n++, rowId++) + { + data.add(new TermPostings("r" + mid + leaf + "_" + String.format("%04d", n), rowId)); + expectedIds.add(rowId); + } + + // Noise at a different root: m[a-c][x-z]_NN + for (char g : new char[]{'a', 'b', 'c'}) + for (char s : new char[]{'x', 'y', 'z'}) + for (int n = 0; n < 5; n++, rowId++) + data.add(new TermPostings("m" + g + s + "_" + String.format("%02d", n), rowId)); + + try (LiteralIndexSegmentTermsReader reader = buildReader(data)) + { + LiteralIndexSegmentTermsReader.TraversalStats stats = new LiteralIndexSegmentTermsReader.TraversalStats(); + // Query "r": depth 1, no combined section. DFS descends through ra/rb (depth 2) + // then stops at rax/ray/rbx/rby (depth 3, combined sections). + PostingList result = reader.prefixMatchWithStats(bc("r"), bc("r~"), mockListener(), mock(QueryContext.class), stats); + + assertEquals("Expected 320 matching rows", expectedIds, drain(result)); + assertEquals("Expected 4 combined-section hits at grandchild nodes", 4, stats.combinedSectionHits); + assertEquals(0, stats.exactSectionHits); + assertTrue("Expected empty intermediate nodes (r, ra, rb)", stats.emptyNodes >= 3); + } + } + + /** + * Prefix "s_" over a dense 3-level trie: s_[a-c]_[x-z]_[0-2] (27 terms × 2 rowIds = 54 total). + * 54 < MIN_POSTINGS_LEAVES=64 → no combined section at any node, even at depth 6. + * + * DFS traverses all 3 levels: s_a_/s_b_/s_c_ → x_/y_/z_ → 0/1/2. + * Each of the 27 leaf terms has only an exact section. + * + * Expected: combinedSectionHits=0, exactSectionHits=27, emptyNodes>0. + */ + @Test + public void testFullSubtreeTraversalNoSections() throws Exception + { + List data = new ArrayList<>(); + TreeSet expectedIds = new TreeSet<>(); + long rowId = 10; + + // Target: s_[a-c]_[x-z]_[0-2] — 27 terms, 2 rows each (54 total, below 64 threshold) + for (char g : new char[]{'a', 'b', 'c'}) + for (char s : new char[]{'x', 'y', 'z'}) + for (char t : new char[]{'0', '1', '2'}) + { + data.add(new TermPostings("s_" + g + "_" + s + "_" + t, rowId, rowId + 1)); + expectedIds.add(rowId); + expectedIds.add(rowId + 1); + rowId += 2; + } + + // Noise at a different root: t_[a-c]_[x-z]_N + for (char g : new char[]{'a', 'b', 'c'}) + for (char s : new char[]{'x', 'y'}) + for (int n = 0; n < 3; n++, rowId++) + data.add(new TermPostings("t_" + g + "_" + s + "_" + n, rowId)); + + try (LiteralIndexSegmentTermsReader reader = buildReader(data)) + { + LiteralIndexSegmentTermsReader.TraversalStats stats = new LiteralIndexSegmentTermsReader.TraversalStats(); + PostingList result = reader.prefixMatchWithStats(bc("s_"), bc("s~"), mockListener(), mock(QueryContext.class), stats); + + assertEquals("Expected 54 matching rows", expectedIds, drain(result)); + assertEquals("Expected no combined-section hits", 0, stats.combinedSectionHits); + assertEquals("Expected one exact-section hit per leaf term", 27, stats.exactSectionHits); + assertTrue("Expected empty intermediate nodes within s_ subtree", stats.emptyNodes > 0); + } + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/PrefixSectionOptimizationTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/PrefixSectionOptimizationTest.java new file mode 100644 index 000000000000..8b58dadcbc59 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/PrefixSectionOptimizationTest.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.segment; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.index.sai.SAITester; +import org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig; + +import static org.apache.cassandra.index.sai.SAITester.getRandom; + +/** + * Tests for prefix search optimization when query prefix lands at different depths + * relative to eligible prefix posting depths (multiples of SAI_POSTINGS_SKIP). + */ +public class PrefixSectionOptimizationTest extends SAITester +{ + @Before + public void setup() + { + // Ensure skip=3 and minimum=64 for predictable test behavior + CassandraRelevantProperties.SAI_POSTINGS_SKIP.setString("3"); + CassandraRelevantProperties.SAI_MINIMUM_POSTINGS_LEAVES.setString("64"); + } + + /** + * Test: Query prefix at depth 0 (root) with prefix section. + * Expected: Fast path - single I/O reading exact+prefix section. + */ + @Test + public void testPrefixAtDepth0WithSection() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Insert enough rows to trigger prefix section at root (depth 0 % 3 == 0) + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "term_" + i); + + flush(); + + // Query with single-char prefix that covers all rows + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "t%"), 100); + } + + /** + * Test: Query prefix at depth 3 (eligible) with prefix section. + * Expected: Fast path - single I/O. + */ + @Test + public void testPrefixAtDepth3WithSection() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Create 100 terms starting with "abc" (depth 3 = eligible) + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "abc_" + i); + + flush(); + + // Query 'abc%' - should hit fast path + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "abc%"), 100); + } + + /** + * Test: Query prefix at depth 2 (ineligible), children at depth 3 have sections. + * Expected: Smart fallback - read children's exact+prefix sections. + */ + @Test + public void testPrefixAtDepth2ChildrenHaveSections() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Create two groups at depth 3, each with >64 rows + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "aba_" + i); // "ab" depth 2, "aba" depth 3 + + for (int i = 80; i < 160; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "abb_" + i); // "ab" depth 2, "abb" depth 3 + + flush(); + + // Query 'ab%' (depth 2, no section) - should read 'aba' and 'abb' sections + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "ab%"), 160); + } + + /** + * Test: Query prefix at depth 2, children at depth 3 do NOT have sections (too few rows). + * Expected: Smart fallback reads exact sections for each leaf term. + */ + @Test + public void testPrefixAtDepth2ChildrenNoSections() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Create small groups at depth 3, each with <64 rows (no prefix sections) + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "aba_" + i); + + for (int i = 10; i < 20; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "abb_" + i); + + flush(); + + // Query 'ab%' - should read exact sections from leaves + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "ab%"), 20); + } + + /** + * Test: Mixed scenario - some children have sections, some don't. + * Expected: Read sections where available, exact-only for leaves. + */ + @Test + public void testMixedChildrenSomeSectionsNotAll() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Group 1: >64 rows (will have prefix section at depth 3) + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "pra_" + i); + + // Group 2: <64 rows (no prefix section) + for (int i = 80; i < 90; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "prb_" + i); + + flush(); + + // Query 'pr%' - should use section for 'pra', exact-only for 'prb' terms + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "pr%"), 90); + } + + /** + * Test: Deep nesting - query at depth 1, sections at depth 6. + * Expected: Smart fallback navigates through intermediate depths to find sections. + */ + @Test + public void testDeepNestingQueryDepth1SectionsAtDepth6() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Create terms at depth 6 with >64 rows each + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "xabcde_" + i); // depth 6 + + for (int i = 80; i < 160; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "xabcdf_" + i); // depth 6 + + flush(); + + // Query 'x%' (depth 1) - should find and use sections at depth 6 + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "x%"), 160); + } + + /** + * Test: No matching terms. + * Expected: Returns empty result without errors. + */ + @Test + public void testNoMatchingTerms() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "apple_" + i); + + flush(); + + // Query for non-existent prefix + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "banana%"), 0); + } + + /** + * Test: Single matching term with prefix section. + * Expected: Returns correct result. + */ + @Test + public void testSingleTermWithPrefixSection() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Single term repeated enough times to create prefix section + for (int i = 0; i < 100; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "unique"); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "uniq%"), 100); + } + + /** + * Test: Multiple SSTable segments with different prefix structures. + * Expected: Correctly merges results across segments. + */ + @Test + public void testMultipleSegmentsWithDifferentStructures() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Segment 1: Many terms starting with "seg1_" (will have sections) + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "seg1_abc" + i); + flush(); + + // Segment 2: Few terms starting with "seg1_" (no sections) + for (int i = 1000; i < 1010; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "seg1_xyz" + i); + flush(); + + // Query should merge results from both segments + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "seg1_%"), 90); + } + + /** + * Test: Prefix search with memtable + SSTable. + * Expected: Correctly merges memtable (no sections) and SSTable (with sections). + */ + @Test + public void testMemtableAndSSTableMerge() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // SSTable: many rows (will have prefix sections) + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "merged_" + i); + flush(); + + // Memtable: additional rows (no sections yet) + for (int i = 1000; i < 1020; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "merged_" + i); + + // Query should merge both sources + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "merged_%"), 100); + } + + /** + * Test: Exact match vs prefix match behavior. + * Expected: Exact match uses exact section only, prefix match uses exact+prefix. + */ + @Test + public void testExactMatchVsPrefixMatch() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", 1, "exact"); + for (int i = 10; i < 90; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "exact_" + i); + + flush(); + + // Exact match: should return only "exact" + assertRowCount(execute("SELECT * FROM %s WHERE value = ?", "exact"), 1); + + // Prefix match: should return "exact" + all "exact_*" + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ?", "exact%"), 81); + } + + /** + * Test: Special characters in prefix. + * Expected: Handles special characters correctly. + */ + @Test + public void testSpecialCharactersInPrefix() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "test-value_" + i); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ? ALLOW FILTERING", "test-value%"), 80); + } + + /** + * Test: Very long prefix (many characters). + * Expected: Handles deep trie paths correctly. + */ + @Test + public void testVeryLongPrefix() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + String longPrefix = "verylongprefixstring"; + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, longPrefix + "_" + i); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ? ALLOW FILTERING", longPrefix + "%"), 80); + } + + /** + * Test: Prefix with all 'z' characters (edge case for successor computation). + * Expected: Handles unbounded upper range correctly. + */ + @Test + public void testPrefixWithMaxBytes() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, value text)"); + createIndex(String.format("CREATE INDEX ON %%s(value) USING 'sai' WITH OPTIONS = {'%s': 'true'}", + IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI)); + + // Note: Most string values don't contain 0xFF bytes, so this tests normal behavior + for (int i = 0; i < 80; i++) + execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, "zzz_" + i); + + flush(); + + assertRowCount(execute("SELECT * FROM %s WHERE value LIKE ? ALLOW FILTERING", "zzz%"), 80); + } +} diff --git a/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBufferPrefixTest.java b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBufferPrefixTest.java new file mode 100644 index 000000000000..34d3bb4e9547 --- /dev/null +++ b/test/unit/org/apache/cassandra/index/sai/disk/v1/segment/SegmentTrieBufferPrefixTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.index.sai.disk.v1.segment; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.junit.Test; + +import org.apache.cassandra.index.sai.postings.PostingList; +import org.apache.cassandra.index.sai.utils.IndexEntry; +import org.apache.cassandra.index.sai.utils.SAIRandomizedTester; +import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.bytecomparable.ByteSource; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SegmentTrieBufferPrefixTest extends SAIRandomizedTester +{ + private void add(SegmentTrieBuffer buf, String term, int rowId) + { + byte[] bytes = term.getBytes(StandardCharsets.UTF_8); + ByteComparable bc = v -> ByteSource.of(bytes, v); + buf.add(bc, bytes.length, rowId); + } + + /** A single buffered entry decoded from the V2 (header + sections) posting list. */ + private static class Decoded + { + final int exactCount; + final int prefixCount; + final List exact = new ArrayList<>(); + final List prefix = new ArrayList<>(); + + Decoded(PostingList postings) throws Exception + { + this.exactCount = (int) postings.nextPosting(); + int total = (int) postings.nextPosting(); + this.prefixCount = total - exactCount; + for (int i = 0; i < exactCount; i++) + exact.add(postings.nextPosting()); + for (int i = 0; i < prefixCount; i++) + prefix.add(postings.nextPosting()); + } + } + + private List decodeAll(SegmentTrieBuffer buf) throws Exception + { + List result = new ArrayList<>(); + Iterator it = buf.iterator(); + while (it.hasNext()) + result.add(new Decoded(it.next().postingList)); + return result; + } + + @Test + public void testPrefixAccumulatedAtEligibleDepths() throws Exception + { + // skip = 1: accumulate prefix at every depth. + SegmentTrieBuffer buf = new SegmentTrieBuffer(depth -> depth % 1 == 0); + + add(buf, "apple", 0); + add(buf, "application", 1); + add(buf, "apt", 5); + + boolean foundApNode = false; // "ap" prefix node should cover all three rows [0,1,5] + boolean foundLeaf = false; // a leaf with a single exact posting + + for (Decoded d : decodeAll(buf)) + { + if (d.exactCount == 0 && d.prefixCount == 3) + { + foundApNode = true; + assertEquals(List.of(0L, 1L, 5L), d.prefix); + } + if (d.exactCount == 1 && d.prefixCount == 0) + foundLeaf = true; + } + + assertTrue("Expected an intermediate prefix node covering 3 rows", foundApNode); + assertTrue("Expected leaf nodes with a single exact posting", foundLeaf); + } + + @Test + public void testPrefixSkipGating() throws Exception + { + // skip = 2: accumulate prefix only at even depths (2, 4, ...). + SegmentTrieBuffer buf = new SegmentTrieBuffer(depth -> depth % 2 == 0); + + add(buf, "apple", 0); + add(buf, "application", 1); + + // Depth-1 node "a" must NOT have prefix postings; depth-2 node "ap" must. + boolean foundEvenDepthPrefixNode = false; + for (Decoded d : decodeAll(buf)) + { + if (d.exactCount == 0 && d.prefixCount == 2) + { + foundEvenDepthPrefixNode = true; + assertEquals(List.of(0L, 1L), d.prefix); + } + } + assertTrue("Expected an even-depth prefix node covering both rows", foundEvenDepthPrefixNode); + } + + @Test + public void testNoPolicyMeansRawExactPostings() throws Exception + { + SegmentTrieBuffer buf = new SegmentTrieBuffer(); // null policy = V1 raw + + add(buf, "apple", 0); + add(buf, "application", 1); + + // V1 buffer emits raw exact postings (no header). Each leaf has exactly one row. + Iterator it = buf.iterator(); + int entries = 0; + while (it.hasNext()) + { + PostingList postings = it.next().postingList; + assertEquals(1, postings.size()); + assertTrue(postings.nextPosting() != PostingList.END_OF_STREAM); + assertEquals(PostingList.END_OF_STREAM, postings.nextPosting()); + entries++; + } + assertEquals(2, entries); // only the two leaf terms, no intermediate nodes + } +}