Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,65 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
/** Extract the value for an encoded tag. */
private def tagValue(t: Long): Int = (t & 0x00000000FFFFFFFFL).toInt

private[index] def findImpl(query: Query, offset: Int): RoaringBitmap = {
/**
* Evaluate a query into the set of matching item positions.
*
* OWNERSHIP: the returned bitmap must be treated as READ-ONLY. For some queries it
* is a shared reference into the index (e.g. the stored bitmap for an exact tag
* match, or `all`), and mutating it would corrupt the index for every later query.
* Callers that need to mutate the result (in-place `and`/`or`, offset removal) must
* obtain an owned copy via [[findOwned]].
*
* Offset is NOT applied here. Removing a fixed prefix range commutes with
* and/or/andNot, so it is applied once on the final set at the top-level entry
* points instead of being cloned into every leaf.
*/
private def findReadOnly(query: Query): RoaringBitmap = {
import com.netflix.atlas.core.model.Query.*
query match {
case And(q1, q2) => and(q1, q2, offset)
case Or(q1, q2) => or(q1, q2, offset)
case Not(q) => diff(all, findImpl(q, offset))
case Equal(k, v) => equal(k, v, offset)
case GreaterThan(k, v) => greaterThan(k, v, false, offset)
case GreaterThanEqual(k, v) => greaterThan(k, v, true, offset)
case LessThan(k, v) => lessThan(k, v, false, offset)
case LessThanEqual(k, v) => lessThan(k, v, true, offset)
case In(k, vs) => in(k, vs, offset)
case q: PatternQuery => strPattern(q, offset)
case HasKey(k) => hasKey(k, offset)
case True => withOffset(all.clone(), offset)
case And(q1, q2) => and(q1, q2)
case Or(q1, q2) => or(q1, q2)
case Not(q) => diff(all, findReadOnly(q))
case Equal(k, v) => equal(k, v)
case GreaterThan(k, v) => greaterThan(k, v, false)
case GreaterThanEqual(k, v) => greaterThan(k, v, true)
case LessThan(k, v) => lessThan(k, v, false)
case LessThanEqual(k, v) => lessThan(k, v, true)
case In(k, vs) => in(k, vs)
case q: PatternQuery => strPattern(q)
case HasKey(k) => hasKey(k)
case True => all
case False => new RoaringBitmap()
}
}

/**
 * Resolve `query` to a bitmap that belongs to the caller and may be mutated in place.
 *
 * [[findReadOnly]] can answer `Equal` (`vidx.get`), `HasKey` (`keyIndex.get`) and `True`
 * (`all`) with a reference to a bitmap held by the index, so results for those three query
 * types are passed through [[ownedCopy]]. Results for every other query type are returned
 * exactly as [[findReadOnly]] produced them.
 *
 * MAINTENANCE: a new leaf that returns a stored index bitmap instead of a fresh one MUST
 * be added to the copying case below. Otherwise using it as the left operand of the
 * in-place `and`/`or` would modify the index. (See the "ownership" tests in
 * RoaringTagIndexSuite.)
 */
private def findOwned(query: Query): RoaringBitmap = {
  import com.netflix.atlas.core.model.Query.*
  // Evaluate once, then decide whether the result needs to be detached from the index.
  val result = findReadOnly(query)
  query match {
    case True | _: Equal | _: HasKey => ownedCopy(result)
    case _                           => result
  }
}

/**
 * Return a private copy of a possibly-shared bitmap so the caller can mutate it.
 *
 * The copy is made unconditionally, including for empty sets. An empty bitmap handed out
 * by the index is still the index's own instance, so returning it as "owned" would let an
 * in-place `or` add bits to it. Compared with special-casing empty sets, the only extra
 * work is cloning a bitmap that has no elements.
 *
 * NOTE(review): whether the index can ever store an empty bitmap is not visible from this
 * method; copying unconditionally removes the dependency on that invariant.
 *
 * @param set bitmap that may be a shared reference into the index; it is not modified
 * @return a new bitmap with the same contents as `set`
 */
private def ownedCopy(set: RoaringBitmap): RoaringBitmap = set.clone()

/**
 * Set difference `s1 - s2`, computed with the static `RoaringBitmap.andNot`.
 *
 * The static form builds a new result bitmap and only reads its two arguments, which is
 * why the shared `all` bitmap can safely be passed as `s1`.
 */
private def diff(s1: RoaringBitmap, s2: RoaringBitmap): RoaringBitmap =
  RoaringBitmap.andNot(s1, s2)

Expand All @@ -254,44 +293,41 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
set
}

