From d18ed21356d861a3a69d2fff559591f4b4f84e38 Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Thu, 4 Jun 2026 11:57:22 -0500 Subject: [PATCH] Avoid cloning index bitmaps for read-only query operands RoaringTagIndex query evaluation cloned the matching bitmap at every leaf (`equal`/`hasKey` via withOffsetClone, plus `True`) so the result could be mutated by an enclosing in-place and/or. A multi-term query therefore cloned once per term, and a standalone `:eq` cloned a bitmap that the caller only reads. Roaring bitmap intermediates were ~38% of allocation in an alloc profile, almost all on the query path. Split evaluation into findReadOnly (may return a shared reference into the index -- read only) and findOwned (caller may mutate; copies only the Equal/HasKey/True cases that can be shared). and/or take the mutated accumulator via findOwned(q1) and the read-only operand via findReadOnly(q2), so only the accumulator is copied. Shared bitmaps are safe because the index is immutable while querying. Defer the paging offset to a single application on the final set instead of cloning it into every leaf (prefix removal commutes with and/or/andNot). This also fixes a latent paging bug: previously a `:not` query with an offset left `all` untrimmed and re-emitted the already-paged prefix. Allocation (gc.alloc.rate.norm, limit=1): standalone `:eq` ~952 -> ~24 B/op; multi-term and/or roughly halved. Adds adversarial ownership tests (mutation-verified: they fail if a shared bitmap is mutated), a `:not`+offset paging test, and a QueryAllocation benchmark. 
--- .../atlas/core/index/RoaringTagIndex.scala | 126 +++++++++----- .../core/index/RoaringTagIndexSuite.scala | 160 ++++++++++++++++++ .../atlas/core/index/QueryAllocation.scala | 88 ++++++++++ 3 files changed, 333 insertions(+), 41 deletions(-) create mode 100644 atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryAllocation.scala diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndex.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndex.scala index d31190005..47f2c6847 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndex.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/index/RoaringTagIndex.scala @@ -225,26 +225,65 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten /** Extract the value for an encoded tag. */ private def tagValue(t: Long): Int = (t & 0x00000000FFFFFFFFL).toInt - private[index] def findImpl(query: Query, offset: Int): RoaringBitmap = { + /** + * Evaluate a query into the set of matching item positions. + * + * OWNERSHIP: the returned bitmap must be treated as READ-ONLY. For some queries it + * is a shared reference into the index (e.g. the stored bitmap for an exact tag + * match, or `all`), and mutating it would corrupt the index for every later query. + * Callers that need to mutate the result (in-place `and`/`or`, offset removal) must + * obtain an owned copy via [[findOwned]]. + * + * Offset is NOT applied here. Removing a fixed prefix range commutes with + * and/or/andNot, so it is applied once on the final set at the top-level entry + * points instead of being cloned into every leaf. 
+ */ + private def findReadOnly(query: Query): RoaringBitmap = { import com.netflix.atlas.core.model.Query.* query match { - case And(q1, q2) => and(q1, q2, offset) - case Or(q1, q2) => or(q1, q2, offset) - case Not(q) => diff(all, findImpl(q, offset)) - case Equal(k, v) => equal(k, v, offset) - case GreaterThan(k, v) => greaterThan(k, v, false, offset) - case GreaterThanEqual(k, v) => greaterThan(k, v, true, offset) - case LessThan(k, v) => lessThan(k, v, false, offset) - case LessThanEqual(k, v) => lessThan(k, v, true, offset) - case In(k, vs) => in(k, vs, offset) - case q: PatternQuery => strPattern(q, offset) - case HasKey(k) => hasKey(k, offset) - case True => withOffset(all.clone(), offset) + case And(q1, q2) => and(q1, q2) + case Or(q1, q2) => or(q1, q2) + case Not(q) => diff(all, findReadOnly(q)) + case Equal(k, v) => equal(k, v) + case GreaterThan(k, v) => greaterThan(k, v, false) + case GreaterThanEqual(k, v) => greaterThan(k, v, true) + case LessThan(k, v) => lessThan(k, v, false) + case LessThanEqual(k, v) => lessThan(k, v, true) + case In(k, vs) => in(k, vs) + case q: PatternQuery => strPattern(q) + case HasKey(k) => hasKey(k) + case True => all case False => new RoaringBitmap() } } + /** + * Evaluate a query into a bitmap the caller OWNS and may mutate in place. Only the + * cases that [[findReadOnly]] can answer with a shared index bitmap need a copy; + * every other case already produces a freshly-allocated set. + * + * MAINTENANCE: the shared-returning leaves are exactly `Equal` (`vidx.get`), + * `HasKey` (`keyIndex.get`), and `True` (`all`). If a new leaf is added that + * returns a stored index bitmap rather than a fresh one, it MUST be added to the + * match below, or using it as the left operand of `and`/`or` will corrupt the + * index. (See the "ownership" tests in RoaringTagIndexSuite.) 
+ */ + private def findOwned(query: Query): RoaringBitmap = { + import com.netflix.atlas.core.model.Query.* + query match { + case _: Equal | _: HasKey | True => ownedCopy(findReadOnly(query)) + case _ => findReadOnly(query) + } + } + + /** Copy a possibly-shared bitmap so it can be mutated. An empty set is returned uncopied. */ + private def ownedCopy(set: RoaringBitmap): RoaringBitmap = { + if (set.isEmpty) set else set.clone() + } + private def diff(s1: RoaringBitmap, s2: RoaringBitmap): RoaringBitmap = { + // Static andNot allocates a new owned result and reads (does not mutate) s1/s2, + // so passing the shared `all` for s1 is safe. RoaringBitmap.andNot(s1, s2) } @@ -254,44 +293,41 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten set } - private def withOffsetClone(set: RoaringBitmap, offset: Int): RoaringBitmap = { - withOffset(set.clone(), offset) - } - - private def and(q1: Query, q2: Query, offset: Int): RoaringBitmap = { - val s1 = findImpl(q1, offset) + private def and(q1: Query, q2: Query): RoaringBitmap = { + val s1 = findOwned(q1) // mutated in place below, so must be owned if (s1.isEmpty) s1 else { - // Short circuit, only perform second query if s1 is not empty - val s2 = findImpl(q2, offset) - s1.and(s2) + // Short circuit, only perform second query if s1 is not empty. `and` only reads + // s2, so a shared bitmap is fine here -- no copy needed. 
+ s1.and(findReadOnly(q2)) s1 } } - private def or(q1: Query, q2: Query, offset: Int): RoaringBitmap = { - val s1 = findImpl(q1, offset) - val s2 = findImpl(q2, offset) - s1.or(s2) + private def or(q1: Query, q2: Query): RoaringBitmap = { + val s1 = findOwned(q1) + s1.or(findReadOnly(q2)) // s2 only read, shared is fine s1 } - private def equal(k: String, v: String, offset: Int): RoaringBitmap = { + private def equal(k: String, v: String): RoaringBitmap = { val kp = keyMap.get(k, -1) val vidx = itemIndex.get(kp) if (vidx == null) new RoaringBitmap() else { val vp = valueMap.get(v, -1) + // Shared reference into the index -- read only (see findReadOnly/findOwned). val matchSet = vidx.get(vp) - if (matchSet == null) new RoaringBitmap() else withOffsetClone(matchSet, offset) + if (matchSet == null) new RoaringBitmap() else matchSet } } - private def greaterThan(k: String, v: String, orEqual: Boolean, offset: Int): RoaringBitmap = { + private def greaterThan(k: String, v: String, orEqual: Boolean): RoaringBitmap = { val kp = keyMap.get(k, -1) val vidx = itemIndex.get(kp) if (vidx == null) new RoaringBitmap() else { + // Fresh accumulator -- owned, safe to return for either read-only or owned use. 
val set = new LazyOrBitmap() val vp = findOffset(values, v, if (orEqual) 0 else 1) val t = tag(kp, vp) @@ -303,11 +339,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten i += 1 } set.repairAfterLazy() - withOffset(set, offset) + set } } - private def lessThan(k: String, v: String, orEqual: Boolean, offset: Int): RoaringBitmap = { + private def lessThan(k: String, v: String, orEqual: Boolean): RoaringBitmap = { val kp = keyMap.get(k, -1) val vidx = itemIndex.get(kp) if (vidx == null) new RoaringBitmap() @@ -332,11 +368,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten } } set.repairAfterLazy() - withOffset(set, offset) + set } } - private def in(k: String, vs: List[String], offset: Int): RoaringBitmap = { + private def in(k: String, vs: List[String]): RoaringBitmap = { val kp = keyMap.get(k, -1) val vidx = itemIndex.get(kp) if (vidx == null) new RoaringBitmap() @@ -349,11 +385,11 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten set.naivelazyor(matchSet) } set.repairAfterLazy() - withOffset(set, offset) + set } } - private def strPattern(q: Query.PatternQuery, offset: Int): RoaringBitmap = { + private def strPattern(q: Query.PatternQuery): RoaringBitmap = { val kp = keyMap.get(q.k, -1) val vidx = itemIndex.get(kp) if (vidx == null) new RoaringBitmap() @@ -382,14 +418,15 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten } } set.repairAfterLazy() - withOffset(set, offset) + set } } - private def hasKey(k: String, offset: Int): RoaringBitmap = { + private def hasKey(k: String): RoaringBitmap = { val kp = keyMap.get(k, -1) + // Shared reference into the index -- read only (see findReadOnly/findOwned). 
val matchSet = keyIndex.get(kp) - if (matchSet == null) new RoaringBitmap() else withOffsetClone(matchSet, offset) + if (matchSet == null) new RoaringBitmap() else matchSet } private def itemOffset(v: String): Int = { @@ -461,7 +498,7 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten builder.result() } else { val q = query.query.getOrElse(Query.True) - val itemSet = findImpl(q, 0) + val itemSet = findReadOnly(q) // read-only below (iterate); no offset on item ids val offset = findOffset(keys, query.offset) val results = new util.BitSet(keys.length) @@ -504,7 +541,7 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten // Need to restrict by the query, always include restriction for items with the key val has = Query.HasKey(k) val q = query.query.fold[Query](has)(q => q.and(has)) - val itemSet = findImpl(q, 0) + val itemSet = findReadOnly(q) // read-only below (intersect/iterate) // Double check if there were any matches if (itemSet.isEmpty) return Nil @@ -547,7 +584,14 @@ class RoaringTagIndex[T <: TaggedItem](items: Array[T], stats: IndexStats) exten def findItems(query: TagQuery): List[T] = { val offset = itemOffset(query.offset) val limit = query.limit - val intSet = query.query.fold(withOffset(all, offset))(q => findImpl(q, offset)) + // Apply the offset once on the final set. When offset > 0 the prefix removal + // mutates, so the set must be owned; otherwise the result is only read by + // createResultList and a shared bitmap is fine. 
+ val intSet = + if (offset > 0) + withOffset(query.query.fold(all.clone())(q => findOwned(q)), offset) + else + query.query.fold(all)(q => findReadOnly(q)) createResultList(items, intSet, limit) } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/index/RoaringTagIndexSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/index/RoaringTagIndexSuite.scala index 36596e610..c1f3741ef 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/index/RoaringTagIndexSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/index/RoaringTagIndexSuite.scala @@ -267,4 +267,164 @@ class RoaringTagIndexSuite extends TagIndexSuite { val rs = idx.findItems(TagQuery(Some(q))) assertEquals(rs.map(_.tags("k")).toSet, Set("Zasg")) } + + // -------------------------------------------------------------------------- + // Ownership / shared-bitmap safety. + // + // findReadOnly may hand back a reference to a bitmap stored in the index (for + // Equal/HasKey/True). If any query path mutated one of those, the index would be + // silently corrupted and later queries for the same tag would return wrong + // results. These tests run leaf queries, then a battery of compound queries that + // route those leaves through the in-place and/or, and assert the leaf results are + // unchanged afterward. + // -------------------------------------------------------------------------- + + // Each test builds its own index: a buggy implementation would corrupt the shared + // one and the tests would corrupt each other (order-dependent failures). A fresh + // index per test makes a regression fail deterministically. + private def freshIndex(): TagIndex[TimeSeries] = + RoaringTagIndex(TagIndexSuite.dataset.toArray, new IndexStats()) + + private def ids(idx: TagIndex[TimeSeries], q: Query): List[com.netflix.atlas.core.model.ItemId] = + idx.findItems(TagQuery(Some(q))).map(_.id) + + // Real (key, value) pairs and keys drawn from the dataset so the queries match. 
+ private val sampleTags: List[(String, String)] = + TagIndexSuite.dataset.iterator.flatMap(_.tags).toList.distinct.take(12) + + private val sampleKeys: List[String] = sampleTags.map(_._1).distinct + + // The key with the most distinct values, and its (sorted) values. Used to build + // In/Regex/range leaves so the ownership battery also exercises the non-Equal + // leaves as and/or operands -- locking the invariant that they return fresh sets. + private val multiKey: String = + TagIndexSuite.dataset.iterator + .flatMap(_.tags) + .toList + .groupBy(_._1) + .view + .mapValues(_.map(_._2).distinct.size) + .maxBy(_._2) + ._1 + + private val multiVals: List[String] = + TagIndexSuite.dataset.iterator + .flatMap(_.tags) + .collect { case (k, v) if k == multiKey => v } + .toList + .distinct + .sorted + + private val inQuery: Query = Query.In(multiKey, multiVals.take(3)) + private val regexQuery: Query = Query.Regex(multiKey, multiVals.head.take(2)) + private val geQuery: Query = Query.GreaterThanEqual(multiKey, multiVals(multiVals.size / 2)) + + private val leafQueries: List[Query] = + Query.True :: inQuery :: regexQuery :: geQuery :: + sampleKeys.map(Query.HasKey.apply) ::: + sampleTags.map { case (k, v) => Query.Equal(k, v) } + + test("ownership: AND does not corrupt the shared bitmap of its first operand") { + val idx = freshIndex() + val q = Query.Equal(sampleTags.head._1, sampleTags.head._2) + val before = ids(idx, q) + // q is the first operand, evaluated via findOwned (must clone); the in-place + // `and` then mutates the accumulator. AND with *different* equality tags so the + // intersection is a strict subset -- if the clone is missing, q's stored bitmap + // shrinks. Guard against a vacuous setup: require at least one term to shrink q. 
+ assert( + sampleTags.tail.exists { + case (k, v) => ids(idx, Query.And(q, Query.Equal(k, v))).size < before.size + }, + "test setup is vacuous: no AND term shrinks the first operand" + ) + sampleTags.tail.foreach { + case (k, v) => idx.findItems(TagQuery(Some(Query.And(q, Query.Equal(k, v))))) + } + assertEquals(ids(idx, q), before, "Equal result changed after AND -> shared bitmap mutated") + } + + test("ownership: OR does not corrupt the shared bitmap of its first operand") { + val idx = freshIndex() + val q = Query.Equal(sampleTags.head._1, sampleTags.head._2) + val before = ids(idx, q) + // Guard against vacuity: require at least one OR term to grow the result. + assert( + sampleTags.tail.exists { + case (k, v) => ids(idx, Query.Or(q, Query.Equal(k, v))).size > before.size + }, + "test setup is vacuous: no OR term grows the first operand" + ) + // in-place `or` would add bits to q's stored bitmap if it were not cloned. + sampleTags.tail.foreach { + case (k, v) => idx.findItems(TagQuery(Some(Query.Or(q, Query.Equal(k, v))))) + } + assertEquals(ids(idx, q), before, "Equal result changed after OR -> shared bitmap mutated") + } + + test("ownership: HasKey/True survive NOT and a deep compound battery") { + val idx = freshIndex() + val baseline = leafQueries.map(q => q -> ids(idx, q)).toMap + + val a = Query.Equal(sampleTags.head._1, sampleTags.head._2) + val b = Query.Equal(sampleTags(1)._1, sampleTags(1)._2) + val compounds = List( + Query.And(a, Query.HasKey(sampleKeys.head)), + Query.And(Query.HasKey(sampleKeys.head), a), + Query.Or(a, b), + Query.Or(Query.True, a), + Query.Not(a), + Query.And(Query.Not(a), Query.HasKey(sampleKeys.head)), + Query.And(Query.And(a, Query.HasKey(sampleKeys.head)), b), + Query.Or(Query.Or(a, b), Query.HasKey(sampleKeys.last)), + // Non-Equal leaves as the first (mutated) operand -- locks the invariant that + // In/Regex/range return fresh sets and don't need a copy in findOwned. 
+ Query.And(inQuery, Query.HasKey(sampleKeys.head)), + Query.Or(regexQuery, a), + Query.And(geQuery, Query.HasKey(sampleKeys.head)) + ) + // Run twice to surface progressive corruption (each run would further + // shrink/grow a shared bitmap that was wrongly aliased). + (0 until 2).foreach(_ => compounds.foreach(c => idx.findItems(TagQuery(Some(c))))) + + leafQueries.foreach { q => + assertEquals(ids(idx, q), baseline(q), s"leaf query changed after compound battery: $q") + } + } + + test("ownership: offset query does not corrupt `all`") { + val idx = freshIndex() + val full = idx.findItems(TagQuery(None, limit = Int.MaxValue)) + val before = full.map(_.id) + // Use an id from the MIDDLE of the sorted set so itemOffset(someId) > 0 and the + // offset>0 branch (which clones `all` before removing the prefix) actually runs. + // The lowest id is at sorted position 0 and would yield offset 0 (read-only path). + val someId = full(full.size / 2).id.toString + val paged = idx.findItems(TagQuery(None, offset = someId, limit = Int.MaxValue)) + assert(paged.size < full.size, "offset did not trim; offset>0 clone path not exercised") + // Run both offset>0 paths: no-query (all.clone()) and True-query (findOwned(True)). + idx.findItems(TagQuery(Some(Query.True), offset = someId, limit = Int.MaxValue)) + idx.findItems(TagQuery(None, offset = someId, limit = Int.MaxValue)) + assertEquals( + idx.findItems(TagQuery(None, limit = Int.MaxValue)).map(_.id), + before, + "all changed after offset query -> shared `all` was mutated" + ) + } + + test("offset: paging a :not query excludes ids at/below the cursor") { + // Locks the offset-deferral fix: previously offset was applied per-leaf and the + // Not branch (`andNot(all, ...)`) left `all` untrimmed, so paging a :not query + // re-emitted the already-paged prefix. Offset is now applied once on the result. 
+ val idx = freshIndex() + val q = Query.Not(Query.Equal(sampleTags.head._1, sampleTags.head._2)) + val full = idx.findItems(TagQuery(Some(q), limit = Int.MaxValue)) + val cursor = full(full.size / 2).id.toString + val paged = idx.findItems(TagQuery(Some(q), offset = cursor, limit = Int.MaxValue)) + assert(paged.nonEmpty && paged.size < full.size, "offset did not page the :not result") + assert( + paged.forall(_.id.toString > cursor), + "paged :not result contains ids <= the offset cursor" + ) + } } diff --git a/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryAllocation.scala b/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryAllocation.scala new file mode 100644 index 000000000..a4756eace --- /dev/null +++ b/atlas-jmh/src/main/scala/com/netflix/atlas/core/index/QueryAllocation.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2014-2026 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.core.index + +import com.netflix.atlas.core.model.BasicTaggedItem +import com.netflix.atlas.core.model.Query +import com.netflix.atlas.core.util.SortedTagMap +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.infra.Blackhole + +import java.util.UUID + +/** + * Allocation cost of query evaluation. 
Each `:eq`/`:has` leaf used to clone the + * matching bitmap from the index so an enclosing and/or could mutate it; a multi-term + * query cloned once per term. With shared read-only leaves only the mutated + * accumulator is copied (one clone for a left-nested chain). Compare + * `gc.alloc.rate.norm` on `main` vs the change: + * + * ``` + * > jmh:run -wi 5 -i 8 -f1 -t1 -prof gc .*QueryAllocation.* + * ``` + * + * Design notes so the benchmark actually measures the clones: + * - Each leaf matches ~50% of items and items get a random id, so after the id-sort the + * matching positions are scattered -> real ArrayContainer/BitmapContainer bitmaps + * (not run-compressed), which is the shape that dominated the alloc profile. + * - Queries are LEFT-nested (the shape ASL builds: `a,:eq,b,:eq,:and,c,:eq,:and`). + * - `limit = 1` suppresses result-list materialization so the bitmap-eval allocation + * is what is measured. + */ +@State(Scope.Thread) +class QueryAllocation { + + private val numItems = 20000 + private val numKeys = 5 + + // Key k$j is "hi" when bit j of i is set, else "lo". So `k$j=hi` matches ~50% of + // items, and because each item gets a random id the matches are scattered after the + // id-sort -> BitmapContainer/ArrayContainer leaves (not run-compressed). The bits + // are independent, so the left-nested AND stays non-empty through all 5 terms + // (~1/32 of items) rather than short-circuiting to empty, and the OR covers ~31/32. 
+ private val items = (0 until numItems).map { i => + val tags = (0 until numKeys).map(j => s"k$j" -> (if (((i >> j) & 1) == 1) "hi" else "lo")).toMap + BasicTaggedItem(SortedTagMap(tags + ("id" -> UUID.randomUUID().toString))) + } + + private val index = RoaringTagIndex[BasicTaggedItem](items.toArray, new IndexStats()) + + private def eq(j: Int): Query = Query.Equal(s"k$j", "hi") + + private val singleEq: Query = eq(0) + + // Left-nested: And(And(And(And(k0,k1),k2),k3),k4) + private val and5: Query = + (1 until numKeys).foldLeft(eq(0))((acc, j) => Query.And(acc, eq(j))) + + private val or5: Query = + (1 until numKeys).foldLeft(eq(0))((acc, j) => Query.Or(acc, eq(j))) + + // limit = 1 so result materialization does not dominate the measurement. + private def run(q: Query): List[BasicTaggedItem] = + index.findItems(TagQuery(Some(q), limit = 1)) + + @Benchmark + def singleEquality(bh: Blackhole): Unit = bh.consume(run(singleEq)) + + @Benchmark + def andFiveTerms(bh: Blackhole): Unit = bh.consume(run(and5)) + + @Benchmark + def orFiveTerms(bh: Blackhole): Unit = bh.consume(run(or5)) +}