diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java
new file mode 100644
index 00000000000..0e86edceb48
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java
@@ -0,0 +1,167 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.core.CoreSpan;
+import datadog.trace.core.monitor.HealthMetrics;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Adversarial JMH benchmark designed to stress every cardinality + capacity dimension of the
+ * metrics subsystem at once.
+ *
+ * <p>The metrics aggregator is supposed to be bounded by design:
+ *
+ * <ul>
+ *   <li>{@link AggregateTable} caps total entries at {@code tracerMetricsMaxAggregates} (default
+ *       2048) and rejects further inserts when full.
+ *   <li>Each cardinality handler caps distinct values per reporting cycle; overflow collapses to
+ *       {@code blocked_by_tracer}.
+ *   <li>The producer/consumer inbox is a fixed-size MPSC queue ({@code tracerMetricsMaxPending},
+ *       default 2048); when full, producer {@code offer} returns false and the snapshot is dropped
+ *       via {@link HealthMetrics#onStatsInboxFull()}.
+ *   <li>Histograms use {@code CollapsingLowestDenseStore(1024)} -- bounded per-histogram memory.
+ *   <li>Cardinality handlers are flat open-addressed tables of fixed capacity -- no allocation on
+ *       the producer thread; allocation only on the consumer (handler reset clears, doesn't
+ *       reallocate).
+ * </ul>
+ *
+ * <p>This benchmark hammers all of those bounds simultaneously with 8 producer threads, unique
+ * labels per op (so handlers cap and the table fills+evicts repeatedly), random durations across a
+ * wide range (so histograms accept many distinct bins), and random {@code error}/{@code topLevel}
+ * flags (so both histograms are exercised). After the run, prints the drop counters so you can
+ * verify the subsystem stayed bounded under attack.
+ *
+ * <p>What "OOM the metrics subsystem" looks like if the bounds break: producer-thread allocation
+ * would grow unbounded (by design, snapshots produced faster than the inbox drains are dropped
+ * instead of piling up on the heap); aggregator-thread heap would grow if entries weren't capped,
+ * if handlers grew past their cap, or if histograms grew past their dense-store limit.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Threads(8)
+@Fork(value = 1)
+public class AdversarialMetricsBenchmark {
+
+ private ClientStatsAggregator aggregator;
+ private CountingHealthMetrics health;
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int cursor;
+ }
+
+ @Setup
+ public void setup() {
+ this.health = new CountingHealthMetrics(); // counting sink whose totals tearDown() prints
+ this.aggregator =
+ new ClientStatsAggregator(
+ new WellKnownTags("", "", "", "", "", ""),
+ Collections.emptySet(),
+ new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ Collections.singleton("peer.hostname"), Collections.emptySet()),
+ this.health,
+ new ClientStatsAggregatorBenchmark.NullSink(),
+ 2048, // NOTE(review): presumably maxAggregates / maxPending (order not visible) -- confirm
+ 2048,
+ false);
+ this.aggregator.start(); // NOTE(review): presumably launches the consumer thread -- confirm
+ }
+
+  @TearDown
+  public void tearDown() {
+    aggregator.close(); // runs once per forked JVM; close before reading the counters
+    System.err.println(
+        "[ADVERSARIAL] health counters (across all producer threads, this fork only):");
+    System.err.println(
+        " onStatsInboxFull = "
+            + health.inboxFull
+            + " (snapshots dropped because the MPSC inbox was full)");
+    System.err.println(
+        " onStatsAggregateDropped = "
+            + health.aggregateDropped
+            + " (snapshots dropped because the AggregateTable was full with no stale entry)");
+    System.err.println(
+        " onClientStatTraceComputed total = "
+            + health.traceComputedCalls
+            + " spans counted = "
+            + health.totalSpansCounted);
+  }
+
+  @Benchmark
+  public void publish(ThreadState ts, Blackhole blackhole) {
+    int idx = ts.cursor++; // per-thread counter, so all threads walk the same label sequence
+    ThreadLocalRandom rng = ThreadLocalRandom.current();
+
+    // Mix indices so labels don't fall into linear order in the handler tables. Distinct labels
+    // exceed every cap (RESOURCE=512, OPERATION=128, SERVICE=128, peer.hostname=512), so handlers
+    // saturate fast and most ops resolve to the blocked-by-tracer sentinel.
+    int scrambled = idx * 0x9E3779B1; // golden ratio multiplier
+    String service = "svc-" + (scrambled & 0xFFFF);
+    String operation = "op-" + ((scrambled >>> 8) & 0x3FFFF);
+    String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF);
+    String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF);
+    boolean error = (idx & 7) == 0;
+    boolean topLevel = (idx & 3) == 0;
+    // Wide duration spread forces histogram bins to populate broadly.
+    long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); // 1 ns .. ~1.07 s
+
+    SimpleSpan span =
+        new SimpleSpan(
+            service, operation, resource, "web", true, topLevel, error, 0, durationNanos, 200);
+    span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+    span.setTag("peer.hostname", hostname);
+
+    List<CoreSpan<?>> trace = Collections.singletonList(span);
+    blackhole.consume(aggregator.publish(trace));
+  }
+
+  /**
+   * Tallies drops and computed traces. Callbacks arrive from many threads and the {@code volatile
+   * long} fields are bumped with plain (non-atomic) increments, so updates can be lost -- the
+   * totals are order-of-magnitude indicators, not exact counts.
+   */
+  static final class CountingHealthMetrics extends HealthMetrics {
+    volatile long inboxFull;
+    volatile long aggregateDropped;
+    volatile long traceComputedCalls;
+    volatile long totalSpansCounted;
+
+    @Override
+    public void onStatsInboxFull() {
+      inboxFull += 1;
+    }
+
+    @Override
+    public void onStatsAggregateDropped() {
+      aggregateDropped += 1;
+    }
+
+    @Override
+    public void onClientStatTraceComputed(int counted, int total, boolean dropped) {
+      traceComputedCalls += 1;
+      totalSpansCounted = totalSpansCounted + counted;
+    }
+  }
+}
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java
new file mode 100644
index 00000000000..2079cbf15ec
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorMissPathBenchmark.java
@@ -0,0 +1,83 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.core.CoreSpan;
+import datadog.trace.core.monitor.HealthMetrics;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Miss-path variant of {@link ClientStatsAggregatorBenchmark}. Each op publishes a single-span
+ * trace from a pre-built pool where every span has a unique (service, operation, resource) tuple.
+ * After cardinality budgets fill, fields canonicalize to the {@code blocked_by_tracer} sentinel,
+ * but the producer still allocates a {@link SpanSnapshot} per op and enqueues it for the aggregator
+ * -- so the steady state exercises the per-op publish allocations + the consumer's
+ * canonicalize/match work, not the hit-path-only pattern of the other benchmarks.
+ *
+ * <p>Run with {@code -prof gc} to compare allocation rates against master's {@code
+ * ConflatingMetricsAggregator}.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 1, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 3, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(MICROSECONDS)
+@Fork(value = 1)
+public class ClientStatsAggregatorMissPathBenchmark {
+
+ private static final int POOL_SIZE = 4096;
+
+ private final DDAgentFeaturesDiscovery featuresDiscovery =
+ new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+ Collections.singleton("peer.hostname"), Collections.emptySet());
+ private final ClientStatsAggregator aggregator =
+ new ClientStatsAggregator(
+ new WellKnownTags("", "", "", "", "", ""),
+ Collections.emptySet(),
+ featuresDiscovery,
+ HealthMetrics.NO_OP,
+ new ClientStatsAggregatorBenchmark.NullSink(),
+ 2048,
+ 2048,
+ false);
+
+  private final List<List<CoreSpan<?>>> pool = generatePool(POOL_SIZE);
+  private int cursor; // next pool slot to publish; wraps at POOL_SIZE
+
+  static List<List<CoreSpan<?>>> generatePool(int n) { // n single-span traces, distinct labels
+    List<List<CoreSpan<?>>> out = new ArrayList<>(n);
+    for (int i = 0; i < n; i++) {
+      SimpleSpan span =
+          new SimpleSpan(
+              "svc-" + i, "op-" + i, "res-" + i, "type-" + (i & 7), true, true, false, 0, 10, -1);
+      span.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+      span.setTag("peer.hostname", "host-" + i);
+      out.add(Collections.singletonList(span));
+    }
+    return out;
+  }
+
+  @Benchmark
+  public void benchmark(Blackhole blackhole) {
+    int slot = cursor; // publish this pool entry, then advance with wrap-around
+    cursor = slot + 1 == POOL_SIZE ? 0 : slot + 1;
+    blackhole.consume(aggregator.publish(pool.get(slot)));
+  }
+}
diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java
new file mode 100644
index 00000000000..cc1d4a37538
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/TracePipelineBenchmark.java
@@ -0,0 +1,176 @@
+package datadog.trace.common.metrics;
+
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT;
+import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import datadog.trace.api.WellKnownTags;
+import datadog.trace.bootstrap.instrumentation.api.AgentScope;
+import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.common.writer.Writer;
+import datadog.trace.core.CoreTracer;
+import datadog.trace.core.DDSpan;
+import datadog.trace.core.monitor.HealthMetrics;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * End-to-end JMH benchmark of a 3-span HTTP-style trace through {@link CoreTracer}: one {@code
+ * span.kind=server} root + two {@code span.kind=client} children, as if a service handled an
+ * incoming request that made two outbound HTTP calls. Children inherit the server span as parent
+ * via implicit scope-based parentage; the root finishes last so {@code PendingTrace.write} ->
+ * {@code tracer.write(trace)} -> metricsAggregator.publish + writer.write (no-op) runs
+ * synchronously on the producing thread.
+ *
+ * <p>Runs multi-threaded ({@link Threads} = 8 by default; override with {@code -t N}) so the
+ * allocation rate {@code -prof gc} reports reflects multiple producers hitting the shared metrics
+ * aggregator + writer pipeline, and so we can compare total throughput between revisions.
+ *
+ * <p>Reflection is used to swap the tracer's default no-op {@code metricsAggregator} for a real
+ * {@link ClientStatsAggregator} so the metrics pipeline actually runs.
+ *
+ * <p>Two modes via {@code @Param}:
+ *
+ * <ul>
+ *   <li>{@code stable} -- every op uses the same labels (cache-hit path on the consumer).
+ *   <li>{@code varied} -- every op uses unique service / operation / resource per span (miss path
+ *       until cardinality budgets fill, then sentinel collapse).
+ * </ul>
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 2, time = 15, timeUnit = SECONDS)
+@Measurement(iterations = 5, time = 15, timeUnit = SECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(SECONDS)
+@Threads(8)
+@Fork(value = 2)
+public class TracePipelineBenchmark {
+
+ @Param({"stable", "varied"})
+ String mode;
+
+ private CoreTracer tracer;
+ private ClientStatsAggregator aggregator;
+ private boolean stable;
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int cursor;
+ }
+
+  @Setup
+  public void setup() throws Exception {
+    stable = "stable".equals(mode);
+    tracer = CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build();
+    aggregator =
+        new ClientStatsAggregator(
+            new WellKnownTags("", "", "", "", "", ""),
+            Collections.emptySet(),
+            new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery(
+                Collections.singleton("peer.hostname"), Collections.emptySet()),
+            HealthMetrics.NO_OP,
+            new ClientStatsAggregatorBenchmark.NullSink(),
+            2048,
+            2048,
+            false);
+    aggregator.start();
+    // The tracer is built with a no-op aggregator held in a package-private field of
+    // datadog.trace.core; this benchmark sits in the metrics package, so swap it in reflectively.
+    Field aggregatorSlot = CoreTracer.class.getDeclaredField("metricsAggregator");
+    aggregatorSlot.setAccessible(true);
+    aggregatorSlot.set(tracer, aggregator);
+  }
+
+ @TearDown
+ public void tearDown() {
+ aggregator.close(); // close the aggregator injected in setup() before closing the tracer
+ tracer.close();
+ }
+
+  @Benchmark
+  public void threeSpanTrace(ThreadState ts, Blackhole blackhole) {
+    int n = ts.cursor++;
+    String svcName = stable ? "svc" : "svc-" + n;
+    String rootOp = stable ? "servlet.request" : "servlet.request-" + n;
+    String rootResource = stable ? "GET /widgets/{id}" : "GET /widgets/" + n;
+    String callOp = stable ? "http.request" : "http.request-" + n;
+    String firstResource = stable ? "GET /downstream-a" : "GET /downstream-a/" + n;
+    String secondResource = stable ? "GET /downstream-b" : "GET /downstream-b/" + n;
+    String firstHost = stable ? "host-a" : "host-a-" + n;
+    String secondHost = stable ? "host-b" : "host-b-" + n;
+
+    AgentSpan root = tracer.startSpan("servlet", rootOp);
+    root.setResourceName(rootResource);
+    root.setServiceName(svcName);
+    root.setTag(SPAN_KIND, SPAN_KIND_SERVER);
+    AgentScope rootScope = tracer.activateSpan(root);
+    try {
+      AgentSpan firstCall = tracer.startSpan("okhttp", callOp);
+      firstCall.setResourceName(firstResource);
+      firstCall.setServiceName(svcName);
+      firstCall.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+      firstCall.setTag("peer.hostname", firstHost);
+      AgentScope firstScope = tracer.activateSpan(firstCall);
+      try {
+        // placeholder: the first outbound call's work would run here
+      } finally {
+        firstScope.close();
+      }
+      firstCall.finish();
+
+      AgentSpan secondCall = tracer.startSpan("okhttp", callOp);
+      secondCall.setResourceName(secondResource);
+      secondCall.setServiceName(svcName);
+      secondCall.setTag(SPAN_KIND, SPAN_KIND_CLIENT);
+      secondCall.setTag("peer.hostname", secondHost);
+      AgentScope secondScope = tracer.activateSpan(secondCall);
+      try {
+        // placeholder: the second outbound call's work would run here
+      } finally {
+        secondScope.close();
+      }
+      secondCall.finish();
+    } finally {
+      rootScope.close();
+    }
+    // The root is finished last, after both children. Per the class doc this is what triggers
+    // PendingTrace.write -> tracer.write -> metrics + writer on the calling thread.
+    root.finish();
+    blackhole.consume(root);
+  }
+
+  private static final class NoopWriter implements Writer {
+    @Override
+    public void write(List<DDSpan> trace) {} // intentionally discards every trace
+
+    @Override
+    public void start() {}
+
+    @Override
+    public boolean flush() {
+      return true; // nothing is buffered, so report success unconditionally
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void incrementDropCounts(int spanCount) {}
+  }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
index 8f2ae1cc6b3..2b6d5ee0a26 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java
@@ -5,8 +5,7 @@
import datadog.trace.util.Hashtable;
import datadog.trace.util.LongHashingUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongArray;
@@ -30,6 +29,12 @@
* The handlers are reset on the aggregator thread every reporting cycle via {@link
* #resetCardinalityHandlers()}.
*
+ * <p>EMPTY-as-absent contract: all UTF8 fields are non-null. The optional fields ({@code
+ * serviceSource}, {@code httpMethod}, {@code httpEndpoint}, {@code grpcStatusCode}) carry {@link
+ * UTF8BytesString#EMPTY} when the snapshot had no value; {@link SerializingMetricWriter} tests
+ * against {@code EMPTY} (identity comparison on the singleton) to decide whether to emit each field
+ * on the wire.
+ *
* <p>Thread-safety: not thread-safe. Counter and histogram updates, cardinality-handler
* registration, and {@link Canonical} use all run on the aggregator thread. Producer threads tag
* durations via {@link #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits and hand them off through the
@@ -44,6 +49,9 @@ final class AggregateEntry extends Hashtable.Entry {
public static final long ERROR_TAG = 0x8000000000000000L;
public static final long TOP_LEVEL_TAG = 0x4000000000000000L;
+ /** Shared empty array used by entries with no peer tags. */
+ private static final UTF8BytesString[] EMPTY_PEER_TAGS = new UTF8BytesString[0];
+
// Per-field cardinality handlers. Limits live on MetricCardinalityLimits -- see that class for
// per-field rationale.
static final PropertyCardinalityHandler RESOURCE_HANDLER =
@@ -80,11 +88,19 @@ final class AggregateEntry extends Hashtable.Entry {
final short httpStatusCode;
final boolean synthetic;
final boolean traceRoot;
- final List peerTags;
+ final UTF8BytesString[] peerTags;
// Mutable aggregate state -- single-thread (aggregator) writer.
private final Histogram okLatencies = Histogram.newHistogram();
- private final Histogram errorLatencies = Histogram.newHistogram();
+
+ /**
+ * Lazily allocated on the first recorded error. Most entries never see an error and keep this
+ * field {@code null} forever; {@link #getErrorLatencies()} returns {@code null} in that case and
+ * the serializer emits an empty-histogram payload instead. Once allocated, {@link #clear()} just
+ * clears it (does not null) since an entry that errored once tends to error again.
+ */
+ private Histogram errorLatencies;
+
private int errorCount;
private int hitCount;
private int topLevelCount;
@@ -105,7 +121,7 @@ private AggregateEntry(
short httpStatusCode,
boolean synthetic,
boolean traceRoot,
- List peerTags) {
+ UTF8BytesString[] peerTags) {
super(keyHash);
this.resource = resource;
this.service = service;
@@ -132,7 +148,7 @@ AggregateEntry recordDurations(int count, AtomicLongArray durations) {
}
if ((duration & ERROR_TAG) == ERROR_TAG) {
duration ^= ERROR_TAG;
- errorLatencies.accept(duration);
+ errorLatenciesForWrite().accept(duration);
++errorCount;
} else {
okLatencies.accept(duration);
@@ -154,7 +170,7 @@ AggregateEntry recordOneDuration(long tagAndDuration) {
}
if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) {
tagAndDuration ^= ERROR_TAG;
- errorLatencies.accept(tagAndDuration);
+ errorLatenciesForWrite().accept(tagAndDuration);
++errorCount;
} else {
okLatencies.accept(tagAndDuration);
@@ -163,6 +179,16 @@ AggregateEntry recordOneDuration(long tagAndDuration) {
return this;
}
+  /** Returns the error histogram, creating it the first time an error duration is recorded. */
+  private Histogram errorLatenciesForWrite() {
+    if (this.errorLatencies == null) {
+      // first error seen by this entry: allocate now rather than up front
+      this.errorLatencies = Histogram.newHistogram();
+    }
+    // aggregator-thread only (single writer), so re-reading the field is safe
+    return this.errorLatencies;
+  }
+
int getErrorCount() {
return errorCount;
}
@@ -183,6 +209,11 @@ Histogram getOkLatencies() {
return okLatencies;
}
+ /**
+ * Returns the error histogram if any error was recorded, or {@code null} otherwise. Callers (only
+ * {@link SerializingMetricWriter}) treat null as "no errors this cycle" -- it serializes an empty
+ * histogram in that case.
+ */
Histogram getErrorLatencies() {
return errorLatencies;
}
@@ -194,7 +225,9 @@ void clear() {
this.topLevelCount = 0;
this.duration = 0;
this.okLatencies.clear();
- this.errorLatencies.clear();
+ if (this.errorLatencies != null) {
+ this.errorLatencies.clear();
+ }
}
/**
@@ -226,7 +259,10 @@ static AggregateEntry of(
UTF8BytesString httpMethodUtf = createUtf8(httpMethod);
UTF8BytesString httpEndpointUtf = createUtf8(httpEndpoint);
UTF8BytesString grpcUtf = createUtf8(grpcStatusCode);
- List peerTagsList = peerTags == null ? Collections.emptyList() : peerTags;
+ UTF8BytesString[] peerTagsArray =
+ peerTags == null || peerTags.isEmpty()
+ ? EMPTY_PEER_TAGS
+ : peerTags.toArray(new UTF8BytesString[0]);
long keyHash =
hashOf(
resourceUtf,
@@ -241,7 +277,8 @@ static AggregateEntry of(
(short) httpStatusCode,
synthetic,
traceRoot,
- peerTagsList);
+ peerTagsArray,
+ peerTagsArray.length);
return new AggregateEntry(
keyHash,
resourceUtf,
@@ -256,7 +293,7 @@ static AggregateEntry of(
(short) httpStatusCode,
synthetic,
traceRoot,
- peerTagsList);
+ peerTagsArray);
}
/**
@@ -283,6 +320,10 @@ static void resetCardinalityHandlers() {
* all canonicalize to the same sentinel {@link UTF8BytesString}) collide in the same bucket.
* {@link UTF8BytesString#hashCode()} returns the underlying String hash, so entries built via
* {@link #of} produce the same hash as entries built from a snapshot with matching content.
+ *
+ * {@code peerTags} is taken as a {@code (array, length)} pair so the same routine works for
+ * the {@link Canonical} scratch buffer (where {@code length < array.length}) and the entry's
+ * fixed-size array.
*/
static long hashOf(
UTF8BytesString resource,
@@ -297,7 +338,8 @@ static long hashOf(
short httpStatusCode,
boolean synthetic,
boolean traceRoot,
- List peerTags) {
+ UTF8BytesString[] peerTags,
+ int peerTagsLen) {
long h = 0;
h = LongHashingUtils.addToHash(h, resource);
h = LongHashingUtils.addToHash(h, service);
@@ -308,10 +350,8 @@ static long hashOf(
h = LongHashingUtils.addToHash(h, synthetic);
h = LongHashingUtils.addToHash(h, traceRoot);
h = LongHashingUtils.addToHash(h, spanKind);
- // indexed iteration -- avoids the iterator allocation a for-each over a List would do
- int peerTagCount = peerTags.size();
- for (int i = 0; i < peerTagCount; i++) {
- h = LongHashingUtils.addToHash(h, peerTags.get(i));
+ for (int i = 0; i < peerTagsLen; i++) {
+ h = LongHashingUtils.addToHash(h, peerTags[i]);
}
h = LongHashingUtils.addToHash(h, httpMethod);
h = LongHashingUtils.addToHash(h, httpEndpoint);
@@ -368,7 +408,7 @@ boolean isTraceRoot() {
return traceRoot;
}
- List getPeerTags() {
+ UTF8BytesString[] getPeerTags() {
return peerTags;
}
@@ -391,7 +431,7 @@ public boolean equals(Object o) {
&& Objects.equals(serviceSource, that.serviceSource)
&& Objects.equals(type, that.type)
&& Objects.equals(spanKind, that.spanKind)
- && peerTags.equals(that.peerTags)
+ && Arrays.equals(peerTags, that.peerTags)
&& Objects.equals(httpMethod, that.httpMethod)
&& Objects.equals(httpEndpoint, that.httpEndpoint)
&& Objects.equals(grpcStatusCode, that.grpcStatusCode);
@@ -425,11 +465,15 @@ static final class Canonical {
boolean traceRoot;
/**
- * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link
- * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own.
- * Zero allocation on the hit path.
+ * Reusable buffer of canonicalized peer-tag UTF8 forms. Slots {@code [0..peerTagsCount)} are
+ * the live entries; the rest is dead space. The buffer doubles when it runs out of room (rare,
+ * since typical peer-tag schemas have very few tags). On miss, {@link #toEntry} snapshots into
+ * a tight {@link UTF8BytesString}{@code []} for the entry to own. Zero allocation on the hit
+ * path.
*/
- final ArrayList peerTagsBuffer = new ArrayList<>(4);
+ UTF8BytesString[] peerTagsBuffer = new UTF8BytesString[4];
+
+ int peerTagsCount;
long keyHash;
@@ -462,18 +506,18 @@ void populate(SpanSnapshot s) {
httpStatusCode,
synthetic,
traceRoot,
- peerTagsBuffer);
+ peerTagsBuffer,
+ peerTagsCount);
}
/**
* Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying the schema's per-tag
* handler + warn-once notification at the same index. Returns {@code EMPTY} for null inputs;
* we elide those from the buffer so the wire-format list-of-pairs only contains present peer
- * tags. No allocation when the schema/values are absent or all values are null (buffer is just
- * cleared).
+ * tags.
*/
private void populatePeerTags(PeerTagSchema schema, String[] values) {
- peerTagsBuffer.clear();
+ peerTagsCount = 0;
if (schema == null || values == null) {
return;
}
@@ -481,7 +525,10 @@ private void populatePeerTags(PeerTagSchema schema, String[] values) {
for (int i = 0; i < n; i++) {
UTF8BytesString utf8 = schema.register(i, values[i]);
if (utf8 != UTF8BytesString.EMPTY) {
- peerTagsBuffer.add(utf8);
+ if (peerTagsCount == peerTagsBuffer.length) {
+ peerTagsBuffer = Arrays.copyOf(peerTagsBuffer, peerTagsBuffer.length * 2);
+ }
+ peerTagsBuffer[peerTagsCount++] = utf8;
}
}
}
@@ -501,20 +548,21 @@ boolean matches(AggregateEntry e) {
&& Objects.equals(serviceSource, e.serviceSource)
&& Objects.equals(type, e.type)
&& Objects.equals(spanKind, e.spanKind)
- && peerTagsEqual(peerTagsBuffer, e.peerTags)
+ && peerTagsEqual(peerTagsBuffer, peerTagsCount, e.peerTags)
&& Objects.equals(httpMethod, e.httpMethod)
&& Objects.equals(httpEndpoint, e.httpEndpoint)
&& Objects.equals(grpcStatusCode, e.grpcStatusCode);
}
- /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */
- private static boolean peerTagsEqual(List a, List b) {
- int n = a.size();
- if (n != b.size()) {
+ /**
+ * Length-aware indexed comparison so the scratch buffer can be compared against a tight array.
+ */
+ private static boolean peerTagsEqual(UTF8BytesString[] a, int aLen, UTF8BytesString[] b) {
+ if (aLen != b.length) {
return false;
}
- for (int i = 0; i < n; i++) {
- if (!a.get(i).equals(b.get(i))) {
+ for (int i = 0; i < aLen; i++) {
+ if (!a[i].equals(b[i])) {
return false;
}
}
@@ -523,18 +571,16 @@ private static boolean peerTagsEqual(List a, List snapshottedPeerTags;
- int n = peerTagsBuffer.size();
- if (n == 0) {
- snapshottedPeerTags = Collections.emptyList();
- } else if (n == 1) {
- snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0));
+ UTF8BytesString[] snapshottedPeerTags;
+ if (peerTagsCount == 0) {
+ snapshottedPeerTags = EMPTY_PEER_TAGS;
} else {
- snapshottedPeerTags = new ArrayList<>(peerTagsBuffer);
+ snapshottedPeerTags = new UTF8BytesString[peerTagsCount];
+ System.arraycopy(peerTagsBuffer, 0, snapshottedPeerTags, 0, peerTagsCount);
}
return new AggregateEntry(
keyHash,
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java
index f7d91343d4b..6a0b1775c6c 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java
@@ -5,30 +5,33 @@
* values collapse to a {@code blocked_by_tracer} sentinel so they merge into one aggregate row
* instead of fragmenting the table.
*
- * Values are sized to the typical-service workload with headroom; "typical" estimates are noted
- * inline. Raise if a workload routinely hits the sentinel; lower carries proportional memory
- * savings but risks suppressing legitimate distinctions.
+ * <p>Values are sized to cover realistic workload cardinality per 10s reporting window with
+ * headroom -- the prior DDCache-inherited limits (RESOURCE=32, OPERATION=64, ...) were chosen for
+ * memory conservation and were tight enough that a single REST API with a couple hundred routes
+ * would exhaust the budget within seconds. Memory cost with the flat handler tables is ~20 KB
+ * across all 9 handlers -- negligible relative to the {@code maxAggregates}-sized entry table.
*/
final class MetricCardinalityLimits {
private MetricCardinalityLimits() {}
/**
- * Distinct {@code resource.name} values per cycle. Highest-cardinality field by far: DB-query
- * obfuscations, HTTP route templates, custom resources. Typical service: 30-200 unique.
+ * Distinct {@code resource.name} values per cycle. Highest-cardinality field: HTTP route
+ * templates, SQL query templates, custom resources. A web app with one parameterized route per
+ * controller method easily hits low hundreds.
*/
- static final int RESOURCE = 128;
+ static final int RESOURCE = 512;
/**
* Distinct {@code service.name} values per cycle. Local service plus downstream peer-service
- * names. Microservice meshes typically reference 10-50 distinct services.
+ * names; sized for service-mesh hubs that fan out to many downstreams.
*/
- static final int SERVICE = 32;
+ static final int SERVICE = 128;
/**
* Distinct {@code operation.name} values per cycle. Names like {@code http.request}, {@code
- * db.query}, etc. Typical service: 10-30 across integrations.
+ * db.query}, etc. One per integration kind; production services often span 30-60.
*/
- static final int OPERATION = 64;
+ static final int OPERATION = 128;
/**
* Distinct {@code _dd.base_service} override values per cycle. Used rarely; usually empty or one
@@ -37,16 +40,16 @@ private MetricCardinalityLimits() {}
static final int SERVICE_SOURCE = 16;
/**
- * Distinct {@code span.type} values per cycle. {@code DDSpanTypes} catalog is ~30; a single
- * service usually spans 5-10 integration types.
+ * Distinct {@code span.type} values per cycle. {@code DDSpanTypes} catalog has ~30 known values;
+ * a single service typically spans 5-10 integration types.
*/
- static final int TYPE = 16;
+ static final int TYPE = 32;
/**
- * Distinct {@code span.kind} values per cycle. OTel defines exactly 5 (server/client/producer/
- * consumer/internal); 8 still leaves 60% headroom in case a producer invents new kinds.
+ * Distinct {@code span.kind} values per cycle. OTel defines 5 standard kinds (server/client/
+ * producer/consumer/internal); the 16 cap leaves headroom in case producers invent new kinds.
*/
- static final int SPAN_KIND = 8;
+ static final int SPAN_KIND = 16;
/**
* Distinct HTTP method values per cycle. Standard verbs are 7-9; WebDAV/custom adds a few more.
@@ -57,7 +60,7 @@ private MetricCardinalityLimits() {}
* Distinct {@code http.endpoint} values per cycle. Path templates -- same shape as {@code
* RESOURCE} for HTTP-heavy services. Only used when {@code includeEndpointInMetrics} is enabled.
*/
- static final int HTTP_ENDPOINT = 64;
+ static final int HTTP_ENDPOINT = 256;
/**
* Distinct gRPC status code values per cycle. gRPC spec defines exactly 17 codes (0-16); 24
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
index f592dfe26f6..86d9a89738e 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java
@@ -13,7 +13,6 @@
import datadog.trace.api.git.GitInfo;
import datadog.trace.api.git.GitInfoProvider;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
-import java.util.List;
import java.util.function.Function;
public final class SerializingMetricWriter implements MetricWriter {
@@ -185,8 +184,8 @@ public void add(AggregateEntry entry) {
writer.writeUTF8(entry.getSpanKind());
writer.writeUTF8(PEER_TAGS);
- final List peerTags = entry.getPeerTags();
- writer.startArray(peerTags.size());
+ final UTF8BytesString[] peerTags = entry.getPeerTags();
+ writer.startArray(peerTags.length);
for (UTF8BytesString peerTag : peerTags) {
writer.writeUTF8(peerTag);
@@ -230,7 +229,31 @@ public void add(AggregateEntry entry) {
writer.writeBinary(entry.getOkLatencies().serialize());
writer.writeUTF8(ERROR_SUMMARY);
- writer.writeBinary(entry.getErrorLatencies().serialize());
+ final datadog.metrics.api.Histogram errorLatencies = entry.getErrorLatencies();
+ if (errorLatencies != null) {
+ writer.writeBinary(errorLatencies.serialize());
+ } else {
+ // Entry never saw an error; emit a cached empty-histogram payload so the wire format is
+ // unchanged without allocating a histogram per error-free entry.
+ writer.writeBinary(emptyErrorHistogramBytes());
+ }
+ }
+
+ private byte[] emptyHistogramBytesCache;
+
+ /**
+ * Returns the cached serialized form of an empty histogram. Lazily computed so the {@link
+ * datadog.metrics.api.Histograms} factory has been registered by the time we sample it.
+ */
+ private byte[] emptyErrorHistogramBytes() {
+ byte[] cached = emptyHistogramBytesCache;
+ if (cached == null) {
+ java.nio.ByteBuffer buf = datadog.metrics.api.Histogram.newHistogram().serialize();
+ cached = new byte[buf.remaining()];
+ buf.get(cached);
+ emptyHistogramBytesCache = cached;
+ }
+ return cached;
}
@Override
diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
index c90594b1895..f09b59cd3a7 100644
--- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
+++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java
@@ -88,21 +88,21 @@ void peerTagPairsParticipateInIdentity() {
@Test
void cardinalityBlockedValuesCollapseIntoOneEntry() {
- // SERVICE_HANDLER has a cardinality limit of 32. With 50 distinct service names, services 33+
- // canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the canonical
- // (post-handler) form, all blocked services land in the same bucket and merge into a single
- // entry rather than fragmenting.
+ // SERVICE_HANDLER has a cardinality limit of 128. With 150 distinct service names, services
+ // 129+ canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the
+ // canonical (post-handler) form, all blocked services land in the same bucket and merge into
+ // a single entry rather than fragmenting.
AggregateEntry.resetCardinalityHandlers();
- AggregateTable table = new AggregateTable(128);
+ AggregateTable table = new AggregateTable(256);
- for (int i = 0; i < 50; i++) {
- AggregateEntry agg = table.findOrInsert(snapshot("svc-" + i, "op", "client"));
- assertNotNull(agg);
- agg.recordOneDuration(1L);
+ for (int i = 0; i < 150; i++) {
+ AggregateEntry entry = table.findOrInsert(snapshot("svc-" + i, "op", "client"));
+ assertNotNull(entry);
+ entry.recordOneDuration(1L);
}
- // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total.
- assertEquals(33, table.size());
+ // 128 in-budget services + 1 collapsed "blocked_by_tracer" entry = 129 total.
+ assertEquals(129, table.size());
AggregateEntry.resetCardinalityHandlers();
}
diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
index 81a476c67c8..4883543cf68 100644
--- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
+++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy
@@ -39,10 +39,10 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest {
)
writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10))
def entry1 = AggregateEntry.of("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
- entry1.aggregate.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
+ entry1.recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5))
writer.add(entry1)
def entry2 = AggregateEntry.of("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null)
- entry2.aggregate.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
+ entry2.recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9))
writer.add(entry2)
writer.finishBucket()
diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md
index ca5f200c97f..0bb82d36bdd 100644
--- a/docs/client_metrics_design.md
+++ b/docs/client_metrics_design.md
@@ -8,6 +8,83 @@ does not have to sample every span to know request rates and latencies.
Code lives in package `datadog.trace.common.metrics`.
+## Overview
+
+Tracers emit thousands of spans per second. Reporting every one to the Datadog
+Agent — let alone storing them — would be expensive and mostly redundant. The
+question "what's my p95 latency for `GET /users/:id` on `web-frontend` over the
+last 10s" doesn't need every individual span; it needs an aggregate.
+
+The client-stats pipeline computes those aggregates on the tracer itself and
+ships rolled-up **buckets** to the agent on a fixed cadence. Each bucket is a
+tuple of label values (resource, service, operation, span kind, peer tags, http
+method/endpoint/status, grpc status, ...) plus a small accumulator: hit count,
+error count, top-level count, duration sum, ok-latency histogram, error-latency
+histogram. A bucket spans one reporting cycle (default 10 seconds); at the end
+of the cycle the buckets are serialized to the agent's `/v0.6/stats` endpoint
+and the in-memory accumulators are cleared.
+
+### Goals
+
+- **Bounded memory.** The aggregator's footprint must not grow without limit no
+ matter how many distinct label combinations the workload produces, or how
+ high the span throughput is.
+- **No producer-thread contention.** Application threads that complete a span
+ shouldn't block on a lock or do meaningful work beyond cheap field
+ extraction. The tracer is a guest in the application's process; it must not
+ show up as overhead.
+- **Correctness under reset.** Cardinality budgets and histograms are dropped
+ every reporting cycle. Mid-cycle drops and agent-downgrade clears can't
+ corrupt the aggregate table or fragment a single logical bucket.
+- **Stable wire format.** The bucket payload matches the existing `/v0.6/stats`
+ schema. This is a re-implementation of an existing protocol, not a protocol
+ change.
+
+### Architecture in one paragraph
+
+A single **aggregator thread** owns every piece of mutable state — the bucket
+table, the per-field cardinality budgets, the histograms. Application threads
+build a small immutable **span snapshot** per metrics-eligible span and post it
+to a bounded MPSC inbox. The aggregator drains the inbox, **canonicalizes**
+each snapshot's label values through cardinality-capped UTF8 interners, hashes
+the canonical form, and finds-or-inserts a bucket. A scheduled signal flushes
+buckets to the agent every reporting interval; the cardinality budgets are
+reset in lockstep with the flush.
+
+### Three rules
+
+1. **Producer threads never touch shared state.** They build a snapshot and
+ hand it off through the inbox. The aggregator does all the heavy work
+ (canonicalization, hashing, lookup, accumulator updates).
+2. **Cardinality is capped per field, per cycle.** Each label field has its
+   own budget; overflow values collapse to a single `blocked_by_tracer`
+   sentinel, bounding the distinct values each field can contribute. The
+   bucket table itself never holds more than `maxAggregates` entries.
+3. **Aggregation happens on canonical UTF8 forms.** Two snapshots that disagree
+ only on representation (same content delivered once as a `String`, once as
+ a `UTF8BytesString`) collapse to the same bucket because hashing and
+ matching happen *after* canonicalization.
+
+### Trade-offs
+
+- **One snapshot allocation per metrics-eligible span.** ~100 bytes per
+ snapshot; cheap individually but a meaningful share of producer allocation
+ at high span throughput. Snapshots could be pooled or replaced with a
+ struct-of-arrays inbox; neither is currently worth the complexity.
+- **Cap-overrun drops the new key, not LRU.** When the bucket table is at
+ capacity and no entry is stale enough to evict, an incoming snapshot for a
+ new label combination is dropped (and reported via
+ `onStatsAggregateDropped`). This protects the steady-state workload from a
+ burst of new keys that would otherwise displace established buckets.
+- **One aggregator thread.** The whole consumer side is single-threaded by
+ design — locks, races, and visibility footguns are confined to the producer
+  → inbox handoff. If the producer rate stays above what the aggregator
+  can drain, the inbox fills and snapshots are dropped
+ (`onStatsInboxFull`).
+- **Fixed bucket table.** The hashtable's bucket array is sized once at
+ startup from `maxAggregates`. No dynamic resizing; entries beyond the cap
+ trigger the drop-new-key path above.
+
## High-level shape
```
@@ -225,8 +302,8 @@ Two distinct cadences:
## Memory and lifetime
-- `AggregateMetric` is **not thread-safe**. It is mutated only by the
- aggregator thread.
+- `AggregateEntry`'s per-bucket counters + histograms are **not thread-safe**;
+ they are mutated only by the aggregator thread.
- `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`,
schedule-driven `REPORT`, drainer-driven inserts) route through the inbox.
- `Canonical` and the cardinality handlers are aggregator-thread-only.
@@ -262,6 +339,81 @@ The producer reports per-trace stats via `HealthMetrics`:
| Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. |
| Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. |
+## Behavior under adversarial load
+
+A useful stress test (captured as `AdversarialMetricsBenchmark`): 8 producer
+threads call `publish` in a tight loop with **unique** `(service, operation,
+resource, peer.hostname)` per op, random durations across 30 orders of
+magnitude, and random `error` / `topLevel` flags. Every cardinality dimension
+saturates within milliseconds.
+
+### What "OOM the metrics subsystem" would look like
+
+A successful attack would either grow the aggregator's heap unboundedly, or
+back up the producer so a synchronous structure (cache, map) grew with each
+unique label combination. The current shape rules both out by construction:
+
+- **Inbox is a fixed-size MPSC queue.** Overflow returns `false` from
+ `offer` and the producer drops the snapshot via `onStatsInboxFull`.
+ The snapshot becomes garbage immediately; no queue growth.
+- **`AggregateTable` is a fixed-size bucket array.** Insertion when the
+ table is full triggers an evict-stale pass (one entry with
+ `hitCount == 0`); if that fails the snapshot is dropped via
+ `onStatsAggregateDropped`. The table never resizes.
+- **Cardinality handlers are flat open-addressed arrays.** Overflow values
+ canonicalize to the shared `blocked_by_tracer` sentinel — same hash,
+ same bucket, merged in. No node allocations, no rehash.
+- **Histograms use `CollapsingLowestDenseStore(1024)`.** Bucket array
+ caps at ~8 KB per histogram. Worst case at full table cap: 2048 entries
+ × 2 histograms × ~8 KB ≈ 32 MB. That's the headline upper bound.
+- **Empty error histograms aren't allocated until the first error is
+  recorded.** Entries that never error keep `errorLatencies = null`,
+  saving one histogram allocation per error-free entry.
+
+### Measured behavior (1 fork × 1 warmup iteration × 3 measurement iterations × 15 s each, 8 threads each side)
+
+| | master (`ConflatingMetricsAggregator`) | this design (`ClientStatsAggregator`) |
+|---|---:|---:|
+| Iteration 1 throughput | 1,506,007 ops/s | ~5,853,917 ops/s |
+| Iteration 2 throughput | 1,255,258 ops/s | ~5,800,000 ops/s |
+| Iteration 3 throughput | **410,097 ops/s** (-73%) | ~5,853,917 ops/s (stable) |
+| GC time / 15 s wall | iter 1: 8.7 s — iter 2: 9.8 s — iter 3: **18.6 s** (multi-thread GC saturation) | ~150 ms total |
+| Producer allocation | ~1,108 B/op | ~823 B/op |
+| Aggregator thread state at end | "Skipped metrics reporting because the queue is full" + thread idle waiting for inbox | Continuously draining; ~13M snapshots/sec consumed |
+| Inbox-full drops | (no counter on master) | ~139 M dropped over 45 s, all reported via `onStatsInboxFull` |
+| Aggregate-table drops | 0 | 0 |
+
+### Why master degrades
+
+On master, the producer does **everything** synchronously on the calling
+thread: `MetricKey` canonicalization, `DDCache` lookups for each label field,
+`LRUCache` insertion. There is no queue between producer and consumer
+— there *is* no consumer thread for the storage work, only for the
+periodic report. So a 1,108 B/op allocation rate × 8 threads × ~1.5 M ops/s
+generates ~13 GB/sec of garbage on the same thread that has to keep up with
+incoming spans. The young gen fills, then survivor, then old gen, then full
+GC. By iteration 3 the JVM is spending more than its wall-clock budget on
+GC (multiple concurrent GC threads, summed > 15 s during a 15 s window) and
+throughput collapses 73 %.
+
+### Why this design holds
+
+The producer/consumer split converts allocation pressure into **backpressure
+at the inbox boundary**. The producer's per-op work is just "allocate one
+`SpanSnapshot`, set a few `volatile` refs, `inbox.offer`, return." On
+overflow, `offer` returns `false` and the snapshot is dropped on the spot —
+no waiting, no allocation amplification. The aggregator thread runs at
+its natural rate (~13 M snapshots/sec on the test machine), and the gap
+between producer and consumer becomes the drop rate, not heap growth.
+`onStatsInboxFull` makes that gap observable so operators can size
+`tracerMetricsMaxPending` and `tracerMetricsMaxAggregates` for their
+workload.
+
+Net: under adversarial input the new design absorbs what it can compute
+meaningfully and drops what it can't, with both numbers exposed via health
+metrics. The bounded-design properties hold to the ~32 MB worst-case
+ceiling described above.
+
## Why the redesign (history)
The pipeline was previously `ConflatingMetricsAggregator` with:
@@ -299,8 +451,20 @@ showed the producer dominating CPU time. The major shifts:
### Benchmark summary
-`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single
-trace, real `CoreTracer` with a no-op writer):
+Four JMH benchmarks cover the producer pipeline from different angles:
+
+- `ClientStatsAggregatorBenchmark` — 64 SimpleSpans per op, identical labels
+ (consumer-side cache hit path).
+- `ClientStatsAggregatorDDSpanBenchmark` — same as above but real `DDSpan`
+ via `CoreTracer`; exercises the production `isKind` / cached span-kind
+ ordinal path.
+- `ClientStatsAggregatorMissPathBenchmark` — pool of 4096 single-span
+ traces with unique labels; exercises miss + insert + handler saturation.
+- `AdversarialMetricsBenchmark` — 8 threads, unique labels per op, random
+ durations + error flags; pushes every bound to its limit (see
+ [Behavior under adversarial load](#behavior-under-adversarial-load)).
+
+Optimization progression on the DDSpan benchmark:
| Variant | µs/op |
|---|---|
@@ -309,6 +473,11 @@ trace, real `CoreTracer` with a no-op writer):
| with peer-tag schema hoist | 2.410 |
| with cached span-kind ordinal + isSynthetic fix | 1.995 |
+On the producer-bound miss-path benchmark (single-span trace, unique
+labels), `ClientStatsAggregatorMissPathBenchmark` measures **0.057 µs/op +
+96 B/op** vs master's **0.305 µs/op + 399 B/op** — 5.3× faster, 4.2× less
+producer-thread allocation per metrics-eligible span.
+
The remaining producer-thread hotspots (from JFR sampling) are tag-map
lookups for `peer.hostname` / other peer-tag values inside
`capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that