private def withOffsetClone(set: RoaringBitmap, offset: Int): RoaringBitmap = {
withOffset(set.clone(), offset)
}

private def and(q1: Query, q2: Query, offset: Int): RoaringBitmap = {
val s1 = findImpl(q1, offset)
private def and(q1: Query, q2: Query): RoaringBitmap = {
val s1 = findOwned(q1) // mutated in place below, so must be owned
if (s1.isEmpty) s1
else {
// Short circuit, only perform second query if s1 is not empty
val s2 = findImpl(q2, offset)
s1.and(s2)
// Short circuit, only perform second query if s1 is not empty. `and` only reads
// s2, so a shared bitmap is fine here -- no copy needed.
s1.and(findReadOnly(q2))
s1
}
}

private def or(q1: Query, q2: Query, offset: Int): RoaringBitmap = {
val s1 = findImpl(q1, offset)
val s2 = findImpl(q2, offset)
s1.or(s2)
private def or(q1: Query, q2: Query): RoaringBitmap = {
val s1 = findOwned(q1)
s1.or(findReadOnly(q2)) // s2 only read, shared is fine
s1
}

private def equal(k: String, v: String, offset: Int): RoaringBitmap = {
private def equal(k: String, v: String): RoaringBitmap = {
val kp = keyMap.get(k, -1)
val vidx = itemIndex.get(kp)
if (vidx == null) new RoaringBitmap()
else {
val vp = valueMap.get(v, -1)
// Shared reference into the index -- read only (see findReadOnly/findOwned).
val matchSet = vidx.get(vp)
if (matchSet == null) new RoaringBitmap() else withOffsetClone(matchSet, offset)
if (matchSet == null) new RoaringBitmap() else matchSet
}
}

private def greaterThan(k: String, v: String, orEqual: Boolean, offset: Int): RoaringBitmap = {
private def greaterThan(k: String, v: String, orEqual: Boolean): RoaringBitmap = {
val kp = keyMap.get(k, -1)
val vidx = itemIndex.get(kp)
if (vidx == null) new RoaringBitmap()
else {
// Fresh accumulator -- owned, safe to return for either read-only or owned use.
val set = new LazyOrBitmap()
val vp = findOffset(values, v, if (orEqual) 0 else 1)
val t = tag(kp, vp)
Expand All @@ -303,11 +339,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
i += 1
}
set.repairAfterLazy()
withOffset(set, offset)
set
}
}

private def lessThan(k: String, v: String, orEqual: Boolean, offset: Int): RoaringBitmap = {
private def lessThan(k: String, v: String, orEqual: Boolean): RoaringBitmap = {
val kp = keyMap.get(k, -1)
val vidx = itemIndex.get(kp)
if (vidx == null) new RoaringBitmap()
Expand All @@ -332,11 +368,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
}
}
set.repairAfterLazy()
withOffset(set, offset)
set
}
}

private def in(k: String, vs: List[String], offset: Int): RoaringBitmap = {
private def in(k: String, vs: List[String]): RoaringBitmap = {
val kp = keyMap.get(k, -1)
val vidx = itemIndex.get(kp)
if (vidx == null) new RoaringBitmap()
Expand All @@ -349,11 +385,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
set.naivelazyor(matchSet)
}
set.repairAfterLazy()
withOffset(set, offset)
set
}
}

