Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions src/java/org/apache/cassandra/db/tries/InMemoryTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.IntPredicate;

import com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -809,6 +810,17 @@ public interface UpsertTransformer<T, U>
* @return The combined value to use. Cannot be null.
*/
T apply(T existing, U update);

/**
* Called when intermediate-node content is applied during a recursive put driven by an insertion policy
* (see {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean, IntPredicate)}).
* The default implementation delegates to {@link #apply(Object, Object)}; transformers that need to
* distinguish intermediate (prefix) applications from terminal (exact) ones should override it.
*/
default T applyIntermediate(T existing, U update)
{
return apply(existing, update);
}
}

/**
Expand Down Expand Up @@ -899,6 +911,69 @@ public <R> void putRecursive(ByteComparable key, R value, final UpsertTransforme
root = newRoot;
}

/**
* A version of {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean)} which, in addition to
* applying the value at the terminal node, also applies it as intermediate content at every depth selected by
* {@code accumulateIntermediateAtDepth}. Intermediate applications go through
* {@link UpsertTransformer#applyIntermediate}, while the terminal application uses {@link UpsertTransformer#apply}.
* <p>
* Depth is 1-based on the key bytes (the first byte is depth 1); the empty prefix (depth 0, the root) is never
* accumulated. A null policy is equivalent to {@link #putSingleton(ByteComparable, Object, UpsertTransformer, boolean)}.
*/
public <R> void putSingleton(ByteComparable key,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: avoid code duplication. probably we could pass a new arg to putSingleton() and/or putRecursive.

We might need to support both the flows as well.

R value,
UpsertTransformer<T, ? super R> transformer,
boolean useRecursive,
IntPredicate accumulateIntermediateAtDepth) throws SpaceExhaustedException
{
if (accumulateIntermediateAtDepth == null)
{
putSingleton(key, value, transformer, useRecursive);
return;
}
putRecursiveWithPolicy(key, value, transformer, accumulateIntermediateAtDepth);
}

@SuppressWarnings("unchecked")
public <R> void putRecursiveWithPolicy(ByteComparable key,
R value,
UpsertTransformer<T, ? super R> transformer,
IntPredicate accumulateIntermediateAtDepth) throws SpaceExhaustedException
{
int newRoot = putRecursiveWithPolicy(root, key.asComparableBytes(BYTE_COMPARABLE_VERSION), value,
(UpsertTransformer<T, R>) transformer, accumulateIntermediateAtDepth, 0);
if (newRoot != root)
root = newRoot;
}

private <R> int putRecursiveWithPolicy(int node, ByteSource key, R value, final UpsertTransformer<T, R> transformer,
IntPredicate accumulateIntermediateAtDepth, int depth) throws SpaceExhaustedException
{
int transition = key.next();
if (transition == ByteSource.END_OF_STREAM)
return applyContent(node, value, transformer, false);

boolean appliedHere = false;
if (depth >= 1 && accumulateIntermediateAtDepth.test(depth))
{
node = applyContent(node, value, transformer, true);
appliedHere = true;
}

int child = getChild(node, transition);

int newChild = putRecursiveWithPolicy(child, key, value, transformer, accumulateIntermediateAtDepth, depth + 1);
if (newChild == child && !appliedHere)
return node;

int skippedContent = followContentTransition(node);
int attachedChild = !isNull(skippedContent)
? attachChild(skippedContent, transition, newChild) // Single path, no copying required
: expandOrCreateChainNode(transition, newChild);

return preserveContent(node, skippedContent, attachedChild);
}