private def strPattern(q: Query.PatternQuery, offset: Int): RoaringBitmap = {
private def strPattern(q: Query.PatternQuery): RoaringBitmap = {
val kp = keyMap.get(q.k, -1)
val vidx = itemIndex.get(kp)
if (vidx == null) new RoaringBitmap()
Expand Down Expand Up @@ -382,14 +418,15 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
}
}
set.repairAfterLazy()
withOffset(set, offset)
set
}
}

private def hasKey(k: String, offset: Int): RoaringBitmap = {
private def hasKey(k: String): RoaringBitmap = {
val kp = keyMap.get(k, -1)
// Shared reference into the index -- read only (see findReadOnly/findOwned).
val matchSet = keyIndex.get(kp)
if (matchSet == null) new RoaringBitmap() else withOffsetClone(matchSet, offset)
if (matchSet == null) new RoaringBitmap() else matchSet
}

private def itemOffset(v: String): Int = {
Expand Down Expand Up @@ -461,7 +498,7 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
builder.result()
} else {
val q = query.query.getOrElse(Query.True)
val itemSet = findImpl(q, 0)
val itemSet = findReadOnly(q) // read-only below (iterate); no offset on item ids
val offset = findOffset(keys, query.offset)

val results = new util.BitSet(keys.length)
Expand Down Expand Up @@ -504,7 +541,7 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
// Need to restrict by the query, always include restriction for items with the key
val has = Query.HasKey(k)
val q = query.query.fold[Query](has)(q => q.and(has))
val itemSet = findImpl(q, 0)
val itemSet = findReadOnly(q) // read-only below (intersect/iterate)

// Double check if there were any matches
if (itemSet.isEmpty) return Nil
Expand Down Expand Up @@ -547,7 +584,14 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten
def findItems(query: TagQuery): List[T] = {
val offset = itemOffset(query.offset)
val limit = query.limit
val intSet = query.query.fold(withOffset(all, offset))(q => findImpl(q, offset))
// Apply the offset once on the final set. When offset > 0 the prefix removal
// mutates, so the set must be owned; otherwise the result is only read by
// createResultList and a shared bitmap is fine.
val intSet =
if (offset > 0)
withOffset(query.query.fold(all.clone())(q => findOwned(q)), offset)
else
query.query.fold(all)(q => findReadOnly(q))
createResultList(items, intSet, limit)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,164 @@ class RoaringTagIndexSuite extends TagIndexSuite {
val rs = idx.findItems(TagQuery(Some(q)))
assertEquals(rs.map(_.tags("k")).toSet, Set("Zasg"))
}

// --------------------------------------------------------------------------
// Ownership / shared-bitmap safety.
//
// findReadOnly may hand back a reference to a bitmap stored in the index (for
// Equal/HasKey/True). If any query path mutated one of those, the index would be
// silently corrupted and later queries for the same tag would return wrong
// results. These tests run leaf queries, then a battery of compound queries that
// route those leaves through the in-place and/or, and assert the leaf results are
// unchanged afterward.
// --------------------------------------------------------------------------

// Build a brand-new index over the shared dataset. Every ownership test uses its own
// index so that a regression which corrupts index bitmaps fails deterministically in the
// test that caused it, instead of leaking into other tests in an order-dependent way.
private def freshIndex(): TagIndex[TimeSeries] = {
  val stats = new IndexStats()
  RoaringTagIndex(TagIndexSuite.dataset.toArray, stats)
}

// Ids of the items that `idx` returns for `q`, in the order findItems yields them.
private def ids(idx: TagIndex[TimeSeries], q: Query): List[com.netflix.atlas.core.model.ItemId] = {
  val matches = idx.findItems(TagQuery(Some(q)))
  matches.map(_.id)
}

// The first 12 distinct (key, value) pairs that occur in the dataset, in encounter order.
// Taking real pairs guarantees that the Equal queries built from them match something.
private val sampleTags: List[(String, String)] = {
  val allTags = TagIndexSuite.dataset.iterator.flatMap(_.tags)
  allTags.distinct.take(12).toList
}

// Distinct keys of `sampleTags`, in first-occurrence order.
private val sampleKeys: List[String] =
  sampleTags.map { case (key, _) => key }.distinct

// The dataset key that has the largest number of distinct values. Together with
// `multiVals` it is used to build In/Regex/range leaves, so the ownership battery also
// exercises the non-Equal leaves as and/or operands.
private val multiKey: String = {
  val distinctValueCounts = TagIndexSuite.dataset.iterator
    .flatMap(_.tags)
    .toList
    .groupBy(_._1)
    .view
    .mapValues(pairs => pairs.map(_._2).toSet.size)
  val (key, _) = distinctValueCounts.maxBy { case (_, count) => count }
  key
}

// Every distinct value recorded for `multiKey` in the dataset, sorted ascending.
private val multiVals: List[String] =
  TagIndexSuite.dataset.iterator
    .flatMap(_.tags)
    .filter { case (k, _) => k == multiKey }
    .map { case (_, v) => v }
    .toList
    .distinct
    .sorted

// In-leaf over the three smallest values of `multiKey`.
private val inQuery: Query = {
  val firstThree = multiVals.take(3)
  Query.In(multiKey, firstThree)
}

// Regex-leaf whose pattern is the first two characters of the smallest value.
private val regexQuery: Query = {
  val prefix = multiVals.head.take(2)
  Query.Regex(multiKey, prefix)
}

// Range-leaf starting at the middle element of the sorted values.
private val geQuery: Query = {
  val middle = multiVals(multiVals.size / 2)
  Query.GreaterThanEqual(multiKey, middle)
}

// All leaf queries whose results are snapshotted and re-checked by the ownership battery:
// True, the In/Regex/range leaves, one HasKey per sample key, one Equal per sample tag.
private val leafQueries: List[Query] = {
  val fixedLeaves: List[Query] = List(Query.True, inQuery, regexQuery, geQuery)
  val hasKeyLeaves: List[Query] = sampleKeys.map(k => Query.HasKey(k))
  val equalLeaves: List[Query] = sampleTags.map { case (k, v) => Query.Equal(k, v) }
  fixedLeaves ++ hasKeyLeaves ++ equalLeaves
}

test("ownership: AND does not corrupt the shared bitmap of its first operand") {
  val idx = freshIndex()
  val (firstKey, firstValue) = sampleTags.head
  val q = Query.Equal(firstKey, firstValue)
  val before = ids(idx, q)

  // q is always the FIRST operand: it is the one evaluated through findOwned and then
  // mutated by the in-place `and`. The second operands are other equality tags, so the
  // intersection can be a strict subset of q; without the copy, q's stored bitmap shrinks.
  val conjunctions = sampleTags.tail.map { case (k, v) => Query.And(q, Query.Equal(k, v)) }

  // Guard against a vacuous setup: at least one conjunction must actually shrink q.
  val someTermShrinks = conjunctions.exists(c => ids(idx, c).size < before.size)
  assert(someTermShrinks, "test setup is vacuous: no AND term shrinks the first operand")

  conjunctions.foreach(c => idx.findItems(TagQuery(Some(c))))

  assertEquals(ids(idx, q), before, "Equal result changed after AND -> shared bitmap mutated")
}

test("ownership: OR does not corrupt the shared bitmap of its first operand") {
  val idx = freshIndex()
  val (firstKey, firstValue) = sampleTags.head
  val q = Query.Equal(firstKey, firstValue)
  val before = ids(idx, q)

  // q is the first operand of every union; an in-place `or` on an un-copied result
  // would add bits to q's stored bitmap.
  val unions = sampleTags.tail.map { case (k, v) => Query.Or(q, Query.Equal(k, v)) }

  // Guard against vacuity: at least one union must actually grow the result.
  val someTermGrows = unions.exists(u => ids(idx, u).size > before.size)
  assert(someTermGrows, "test setup is vacuous: no OR term grows the first operand")

  unions.foreach(u => idx.findItems(TagQuery(Some(u))))

  assertEquals(ids(idx, q), before, "Equal result changed after OR -> shared bitmap mutated")
}

test("ownership: HasKey/True survive NOT and a deep compound battery") {
  val idx = freshIndex()
  // Snapshot every leaf result before any compound query has touched the index.
  val baseline = leafQueries.map(q => q -> ids(idx, q))

  val a = Query.Equal(sampleTags.head._1, sampleTags.head._2)
  val b = Query.Equal(sampleTags(1)._1, sampleTags(1)._2)
  val hasFirstKey = Query.HasKey(sampleKeys.head)
  val hasLastKey = Query.HasKey(sampleKeys.last)

  val compounds = List(
    Query.And(a, hasFirstKey),
    Query.And(hasFirstKey, a),
    Query.Or(a, b),
    Query.Or(Query.True, a),
    Query.Not(a),
    Query.And(Query.Not(a), hasFirstKey),
    Query.And(Query.And(a, hasFirstKey), b),
    Query.Or(Query.Or(a, b), hasLastKey),
    // In/Regex/range leaves as the first (mutated) operand: findOwned does not copy
    // these, so this pins the expectation that they already return fresh sets.
    Query.And(inQuery, hasFirstKey),
    Query.Or(regexQuery, a),
    Query.And(geQuery, hasFirstKey)
  )

  // Two passes, so corruption that accumulates (a wrongly aliased bitmap shrinking or
  // growing further on every run) has a chance to surface.
  for (_ <- 0 until 2; c <- compounds) idx.findItems(TagQuery(Some(c)))

  baseline.foreach {
    case (q, expected) =>
      assertEquals(ids(idx, q), expected, s"leaf query changed after compound battery: $q")
  }
}

test("ownership: offset query does not corrupt `all`") {
  val idx = freshIndex()
  val unbounded = TagQuery(None, limit = Int.MaxValue)

  val full = idx.findItems(unbounded)
  val before = full.map(_.id)

  // Take the cursor from the MIDDLE of the result so that the offset is greater than
  // zero and the copying branch of findItems runs. The first id would give offset 0,
  // which takes the read-only path.
  val someId = full(full.size / 2).id.toString
  val paged = idx.findItems(TagQuery(None, offset = someId, limit = Int.MaxValue))
  assert(paged.size < full.size, "offset did not trim; offset>0 clone path not exercised")

  // Exercise both offset>0 variants: an explicit True query and no query at all.
  idx.findItems(TagQuery(Some(Query.True), offset = someId, limit = Int.MaxValue))
  idx.findItems(TagQuery(None, offset = someId, limit = Int.MaxValue))

  val after = idx.findItems(unbounded).map(_.id)
  assertEquals(after, before, "all changed after offset query -> shared `all` was mutated")
}

test("offset: paging a :not query excludes ids at/below the cursor") {
  // Regression test for deferring the offset to the final result. When the offset was
  // applied per leaf, the Not branch (`andNot(all, ...)`) used an untrimmed `all`, so
  // paging a :not query returned the already-paged prefix again.
  val idx = freshIndex()
  val (key, value) = sampleTags.head
  val q = Query.Not(Query.Equal(key, value))

  val full = idx.findItems(TagQuery(Some(q), limit = Int.MaxValue))
  val cursor = full(full.size / 2).id.toString
  val paged = idx.findItems(TagQuery(Some(q), offset = cursor, limit = Int.MaxValue))

  val trimmed = paged.nonEmpty && paged.size < full.size
  assert(trimmed, "offset did not page the :not result")

  val leaksPrefix = paged.exists(_.id.toString <= cursor)
  assert(!leaksPrefix, "paged :not result contains ids <= the offset cursor")
}
}
Loading