private <R> int putRecursive(int node, ByteSource key, R value, final UpsertTransformer<T, R> transformer) throws SpaceExhaustedException
{
int transition = key.next();
Expand All @@ -920,25 +995,35 @@ private <R> int putRecursive(int node, ByteSource key, R value, final UpsertTran
}

private <R> int applyContent(int node, R value, UpsertTransformer<T, R> transformer) throws SpaceExhaustedException
{
return applyContent(node, value, transformer, false);
}

private <R> int applyContent(int node, R value, UpsertTransformer<T, R> transformer, boolean intermediate) throws SpaceExhaustedException
{
if (isNull(node))
return ~addContent(transformer.apply(null, value));
return ~addContent(combine(transformer, null, value, intermediate));

if (isLeaf(node))
{
int contentIndex = ~node;
setContent(contentIndex, transformer.apply(getContent(contentIndex), value));
setContent(contentIndex, combine(transformer, getContent(contentIndex), value, intermediate));
return node;
}

if (offset(node) == PREFIX_OFFSET)
{
int contentIndex = getInt(node + PREFIX_CONTENT_OFFSET);
setContent(contentIndex, transformer.apply(getContent(contentIndex), value));
setContent(contentIndex, combine(transformer, getContent(contentIndex), value, intermediate));
return node;
}
else
return createPrefixNode(addContent(transformer.apply(null, value)), node, false);
return createPrefixNode(addContent(combine(transformer, null, value, intermediate)), node, false);
}

private <R> T combine(UpsertTransformer<T, R> transformer, T existing, R value, boolean intermediate)
{
return intermediate ? transformer.applyIntermediate(existing, value) : transformer.apply(existing, value);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public class StorageAttachedIndex implements Index
IndexWriterConfig.CONSTRUCTION_BEAM_WIDTH,
IndexWriterConfig.SIMILARITY_FUNCTION,
IndexWriterConfig.OPTIMIZE_FOR,
IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI,
NonTokenizingOptions.CASE_SENSITIVE,
NonTokenizingOptions.NORMALIZE,
NonTokenizingOptions.ASCII);
Expand Down Expand Up @@ -279,6 +280,17 @@ public static Map<String, String> validateOptions(Map<String, String> options, T
AbstractAnalyzer.fromOptions(indexTermType, analysisOptions);
IndexWriterConfig config = IndexWriterConfig.fromOptions(null, indexTermType, options);

if (options.containsKey(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI))
{
String val = options.get(IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI);
if (!"true".equalsIgnoreCase(val) && !"false".equalsIgnoreCase(val))
throw new InvalidRequestException(
IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " must be 'true' or 'false', got: " + val);
if (!indexTermType.isLiteral())
throw new InvalidRequestException(
IndexWriterConfig.ENABLE_LITERAL_PREFIX_SAI + " is only supported on string/literal columns");
}

// If we are indexing map entries we need to validate the subtypes
if (indexTermType.isComposite())
{
Expand Down Expand Up @@ -450,7 +462,16 @@ public boolean dependsOn(ColumnMetadata column)
@Override
public boolean supportsExpression(ColumnMetadata column, Operator operator)
{
return dependsOn(column) && indexTermType.supports(operator);
if (!dependsOn(column))
return false;

// LIKE is initially parsed as the generic operator (the specific variant is resolved from the bound value),
// so advertise support for both the generic LIKE and LIKE_PREFIX. Only prefix-enabled literal indexes qualify;
// other LIKE variants (suffix/contains/matches) are rejected at execution time.
if (operator == Operator.LIKE || operator == Operator.LIKE_PREFIX)
return indexTermType.isLiteral() && indexWriterConfig.isLiteralPrefixEnabled();

return indexTermType.supports(operator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ public class IndexWriterConfig

public static final String OPTIMIZE_FOR = "optimize_for";
private static final OptimizeFor DEFAULT_OPTIMIZE_FOR = OptimizeFor.LATENCY;

public static final String ENABLE_LITERAL_PREFIX_SAI = "enable_literal_prefix_sai";
private static final String validOptimizeFor = Arrays.stream(OptimizeFor.values())
.map(Enum::name)
.collect(Collectors.joining(", "));

public static final int MAX_TOP_K = SAI_VECTOR_SEARCH_MAX_TOP_K.getInt();

private static final IndexWriterConfig EMPTY_CONFIG = new IndexWriterConfig(-1, -1, null, null);
private static final IndexWriterConfig EMPTY_CONFIG = new IndexWriterConfig(-1, -1, null, null, false);

// The maximum number of outgoing connections a node can have in a graph.
private final int maximumNodeConnections;
Expand All @@ -71,15 +73,19 @@ public class IndexWriterConfig

private final OptimizeFor optimizeFor;

private final boolean literalPrefixEnabled;

public IndexWriterConfig(int maximumNodeConnections,
int constructionBeamWidth,
VectorSimilarityFunction similarityFunction,
OptimizeFor optimizerFor)
OptimizeFor optimizerFor,
boolean literalPrefixEnabled)
{
this.maximumNodeConnections = maximumNodeConnections;
this.constructionBeamWidth = constructionBeamWidth;
this.similarityFunction = similarityFunction;
this.optimizeFor = optimizerFor;
this.literalPrefixEnabled = literalPrefixEnabled;
}

public int getMaximumNodeConnections()
Expand All @@ -102,6 +108,11 @@ public OptimizeFor getOptimizeFor()
return optimizeFor;
}

public boolean isLiteralPrefixEnabled()
{
return literalPrefixEnabled;
}

public static IndexWriterConfig fromOptions(String indexName, IndexTermType indexTermType, Map<String, String> options)
{
int maximumNodeConnections = DEFAULT_MAXIMUM_NODE_CONNECTIONS;
Expand Down Expand Up @@ -178,7 +189,8 @@ public static IndexWriterConfig fromOptions(String indexName, IndexTermType inde
}
}
}
return new IndexWriterConfig(maximumNodeConnections, queueSize, similarityFunction, optimizeFor);
boolean literalPrefixEnabled = "true".equalsIgnoreCase(options.get(ENABLE_LITERAL_PREFIX_SAI));
return new IndexWriterConfig(maximumNodeConnections, queueSize, similarityFunction, optimizeFor, literalPrefixEnabled);
}

public static IndexWriterConfig emptyConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sai.disk.PerColumnIndexWriter;
import org.apache.cassandra.index.sai.disk.RowMapping;
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
import org.apache.cassandra.index.sai.disk.format.IndexDescriptor;
import org.apache.cassandra.index.sai.disk.v1.bbtree.NumericIndexWriter;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentTrieBuffer;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentWriter;
import org.apache.cassandra.index.sai.disk.v1.trie.LiteralIndexWriter;
import org.apache.cassandra.index.sai.memory.MemtableIndex;
import org.apache.cassandra.index.sai.memory.MemtableTermsIterator;
import org.apache.cassandra.index.sai.metrics.IndexMetrics;
import org.apache.cassandra.index.sai.postings.PostingList;
import org.apache.cassandra.index.sai.utils.IndexEntry;
import org.apache.cassandra.index.sai.utils.IndexIdentifier;
import org.apache.cassandra.index.sai.utils.IndexTermType;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
Expand All @@ -55,13 +59,15 @@ public class MemtableIndexWriter implements PerColumnIndexWriter
{
private static final Logger logger = LoggerFactory.getLogger(MemtableIndexWriter.class);
private static final int NO_ROWS = -1;
private static final int MAX_RECURSIVE_TERM_LENGTH = 128;

private final IndexDescriptor indexDescriptor;
private final IndexTermType indexTermType;
private final IndexIdentifier indexIdentifier;
private final IndexMetrics indexMetrics;
private final MemtableIndex memtable;
private final RowMapping rowMapping;
private final boolean literalPrefixEnabled;

private PrimaryKey minKey;
private PrimaryKey maxKey;
Expand All @@ -73,7 +79,8 @@ public MemtableIndexWriter(MemtableIndex memtable,
IndexTermType indexTermType,
IndexIdentifier indexIdentifier,
IndexMetrics indexMetrics,
RowMapping rowMapping)
RowMapping rowMapping,
boolean literalPrefixEnabled)
{
assert rowMapping != null && rowMapping != RowMapping.DUMMY : "Row mapping must exist during FLUSH.";

Expand All @@ -83,6 +90,7 @@ public MemtableIndexWriter(MemtableIndex memtable,
this.indexMetrics = indexMetrics;
this.memtable = memtable;
this.rowMapping = rowMapping;
this.literalPrefixEnabled = literalPrefixEnabled;
}

@Override
Expand Down Expand Up @@ -176,13 +184,39 @@ public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException

private long flush(MemtableTermsIterator terms) throws IOException
{
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)
: new NumericIndexWriter(indexDescriptor,
indexIdentifier,
indexTermType.fixedSizeOf());
SegmentMetadata.ComponentMetadataMap indexMetas;
long numRows;

SegmentMetadata.ComponentMetadataMap indexMetas = writer.writeCompleteSegment(terms);
long numRows = writer.getNumberOfRows();
if (indexTermType.isLiteral() && literalPrefixEnabled)
{
int skip = CassandraRelevantProperties.SAI_POSTINGS_SKIP.getInt();
SegmentTrieBuffer buffer = new SegmentTrieBuffer(depth -> depth % skip == 0);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: avoid using segmentriebuffer instead try to do it in place for memtableIndexwriter


while (terms.hasNext())
{
IndexEntry entry = terms.next();
try (PostingList postings = entry.postingList)
{
long rowId;
while ((rowId = postings.nextPosting()) != PostingList.END_OF_STREAM)
buffer.add(entry.term, MAX_RECURSIVE_TERM_LENGTH, (int) rowId);
}
}

LiteralIndexWriter writer = new LiteralIndexWriter(indexDescriptor, indexIdentifier);
indexMetas = writer.writeCompleteSegment(buffer.iterator(), true);
numRows = writer.getNumberOfRows();
}
else
{
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)
: new NumericIndexWriter(indexDescriptor,
indexIdentifier,
indexTermType.fixedSizeOf());

indexMetas = writer.writeCompleteSegment(terms);
numRows = writer.getNumberOfRows();
}

// If no rows were written we need to delete any created column index components
// so that the index is correctly identified as being empty (only having a completion marker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public PerColumnIndexWriter newPerColumnIndexWriter(StorageAttachedIndex index,
index.termType(),
index.identifier(),
index.indexMetrics(),
rowMapping);
rowMapping,
index.indexWriterConfig().isLiteralPrefixEnabled());
}

@Override
Expand Down
Loading